[ https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206694#comment-15206694 ]
ASF GitHub Bot commented on NIFI-1645:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/295#discussion_r57017510
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Scanner;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Partitioner;
+import kafka.producer.ProducerConfig;
+
+/**
+ * Wrapper over {@link KafkaProducer} to assist {@link PutKafka} processor with
+ * sending content of {@link FlowFile}s to Kafka.
+ */
+public class KafkaPublisher implements AutoCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
+
+    private final Producer<byte[], byte[]> producer;
+
+    private ProcessorLog processLog;
+
+    /**
+     * Creates an instance of this class as well as the instance of the
+     * corresponding Kafka {@link KafkaProducer} using provided Kafka
+     * configuration properties.
+     */
+    KafkaPublisher(Properties kafkaProperties) {
+        ProducerConfig producerConfig = new ProducerConfig(kafkaProperties);
+        this.producer = new Producer<>(producerConfig);
+    }
+
+    /**
+     * Sets the {@link ProcessorLog} this publisher will use for logging.
+     */
+    void setProcessLog(ProcessorLog processLog) {
+        this.processLog = processLog;
+    }
+
+    /**
+     * Publishes messages to a Kafka topic. It supports three publishing
+     * mechanisms.
+     * <ul>
+     * <li>Sending the entire content stream as a single Kafka message.</li>
+     * <li>Splitting the incoming content stream into chunks and sending
+     * individual chunks as separate Kafka messages.</li>
+     * <li>Splitting the incoming content stream into chunks and sending only
+     * the chunks that have failed previously @see
+     * {@link SplittableMessageContext#getFailedSegments()}.</li>
+     * </ul>
+     * This method assumes content stream affinity where it is expected that
+     * the content stream that represents the same Kafka message(s) will
+     * remain the same across possible retries. This is required specifically
+     * for cases where a delimiter is used and a single content stream may
+     * represent multiple Kafka messages. The failed segment list will keep
+     * the index of each content stream segment that failed to be sent to
+     * Kafka, so upon retry only the failed segments are sent.
+     *
+     * @param messageContext
+     *            instance of {@link SplittableMessageContext} which holds
+     *            context information about the message to be sent
+     * @param contentStream
+     *            instance of open {@link InputStream} carrying the content of
+     *            the message(s) to be sent to Kafka
+     * @param partitionKey
+     *            the value of the partition key. Only relevant if the user
+     *            wishes to provide a custom partition key instead of relying
+     *            on the variety of provided {@link Partitioner}(s)
+     * @return The list containing the failed segment indexes for messages
+     *         that failed to be sent to Kafka.
+     */
+    List<Integer> publish(SplittableMessageContext messageContext, InputStream contentStream, Object partitionKey) {
+        List<Integer> prevFailedSegmentIndexes = messageContext.getFailedSegments();
+        List<Integer> failedSegments = new ArrayList<>();
+        int segmentCounter = 0;
+        try (Scanner scanner = new Scanner(contentStream)) {
+            scanner.useDelimiter(messageContext.getDelimiterPattern());
+            while (scanner.hasNext()) {
+                // TODO Improve content InputStream so it is skip-supported so one can zoom straight to the correct segment
+                byte[] content = scanner.next().getBytes();
+                if (content.length > 0) {
+                    byte[] key = messageContext.getKeyBytes();
+                    partitionKey = partitionKey == null ? key : partitionKey; // the whole thing may still be null
+                    String topicName = messageContext.getTopicName();
+                    if (prevFailedSegmentIndexes != null) {
--- End diff --
Can we simplify this code a bit and just use:
`if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.contains(segmentCounter)) {`
instead of replicating those 3 lines of code?
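For illustration, a rough sketch of how the loop body might read with that combined condition. The diff above is truncated before the duplicated branch bodies, so the send and failure-tracking lines here are an assumption reconstructed from the kafka.javaapi.producer API imported at the top of the file, not the actual patch:
{code}
// Sketch only: first attempt (no failure list yet) or a segment that
// previously failed both fall through the single combined check.
while (scanner.hasNext()) {
    byte[] content = scanner.next().getBytes();
    if (content.length > 0) {
        byte[] key = messageContext.getKeyBytes();
        Object effectivePartitionKey = partitionKey == null ? key : partitionKey;
        String topicName = messageContext.getTopicName();
        if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.contains(segmentCounter)) {
            try {
                // Assumed send logic: KeyedMessage(topic, key, partKey, message)
                // from the Kafka 0.8 producer API.
                this.producer.send(new KeyedMessage<byte[], byte[]>(topicName, key, effectivePartitionKey, content));
            } catch (Exception e) {
                failedSegments.add(segmentCounter); // remembered for the next retry
            }
        }
    }
    segmentCounter++;
}
{code}
Either way, the null check happens once per segment instead of branching into two nearly identical send blocks.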
> When using delimited data feature PutKafka ack'd ranges feature can break
> -------------------------------------------------------------------------
>
> Key: NIFI-1645
> URL: https://issues.apache.org/jira/browse/NIFI-1645
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Fix For: 0.7.0
>
>
> When using the delimited lines feature to send data to Kafka, such that a
> large set of lines that appears as one flowfile in NiFi is sent as a
> series of 1..N messages in Kafka, the mechanism of asynchronous
> acknowledgement can break down: we may receive acknowledgements but be
> unable to act on them appropriately, because by then the session/data
> would have already been considered successfully transferred. Under
> certain/specific conditions this could mean that failed acknowledgements
> do not result in a retransfer.
> The logic this processor supports for creating child objects to address
> failed/partial segments is extremely complicated and should likely be
> rewritten and greatly simplified. Instead, the SplitText feature should be
> used to create more manageable chunks of data, so that if any segment is
> ack'd as a failure the whole chunk is failed and can thus be
> retransmitted. It is always best to let the user choose between data loss
> and data duplication on their own terms.
> Below is the relevant stack trace
> {code}
> 17:12:37 EDT ERROR 6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090 PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f]
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session
> due to java.lang.IllegalStateException:
> java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534]):
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534])
> {code}
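To make the retry contract concrete, here is a hedged caller-side sketch of the publish() API from the diff above. getFailedSegments() appears in the diff; setFailedSegments(...) and the surrounding wiring are assumptions for illustration only:
{code}
// Hypothetical caller (not part of the patch) showing the ack'd-ranges
// retry contract: failed segment indexes returned by publish() are fed
// back so that a retry re-sends only those segments. Content stream
// affinity is required: both streams must carry the same bytes so the
// segment indexes line up across attempts.
List<Integer> publishWithOneRetry(KafkaPublisher publisher, SplittableMessageContext context,
        InputStream firstAttempt, InputStream retryAttempt) {
    List<Integer> failed = publisher.publish(context, firstAttempt, null);
    if (!failed.isEmpty()) {
        context.setFailedSegments(failed); // assumed setter; only the getter appears in the diff
        failed = publisher.publish(context, retryAttempt, null);
    }
    return failed;
}
{code}
The bug described above arises when the acknowledgements backing that failed list arrive only after the session has already been committed, so the retry path can no longer act on them.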