[ 
https://issues.apache.org/jira/browse/NIFI-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15549944#comment-15549944
 ] 

ASF GitHub Bot commented on NIFI-2865:
--------------------------------------

Github user bbende commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1097#discussion_r82070115
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java ---
    @@ -249,242 +241,141 @@
     
         @Override
         protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    -        return DESCRIPTORS;
    +        return PROPERTIES;
         }
     
         @Override
         protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
             return new PropertyDescriptor.Builder()
    -                .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
    -                .name(propertyDescriptorName).addValidator(new 
KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true)
    -                .build();
    +            .description("Specifies the value for '" + 
propertyDescriptorName + "' Kafka Configuration.")
    +            .name(propertyDescriptorName)
    +            .addValidator(new 
KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
    +            .dynamic(true)
    +            .build();
         }
     
         @Override
         protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
             return 
KafkaProcessorUtils.validateCommonProperties(validationContext);
         }
     
    -    volatile KafkaPublisher kafkaPublisher;
    -
    -    /**
    -     * This thread-safe operation will delegate to
    -     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after 
first
    -     * checking and creating (if necessary) Kafka resource which could be 
either
    -     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close 
and
    -     * destroy the underlying Kafka resource upon catching an {@link 
Exception}
    -     * raised by {@link #rendezvousWithKafka(ProcessContext, 
ProcessSession)}.
    -     * After Kafka resource is destroyed it will be re-created upon the 
next
    -     * invocation of this operation essentially providing a self healing
    -     * mechanism to deal with potentially corrupted resource.
    -     * <p>
    -     * Keep in mind that upon catching an exception the state of this 
processor
    -     * will be set to no longer accept any more tasks, until Kafka 
resource is
    -     * reset. This means that in a multi-threaded situation currently 
executing
    -     * tasks will be given a chance to complete while no new tasks will be
    -     * accepted.
    -     *
    -     * @param context context
    -     * @param sessionFactory factory
    -     */
    -    @Override
    -    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
    -        if (this.acceptTask) { // acts as a circuit breaker to allow 
existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks 
are accepted.
    -            this.taskCounter.incrementAndGet();
    -            final ProcessSession session = sessionFactory.createSession();
    -            try {
    -                /*
    -                 * We can't be doing double null check here since as a 
pattern
    -                 * it only works for lazy init but not reset, which is 
what we
    -                 * are doing here. In fact the first null check is 
dangerous
    -                 * since 'kafkaPublisher' can become null right after its 
null
    -                 * check passed causing subsequent NPE.
    -                 */
    -                synchronized (this) {
    -                    if (this.kafkaPublisher == null) {
    -                        this.kafkaPublisher = 
this.buildKafkaResource(context, session);
    -                    }
    -                }
    -
    -                /*
    -                 * The 'processed' boolean flag does not imply any failure 
or success. It simply states that:
    -                 * - ConsumeKafka - some messages were received form Kafka 
and 1_ FlowFile were generated
    -                 * - PublishKafka0_10 - some messages were sent to Kafka 
based on existence of the input FlowFile
    -                 */
    -                boolean processed = this.rendezvousWithKafka(context, 
session);
    -                session.commit();
    -                if (!processed) {
    -                    context.yield();
    -                }
    -            } catch (Throwable e) {
    -                this.acceptTask = false;
    -                session.rollback(true);
    -                this.getLogger().error("{} failed to process due to {}; 
rolling back session", new Object[]{this, e});
    -            } finally {
    -                synchronized (this) {
    -                    if (this.taskCounter.decrementAndGet() == 0 && 
!this.acceptTask) {
    -                        this.close();
    -                        this.acceptTask = true;
    -                    }
    -                }
    -            }
    -        } else {
    -            this.logger.debug("Task was not accepted due to the processor 
being in 'reset' state. It will be re-submitted upon completion of the reset.");
    -            this.getLogger().debug("Task was not accepted due to the 
processor being in 'reset' state. It will be re-submitted upon completion of 
the reset.");
    -            context.yield();
    +    private synchronized PublisherPool getPublisherPool(final 
ProcessContext context) {
    +        PublisherPool pool = publisherPool;
    +        if (pool != null) {
    +            return pool;
             }
    +
    +        return publisherPool = createPublisherPool(context);
    +    }
    +
    +    protected PublisherPool createPublisherPool(final ProcessContext 
context) {
    +        final int maxMessageSize = 
context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
    +        final long maxAckWaitMillis = 
context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
    +
    +        final Map<String, Object> kafkaProperties = new HashMap<>();
    +        KafkaProcessorUtils.buildCommonKafkaProperties(context, 
ProducerConfig.class, kafkaProperties);
    +        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
    +        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
    +        kafkaProperties.put("max.request.size", 
String.valueOf(maxMessageSize));
    +
    +        return new PublisherPool(kafkaProperties, getLogger(), 
maxMessageSize, maxAckWaitMillis);
         }
     
    -    /**
    -     * Will call {@link Closeable#close()} on the target resource after 
which
    -     * the target resource will be set to null. Should only be called when 
there
    -     * are no more threads being executed on this processor or when it has 
been
    -     * verified that only a single thread remains.
    -     *
    -     * @see KafkaPublisher
    -     * @see KafkaConsumer
    -     */
         @OnStopped
    -    public void close() {
    -        try {
    -            if (this.kafkaPublisher != null) {
    -                try {
    -                    this.kafkaPublisher.close();
    -                } catch (Exception e) {
    -                    this.getLogger().warn("Failed while closing " + 
this.kafkaPublisher, e);
    -                }
    -            }
    -        } finally {
    -            this.kafkaPublisher = null;
    +    public void closePool() {
    +        if (publisherPool != null) {
    +            publisherPool.close();
             }
    +
    +        publisherPool = null;
         }
     
    -    /**
    -     * Will rendezvous with Kafka if {@link ProcessSession} contains
    -     * {@link FlowFile} producing a result {@link FlowFile}.
    -     * <br>
    -     * The result {@link FlowFile} that is successful is then transfered to
    -     * {@link #REL_SUCCESS}
    -     * <br>
    -     * The result {@link FlowFile} that is failed is then transfered to
    -     * {@link #REL_FAILURE}
    -     *
    -     */
    -    protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session) {
    -        FlowFile flowFile = session.get();
    -        if (flowFile != null) {
    -            long start = System.nanoTime();
    -            flowFile = this.doRendezvousWithKafka(flowFile, context, 
session);
    -            Relationship relationship = REL_SUCCESS;
    -            if (!this.isFailedFlowFile(flowFile)) {
    -                String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    -                long executionDuration = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    -                String transitUri = 
KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(),
 this.brokers, topic);
    -                session.getProvenanceReporter().send(flowFile, transitUri, 
"Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", 
executionDuration);
    -                this.getLogger().debug("Successfully sent {} to Kafka as 
{} message(s) in {} millis",
    -                        new Object[]{flowFile, 
flowFile.getAttribute(MSG_COUNT), executionDuration});
    -            } else {
    -                relationship = REL_FAILURE;
    -                flowFile = session.penalize(flowFile);
    -            }
    -            session.transfer(flowFile, relationship);
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        final boolean useDemarcator = 
context.getProperty(MESSAGE_DEMARCATOR).isSet();
    +
    +        final List<FlowFile> flowFiles = 
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
    +        if (flowFiles.isEmpty()) {
    +            return;
             }
    -        return flowFile != null;
    -    }
     
    -    /**
    -     * Builds and instance of {@link KafkaPublisher}.
    -     */
    -    protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session) {
    -        final Map<String, String> kafkaProps = new HashMap<>();
    -        KafkaProcessorUtils.buildCommonKafkaProperties(context, 
ProducerConfig.class, kafkaProps);
    -        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
    -        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
    -        kafkaProps.put("max.request.size", 
String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
    -        this.brokers = 
context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
    -        final Properties props = new Properties();
    -        props.putAll(kafkaProps);
    -        KafkaPublisher publisher = new KafkaPublisher(props, 
this.getLogger());
    -        return publisher;
    -    }
    +        final PublisherPool pool = getPublisherPool(context);
    +        if (pool == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        final String securityProtocol = 
context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
    +        final String bootstrapServers = 
context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
    +
    +        final long startTime = System.nanoTime();
    +        try (final PublisherLease lease = pool.obtainPublisher()) {
    +            // Send each FlowFile to Kafka asynchronously.
    +            for (final FlowFile flowFile : flowFiles) {
    +                if (!isScheduled()) {
    +                    // If stopped, re-queue FlowFile instead of sending it
    +                    session.transfer(flowFile);
    +                    continue;
    +                }
     
    -    /**
    -     * Will rendezvous with {@link KafkaPublisher} after building
    -     * {@link PublishingContext} and will produce the resulting
    -     * {@link FlowFile}. The resulting FlowFile contains all required
    -     * information to determine if message publishing originated from the
    -     * provided FlowFile has actually succeeded fully, partially or failed
    -     * completely (see {@link #isFailedFlowFile(FlowFile)}.
    -     */
    -    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final 
ProcessContext context, final ProcessSession session) {
    -        final AtomicReference<KafkaPublisher.KafkaPublisherResult> 
publishResultRef = new AtomicReference<>();
    -        session.read(flowFile, new InputStreamCallback() {
    -            @Override
    -            public void process(InputStream contentStream) throws 
IOException {
    -                PublishingContext publishingContext = 
PublishKafka_0_10.this.buildPublishingContext(flowFile, context, contentStream);
    -                KafkaPublisher.KafkaPublisherResult result = 
PublishKafka_0_10.this.kafkaPublisher.publish(publishingContext);
    -                publishResultRef.set(result);
    +                final byte[] messageKey = getMessageKey(flowFile, context);
    +                final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    +                final byte[] demarcatorBytes;
    +                if (useDemarcator) {
    +                    demarcatorBytes = 
context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
    +                } else {
    +                    demarcatorBytes = null;
    +                }
    +
    +                session.read(flowFile, new InputStreamCallback() {
    +                    @Override
    +                    public void process(final InputStream rawIn) throws 
IOException {
    +                        try (final InputStream in = new 
BufferedInputStream(rawIn)) {
    +                            lease.publish(flowFile, in, messageKey, 
demarcatorBytes, topic);
    +                        }
    +                    }
    +                });
                 }
    -        });
     
    -        FlowFile resultFile = publishResultRef.get().isAllAcked()
    -                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
    -                : session.putAllAttributes(flowFile, 
this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(),
 flowFile, context));
    +            // Complete the send
    +            final PublishResult publishResult = lease.complete();
     
    -        if (!this.isFailedFlowFile(resultFile)) {
    -            resultFile = session.putAttribute(resultFile, MSG_COUNT, 
String.valueOf(publishResultRef.get().getMessagesSent()));
    -        }
    -        return resultFile;
    -    }
    +            // Transfer any successful FlowFiles.
    +            final long transmissionMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
    +            for (FlowFile success : 
publishResult.getSuccessfulFlowFiles()) {
    +                final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
     
    -    /**
    -     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
    -     * {@link PublishingContext} contains all contextual information 
required by
    -     * {@link KafkaPublisher} to publish to Kafka. Such information 
contains
    -     * things like topic name, content stream, delimiter, key and last 
ACKed
    -     * message for cases where provided FlowFile is being retried (failed 
in the
    -     * past).
    -     * <br>
    -     * For the clean FlowFile (file that has been sent for the first time),
    -     * PublishingContext will be built form {@link ProcessContext} 
associated
    -     * with this invocation.
    -     * <br>
    -     * For the failed FlowFile, {@link PublishingContext} will be built 
from
    -     * attributes of that FlowFile which by then will already contain 
required
    -     * information (e.g., topic, key, delimiter etc.). This is required to
    -     * ensure the affinity of the retry in the even where processor
    -     * configuration has changed. However keep in mind that failed 
FlowFile is
    -     * only considered a failed FlowFile if it is being re-processed by 
the same
    -     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
    -     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being 
sent to
    -     * another PublishKafka0_10 processor it is treated as a fresh FlowFile
    -     * regardless if it has #FAILED* attributes set.
    -     */
    -    private PublishingContext buildPublishingContext(FlowFile flowFile, 
ProcessContext context, InputStream contentStream) {
    -        final byte[] keyBytes = getMessageKey(flowFile, context);
    -
    -        final String topicName;
    -        final byte[] delimiterBytes;
    -        int lastAckedMessageIndex = -1;
    -        if (this.isFailedFlowFile(flowFile)) {
    -            lastAckedMessageIndex = 
Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
    -            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
    -            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) 
!= null
    -                    ? 
flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : 
null;
    -        } else {
    -            topicName = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
    -            delimiterBytes = 
context.getProperty(MESSAGE_DEMARCATOR).isSet() ? 
context.getProperty(MESSAGE_DEMARCATOR)
    -                    
.evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8)
 : null;
    -        }
    +                final int msgCount = 
publishResult.getSuccessfulMessageCount(success);
    +                success = session.putAttribute(success, MSG_COUNT, 
String.valueOf(msgCount));
    +                session.adjustCounter("Messages Sent", msgCount, true);
    +
    +                final String transitUri = 
KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
    +                session.getProvenanceReporter().send(success, transitUri, 
"Sent " + msgCount + " messages", transmissionMillis);
    +                session.transfer(success, REL_SUCCESS);
    +            }
    +
    +            // Transfer any failures.
    +            for (final FlowFile failure : 
publishResult.getFailedFlowFiles()) {
    +                final int successCount = 
publishResult.getSuccessfulMessageCount(failure);
    +                if (successCount > 0) {
    +                    getLogger().error("Failed to send all message for {} 
to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to 
{}",
    --- End diff --
    
    This logging statement looks like it was meant to include successCount:
    the message has 3 placeholders, but only 2 arguments are passed in. It
    might also read better if it said "Failed to send some messages for {} to
    Kafka, {} messages were acknowledged" or something like that.


> Address issues of PublishKafka blocking when having trouble communicating 
> with Kafka broker and improve performance
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-2865
>                 URL: https://issues.apache.org/jira/browse/NIFI-2865
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.1.0
>
>
> When NiFi is unable to communicate properly with the Kafka broker, we see the 
> NiFi threads occasionally block. This should be resolvable by calling the 
> wakeup() method of the client. Additionally, if Kafka takes too long to 
> respond, we should be able to route the FlowFile to failure and move on.
> PublishKafka has a nice feature that allows a demarcated stream to be sent as 
> separate messages, so that a large number of messages can be sent as a single 
> FlowFile. However, in the case of individual messages per FlowFile, the 
> performance could be improved by batching together multiple FlowFiles per 
> session.
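
The batching idea in the description can be sketched in plain Java as follows. This is an illustrative policy only, assuming a batch is capped by both a total byte size and an item count, in the spirit of the `250 KB / 500` limits passed to `FlowFileFilters.newSizeBasedFilter` in the diff. `SizeBoundedBatcher` and `nextBatch` are hypothetical names, and the exact accept/reject rules of NiFi's filter may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch, not NiFi code: items are represented only by their sizes in bytes.
final class SizeBoundedBatcher {

    /**
     * Removes items from the head of the queue until adding the next one would
     * exceed maxBytes, or until maxCount items have been taken. The first item
     * is always taken (if maxCount allows), so an oversized item cannot stall the queue.
     */
    public static List<Long> nextBatch(Deque<Long> queuedSizes, long maxBytes, int maxCount) {
        List<Long> batch = new ArrayList<>();
        long total = 0;
        while (!queuedSizes.isEmpty() && batch.size() < maxCount) {
            long next = queuedSizes.peekFirst();
            if (!batch.isEmpty() && total + next > maxBytes) {
                break; // leave the item queued for the next batch
            }
            batch.add(queuedSizes.pollFirst());
            total += next;
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<Long> queue = new ArrayDeque<>(Arrays.asList(100_000L, 100_000L, 100_000L));
        // Assumed limits for illustration: 250,000 bytes or 500 items per batch.
        List<Long> batch = nextBatch(queue, 250_000L, 500);
        System.out.println("batch size: " + batch.size() + ", still queued: " + queue.size());
    }
}
```

Many small items then share one session and one round trip, while a single large item still goes out on its own.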



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
