[
https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15207657#comment-15207657
]
ASF GitHub Bot commented on NIFI-1645:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/295#discussion_r57096148
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
---
@@ -104,21 +128,52 @@ BitSet publish(SplittableMessageContext messageContext, InputStream contentStrea
                     byte[] content = scanner.next().getBytes();
                     if (content.length > 0){
                         byte[] key = messageContext.getKeyBytes();
-                        partitionKey = partitionKey == null ? key : partitionKey;// the whole thing may still be null
                         String topicName = messageContext.getTopicName();
+                        if (partitionKey == null && key != null) {
+                            partitionKey = this.getPartition(key, topicName);
+                        }
                         if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) {
-                            KeyedMessage<byte[], byte[]> message = new KeyedMessage<byte[], byte[]>(topicName, key, partitionKey, content);
-                            if (!this.toKafka(message)) {
-                                failedSegments.set(segmentCounter);
-                            }
+                            ProducerRecord<byte[], byte[]> message = new ProducerRecord<byte[], byte[]>(topicName, partitionKey, key, content);
+                            sendFutures.add(this.toKafka(message));
                         }
                     }
                     segmentCounter++;
                 }
             }
+            segmentCounter = 0;
+            for (Future<RecordMetadata> future : sendFutures) {
+                try {
+                    future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
--- End diff --
Here, we could end up waiting the ack wait time for each one of the
messages. If we sent 1,000 messages we could wait 1,000 times the length of the
ack wait time. Perhaps we should instead do something like:
```
final long endTime = System.currentTimeMillis() + ackWaitTime;
for (Future<RecordMetadata> future : sendFutures) {
    long toWait = endTime - System.currentTimeMillis();
    if (toWait < 0 && !future.isDone()) {
        // consider failure
    } else {
        try {
            future.get(toWait, TimeUnit.MILLISECONDS);
        } catch (...) {
            ...
        }
    }
}
```
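The shared-deadline idea can be sketched as a self-contained example. This is illustrative only, not the PutKafka code: the `awaitAll` helper and the use of `CompletableFuture` as a stand-in for the producer's `Future<RecordMetadata>` are assumptions made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeadlineWaitSketch {

    /**
     * Waits for all futures against one shared deadline instead of granting
     * each future its own full ack-wait timeout. Returns the number of
     * futures that did not complete successfully in time.
     */
    static int awaitAll(List<Future<?>> futures, long ackWaitTimeMillis) {
        final long endTime = System.currentTimeMillis() + ackWaitTimeMillis;
        int failed = 0;
        for (Future<?> future : futures) {
            long toWait = endTime - System.currentTimeMillis();
            if (toWait <= 0 && !future.isDone()) {
                failed++; // deadline already passed; consider this send a failure
                continue;
            }
            try {
                future.get(Math.max(toWait, 0), TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                failed++; // ran past the shared deadline
            } catch (ExecutionException e) {
                failed++; // the send itself failed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<Future<?>> futures = new ArrayList<>();
        futures.add(CompletableFuture.completedFuture("ok"));
        futures.add(new CompletableFuture<String>()); // never completes
        System.out.println("failed=" + awaitAll(futures, 50)); // prints "failed=1"
    }
}
```

Total blocking time is bounded by `ackWaitTimeMillis` regardless of how many futures are in the list, which is the point of the suggestion above.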
> When using delimited data feature PutKafka ack'd ranges feature can break
> -------------------------------------------------------------------------
>
> Key: NIFI-1645
> URL: https://issues.apache.org/jira/browse/NIFI-1645
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Fix For: 0.6.0
>
>
> When using the delimited-lines feature to send data to Kafka, a large set of
> lines that appears as one flowfile in NiFi is sent as a series of 1..N
> messages to Kafka. In that case the asynchronous acknowledgement mechanism
> can break down: we receive acknowledgements but are unable to act on them
> appropriately, because by then the session/data will already have been
> considered successfully transferred. Under certain specific conditions this
> means failed acknowledgements would not result in a retransfer.
> The logic this processor uses to create child objects for failed/partial
> segments is extremely complicated and should likely be rewritten and greatly
> simplified. Instead, the SplitText feature should be used to create more
> manageable chunks of data; if any segment of a chunk is ack'd as a failure,
> the whole chunk is failed and can be retransmitted. It is always best to let
> the user choose between data loss and data duplication on their own terms.
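The all-or-nothing policy described above can be sketched in a few lines. This is a pure illustration, not NiFi code; the `chunkSucceeded` helper is hypothetical, and the `BitSet` of failed segment indexes mirrors the bookkeeping that `KafkaPublisher.publish` returns.

```java
import java.util.BitSet;

public class ChunkRetrySketch {

    /**
     * All-or-nothing policy: if any segment of a chunk failed to be
     * acknowledged, the entire chunk is treated as failed and becomes
     * eligible for retransmission, trading possible duplication for
     * no data loss.
     */
    static boolean chunkSucceeded(BitSet failedSegments) {
        return failedSegments.isEmpty();
    }

    public static void main(String[] args) {
        BitSet failed = new BitSet();
        System.out.println(chunkSucceeded(failed)); // prints "true"
        failed.set(3); // segment 3 was not ack'd
        System.out.println(chunkSucceeded(failed)); // prints "false"
    }
}
```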
> Below is the relevant stack trace
> {code}
> 17:12:37 EDT ERROR 6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090 PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f]
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session
> due to java.lang.IllegalStateException:
> java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534]):
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534])
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)