[ https://issues.apache.org/jira/browse/NIFI-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254527#comment-15254527 ]

ASF GitHub Bot commented on NIFI-1296:
--------------------------------------

Github user joewitt commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/366#discussion_r60792649
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---
    @@ -78,152 +81,182 @@
         }
     
         /**
    -     *
    -     */
    -    void setProcessLog(ProcessorLog processLog) {
    -        this.processLog = processLog;
    -    }
    -
    -    /**
    -     * Publishes messages to Kafka topic. It supports three publishing
    -     * mechanisms.
    +     * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
    +     * determine how many messages will be sent to Kafka from a provided
    +     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
    +     * It supports two publishing modes:
          * <ul>
    -     * <li>Sending the entire content stream as a single Kafka message.</li>
    -     * <li>Splitting the incoming content stream into chunks and sending
    -     * individual chunks as separate Kafka messages.</li>
    -     * <li>Splitting the incoming content stream into chunks and sending only
    -     * the chunks that have failed previously @see
    -     * {@link SplittableMessageContext#getFailedSegments()}.</li>
    +     * <li>Sending all messages constructed from
    +     * {@link StreamDemarcator#nextToken()} operation.</li>
    +     * <li>Sending only unacknowledged messages constructed from
    +     * {@link StreamDemarcator#nextToken()} operation.</li>
          * </ul>
    +     * The unacknowledged messages are determined from the value of
    +     * {@link PublishingContext#getLastAckedMessageIndex()}.
    +     * <br>
          * This method assumes content stream affinity where it is expected that the
          * content stream that represents the same Kafka message(s) will remain the
          * same across possible retries. This is required specifically for cases
          * where delimiter is used and a single content stream may represent
    -     * multiple Kafka messages. The failed segment list will keep the index
    -     * of each content stream segment that had failed to be sent to Kafka, so
    -     * upon retry only the failed segments are sent.
    +     * multiple Kafka messages. The
    +     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
    +     * index of the last ACKed message, so upon retry only messages with a
    +     * higher index are sent.
          *
    -     * @param messageContext
    -     *            instance of {@link SplittableMessageContext} which holds
    -     *            context information about the message to be sent
    -     * @param contentStream
    -     *            instance of open {@link InputStream} carrying the content of
    -     *            the message(s) to be sent to Kafka
    -     * @param partitionKey
    -     *            the value of the partition key. Only relevant if user wishes
    -     *            to provide a custom partition key instead of relying on
    -     *            variety of provided {@link Partitioner}(s)
    -     * @param maxBufferSize maximum message size
    -     * @return The set containing the failed segment indexes for messages that
    -     *         failed to be sent to Kafka.
    +     * @param publishingContext
    +     *            instance of {@link PublishingContext} which holds context
    +     *            information about the message(s) to be sent.
    +     * @return The index of the last successful offset.
          */
    -    BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
    -            int maxBufferSize) {
    -        List<Future<RecordMetadata>> sendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize);
    -        return this.publish(sendFutures);
    +    KafkaPublisherResult publish(PublishingContext publishingContext) {
    +        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
    +                publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
    +
    +        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
    +        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
    +
    +        byte[] messageBytes;
    +        int tokenCounter = 0;
    +        for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
    +            if (prevLastAckedMessageIndex < tokenCounter) {
    +                Integer partitionId = publishingContext.getPartitionId();
    +                if (partitionId == null && publishingContext.getKeyBytes() != null) {
    +                    partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic());
    +                }
    +                ProducerRecord<byte[], byte[]> message =
    +                        new ProducerRecord<>(publishingContext.getTopic(), partitionId, publishingContext.getKeyBytes(), messageBytes);
    +                resultFutures.add(this.kafkaProducer.send(message));
    +            }
    +        }
    +
    +        tokenCounter -= 1;
    --- End diff ---
    
    This decrement-then-increment appears to be left over from some previous or 
    incomplete logic. We can probably remove this line and drop the increment that 
    follows, right?
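    To make the suggestion concrete, here is a rough sketch of the loop tracking
    the last sent index directly, with no trailing decrement/increment pair. A
    plain list stands in for `StreamDemarcator` and the Kafka producer, and all
    names here (`PublishLoopSketch`, `publish`, `sent`) are illustrative only,
    not the actual NiFi API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: a List<byte[]> stands in for StreamDemarcator tokens, and
// another list stands in for kafkaProducer.send(...). The point is that the
// last-sent index can be recorded inside the loop, so no post-loop
// decrement/increment bookkeeping is needed.
public class PublishLoopSketch {

    static int publish(List<byte[]> tokens, int lastAckedMessageIndex, List<byte[]> sent) {
        // If nothing new is sent, the last ACKed index is returned unchanged.
        int lastSentIndex = lastAckedMessageIndex;
        int tokenCounter = 0;
        for (byte[] messageBytes : tokens) {
            if (lastAckedMessageIndex < tokenCounter) {
                sent.add(messageBytes);          // stand-in for kafkaProducer.send(message)
                lastSentIndex = tokenCounter;    // recorded here, no trailing fix-up
            }
            tokenCounter++;
        }
        return lastSentIndex;
    }

    public static void main(String[] args) {
        List<byte[]> tokens = Arrays.asList("a".getBytes(), "b".getBytes(), "c".getBytes());
        List<byte[]> sent = new ArrayList<>();
        // Message 0 was already ACKed, so only indexes 1 and 2 are sent.
        int last = publish(tokens, 0, sent);
        System.out.println(last + " " + sent.size()); // prints "2 2"
    }
}
```

    Same observable behavior as the patch under these assumptions, just without
    the `tokenCounter -= 1` line that the comment above flags.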


> Add capabilities to Kafka NAR to use new Kafka API (0.9)
> --------------------------------------------------------
>
>                 Key: NIFI-1296
>                 URL: https://issues.apache.org/jira/browse/NIFI-1296
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>    Affects Versions: 0.4.0
>            Reporter: Oleg Zhurakousky
>            Assignee: Oleg Zhurakousky
>             Fix For: 0.7.0
>
>
> Not sure when we can address this, but there is an interesting comment in 
> https://github.com/apache/nifi/pull/143. The usage of the new API may introduce 
> issues with running against older Kafka brokers (e.g., 0.8). Need to 
> investigate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
