[ https://issues.apache.org/jira/browse/NIFI-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15254527#comment-15254527 ]
ASF GitHub Bot commented on NIFI-1296:
--------------------------------------
Github user joewitt commented on a diff in the pull request:
https://github.com/apache/nifi/pull/366#discussion_r60792649
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---
@@ -78,152 +81,182 @@
}
/**
- *
- */
- void setProcessLog(ProcessorLog processLog) {
- this.processLog = processLog;
- }
-
- /**
- * Publishes messages to Kafka topic. It supports three publishing
- * mechanisms.
+ * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
+ * determine how many messages will be sent to Kafka from a provided
+ * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
+ * It supports two publishing modes:
* <ul>
- * <li>Sending the entire content stream as a single Kafka message.</li>
- * <li>Splitting the incoming content stream into chunks and sending
- * individual chunks as separate Kafka messages.</li>
- * <li>Splitting the incoming content stream into chunks and sending only
- * the chunks that have failed previously @see
- * {@link SplittableMessageContext#getFailedSegments()}.</li>
+ * <li>Sending all messages constructed from the
+ * {@link StreamDemarcator#nextToken()} operation.</li>
+ * <li>Sending only unacknowledged messages constructed from the
+ * {@link StreamDemarcator#nextToken()} operation.</li>
* </ul>
+ * The unacknowledged messages are determined from the value of
+ * {@link PublishingContext#getLastAckedMessageIndex()}.
+ * <br>
* This method assumes content stream affinity where it is expected that the
* content stream that represents the same Kafka message(s) will remain the
* same across possible retries. This is required specifically for cases
* where a delimiter is used and a single content stream may represent
- * multiple Kafka messages. The failed segment list will keep the index of
- * each content stream segment that had failed to be sent to Kafka, so
- * upon retry only the failed segments are sent.
+ * multiple Kafka messages. The
+ * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
+ * index of the last ACKed message, so upon retry only messages with a
+ * higher index are sent.
*
- * @param messageContext
- * instance of {@link SplittableMessageContext} which holds
- * context information about the message to be sent
- * @param contentStream
- * instance of an open {@link InputStream} carrying the content of
- * the message(s) to be sent to Kafka
- * @param partitionKey
- * the value of the partition key. Only relevant if the user wishes
- * to provide a custom partition key instead of relying on the
- * variety of provided {@link Partitioner}(s)
- * @param maxBufferSize maximum message size
- * @return The set containing the failed segment indexes for messages that
- * failed to be sent to Kafka.
+ * @param publishingContext
+ * instance of {@link PublishingContext} which holds context
+ * information about the message(s) to be sent.
+ * @return The index of the last successfully acknowledged message.
*/
- BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
-         int maxBufferSize) {
-     List<Future<RecordMetadata>> sendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize);
-     return this.publish(sendFutures);
+ KafkaPublisherResult publish(PublishingContext publishingContext) {
+     StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
+             publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
+
+     int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
+     List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
+
+     byte[] messageBytes;
+     int tokenCounter = 0;
+     for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+         if (prevLastAckedMessageIndex < tokenCounter) {
+             Integer partitionId = publishingContext.getPartitionId();
+             if (partitionId == null && publishingContext.getKeyBytes() != null) {
+                 partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic());
+             }
+             ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(),
+                     partitionId, publishingContext.getKeyBytes(), messageBytes);
+ resultFutures.add(this.kafkaProducer.send(message));
+ }
+ }
+
+ tokenCounter -= 1;
--- End diff --
this decrement then increment appears to be left over from some previous or
incomplete logic. Probably can remove this line and drop the increment that
follows, right?
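As context for the review note above, the loop's index bookkeeping can be reduced to a small standalone sketch (the class and method names here are hypothetical illustrations, not NiFi code): after the loop, `tokenCounter` equals the number of tokens produced, so the index of the last message is simply `tokenCounter - 1`, which is why a trailing decrement followed by a compensating increment reads like leftover logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the publish loop's index bookkeeping; not NiFi code.
class LastAckedIndexSketch {

    /**
     * "Sends" every token whose index is greater than lastAckedIndex and
     * returns the index of the last token overall. Mirrors the loop in the
     * diff: tokenCounter ends up equal to the token count, so the last
     * message index is tokenCounter - 1.
     */
    static int publishTokens(List<String> tokens, int lastAckedIndex, List<String> sent) {
        int tokenCounter = 0;
        for (String token : tokens) {
            if (lastAckedIndex < tokenCounter) {
                sent.add(token); // stand-in for kafkaProducer.send(...)
            }
            tokenCounter++;
        }
        return tokenCounter - 1; // index of the last message
    }
}
```

With three tokens and message 0 already acknowledged, only the tokens at indexes 1 and 2 are sent, and the method reports 2 as the last message index; with no acknowledgements (index -1) all three are sent.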
> Add capabilities to Kafka NAR to use new Kafka API (0.9)
> --------------------------------------------------------
>
> Key: NIFI-1296
> URL: https://issues.apache.org/jira/browse/NIFI-1296
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Affects Versions: 0.4.0
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Fix For: 0.7.0
>
>
> Not sure when we can address this, but note the interesting comment in
> https://github.com/apache/nifi/pull/143. The usage of the new API may introduce
> issues with running against older Kafka brokers (e.g., 0.8). Need to
> investigate.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)