[
https://issues.apache.org/jira/browse/NIFI-1645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206799#comment-15206799
]
ASF GitHub Bot commented on NIFI-1645:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/295#discussion_r57026907
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
@@ -446,444 +336,136 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
                .build();
    }
-
    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        FlowFileMessageBatch batch;
-        while ((batch = completeBatches.poll()) != null) {
-            batch.completeSession();
-        }
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
-        final ProcessSession session = sessionFactory.createSession();
-        final FlowFile flowFile = session.get();
-        if (flowFile != null) {
-            Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    doOnTrigger(context, session, flowFile);
-                    return null;
-                }
-            });
-            try {
-                consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                consumptionFuture.cancel(true);
-                Thread.currentThread().interrupt();
-                getLogger().warn("Interrupted while sending messages", e);
-            } catch (ExecutionException e) {
-                throw new IllegalStateException(e);
-            } catch (TimeoutException e) {
-                consumptionFuture.cancel(true);
-                getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e);
-            }
-        } else {
-            context.yield();
+        final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
+                && !validationContext.getProperty(PARTITION).isSet()) {
+            results.add(new ValidationResult.Builder().subject("Partition").valid(false)
+                    .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
+                    .build());
        }
+        return results;
    }
-    private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException {
-        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
-        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
-        if (delimiter != null) {
-            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
-        }
-
-        final Producer<byte[], byte[]> producer = getProducer();
-
-        if (delimiter == null) {
-            // Send the entire FlowFile as a single message.
-            final byte[] value = new byte[(int) flowFile.getSize()];
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    StreamUtils.fillBuffer(in, value);
-                }
-            });
-
-            final Integer partition;
-            try {
-                partition = getPartition(context, flowFile, topic);
-            } catch (final Exception e) {
-                getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                session.commit();
-                return;
-            }
-
-            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value);
-
-            final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
-            messageBatch.setNumMessages(1);
-            activeBatches.add(messageBatch);
-
-            try {
-                producer.send(producerRecord, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception == null) {
-                            // record was successfully sent.
-                            messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset());
-                        } else {
-                            messageBatch.addFailedRange(0L, flowFile.getSize(), exception);
-                        }
-                    }
-                });
-            } catch (final BufferExhaustedException bee) {
-                messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
-                context.yield();
-                return;
-            }
-        } else {
-            final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
-
-            // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see
-            // if it matches some pattern. We can use this to search for the delimiter as we read through
-            // the stream of bytes in the FlowFile
-            final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
-
-            final LongHolder messagesSent = new LongHolder(0L);
-            final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
-            activeBatches.add(messageBatch);
-
-            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-                session.read(flowFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream rawIn) throws IOException {
-                        byte[] data = null; // contents of a single message
-
-                        boolean streamFinished = false;
-
-                        int nextByte;
-                        try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
-                                final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
-
-                            long messageStartOffset = in.getBytesConsumed();
-
-                            // read until we're out of data.
-                            while (!streamFinished) {
-                                nextByte = in.read();
-
-                                if (nextByte > -1) {
-                                    baos.write(nextByte);
-                                }
-
-                                if (nextByte == -1) {
-                                    // we ran out of data. This message is complete.
-                                    data = baos.toByteArray();
-                                    streamFinished = true;
-                                } else if (buffer.addAndCompare((byte) nextByte)) {
-                                    // we matched our delimiter. This message is complete. We want all of the bytes from the
-                                    // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
-                                    // the delimiter itself to be sent.
-                                    data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
-                                }
-
-                                if (data != null) {
-                                    final long messageEndOffset = in.getBytesConsumed();
-
-                                    // If the message has no data, ignore it.
-                                    if (data.length != 0) {
-                                        final Integer partition;
-                                        try {
-                                            partition = getPartition(context, flowFile, topic);
-                                        } catch (final Exception e) {
-                                            messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
-                                            getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
-                                            continue;
-                                        }
-
-
-                                        final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
-                                        final long rangeStart = messageStartOffset;
-
-                                        try {
-                                            producer.send(producerRecord, new Callback() {
-                                                @Override
-                                                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                                                    if (exception == null) {
-                                                        // record was successfully sent.
-                                                        messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset());
-                                                    } else {
-                                                        messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
-                                                    }
-                                                }
-                                            });
-
-                                            messagesSent.incrementAndGet();
-                                        } catch (final BufferExhaustedException bee) {
-                                            // Not enough room in the buffer. Add from the beginning of this message to end of FlowFile as a failed range
-                                            messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
-                                            context.yield();
-                                            return;
-                                        }
-                                    }
-
-                                    // reset BAOS so that we can start a new message.
-                                    baos.reset();
-                                    data = null;
-                                    messageStartOffset = in.getBytesConsumed();
-                                }
-                            }
-                        }
-                    }
-                });
-
-                messageBatch.setNumMessages(messagesSent.get());
-            }
+    /**
+     *
+     */
+    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) {
+        if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) {
+            flowFile = session.removeAttribute(flowFile, ATTR_FAILED_SEGMENTS);
+            flowFile = session.removeAttribute(flowFile, ATTR_KEY);
+            flowFile = session.removeAttribute(flowFile, ATTR_TOPIC);
+            flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER);
        }
+        return flowFile;
    }
+    /**
+     *
+     */
+    private Object determinePartition(SplittableMessageContext messageContext, ProcessContext context, FlowFile flowFile) {
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        String partitionValue = null;
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
+            partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+        }
+        return partitionValue;
+    }
-    private static class Range {
-        private final long start;
-        private final long end;
-        private final Long kafkaOffset;
-
-        public Range(final long start, final long end, final Long kafkaOffset) {
-            this.start = start;
-            this.end = end;
-            this.kafkaOffset = kafkaOffset;
-        }
-
-        public long getStart() {
-            return start;
-        }
-
-        public long getEnd() {
-            return end;
-        }
-
-        public Long getKafkaOffset() {
-            return kafkaOffset;
-        }
-
-        @Override
-        public String toString() {
-            return "Range[" + start + "-" + end + "]";
+    /**
+     *
+     */
+    private Map<String, String> buildFailedFlowFileAttributes(List<Integer> failedSegments,
+            SplittableMessageContext messageContext) {
+        StringBuffer buffer = new StringBuffer();
--- End diff ---
I think we should use StringBuilder rather than StringBuffer here.
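For illustration, here is a minimal, self-contained sketch of the suggestion. StringBuilder offers the same API as StringBuffer without per-call synchronization, which is unnecessary because the buffer never escapes the calling thread. The method below is hypothetical and only approximates what buildFailedFlowFileAttributes does; it is not the actual patch code.
{code}
import java.util.Arrays;
import java.util.List;

public class FailedSegmentsSketch {
    // Joins failed segment indices into a comma-separated attribute value,
    // using an unsynchronized StringBuilder confined to this thread.
    static String joinFailedSegments(List<Integer> failedSegments) {
        StringBuilder sb = new StringBuilder();
        for (int segment : failedSegments) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(segment);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints "3,7,12"
        System.out.println(joinFailedSegments(Arrays.asList(3, 7, 12)));
    }
}
{code}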
> When using delimited data feature PutKafka ack'd ranges feature can break
> -------------------------------------------------------------------------
>
> Key: NIFI-1645
> URL: https://issues.apache.org/jira/browse/NIFI-1645
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Oleg Zhurakousky
> Assignee: Oleg Zhurakousky
> Fix For: 0.6.0
>
>
> When using the delimited lines feature to send data to Kafka, a large set of
> lines that appears to be one 'flowfile' in NiFi is sent as a series of 1..N
> messages in Kafka. In that case the mechanism of asynchronous acknowledgement
> can break down: we will receive acknowledgements but be unable to act on them
> appropriately, because by then the session/data will already have been
> considered successfully transferred. Under certain specific conditions this
> means failed acknowledgements would not result in a retransfer.
> The logic this processor uses to create child objects for failed/partial
> segments is extremely complicated and should likely be rewritten and greatly
> simplified. Instead, the SplitText feature should be used to create more
> manageable chunks of data, so that if any segment is ack'd as a failure the
> whole chunk is failed and can therefore be retransmitted. It is always best
> to let the user choose between data loss and data duplication on their own
> terms.
> Below is the relevant stack trace:
> {code}
> 17:12:37 EDT ERROR 6162d00f-737f-3710-85f9-318c886af95f
> clpen0004.foo.com:8090 PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f]
> PutKafka[id=6162d00f-737f-3710-85f9-318c886af95f] failed to process session
> due to java.lang.IllegalStateException:
> java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534]):
> java.lang.IllegalStateException: java.util.concurrent.ExecutionException:
> org.apache.nifi.processor.exception.FlowFileHandlingException:
> StandardFlowFileRecord[uuid=a9a7f10d-674e-421f-80f2-7fc0e28a0d1d,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1458158883054-93724,
> container=cont2, section=540], offset=756882,
> length=6107144],offset=0,name=1648095619968535,size=6107144] is not known in
> this session (StandardProcessSession[id=97534])
> {code}
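To make the failure mode concrete, below is a minimal, self-contained sketch (hypothetical code, not NiFi or Kafka internals; all names are invented for illustration) of how an asynchronous acknowledgement that arrives only after the session has been committed can no longer affect routing, which is what surfaces above as the FlowFile being "not known in this session":
{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LateAckSketch {
    public static void main(String[] args) throws InterruptedException {
        final AtomicBoolean sessionCommitted = new AtomicBoolean(false);
        final CountDownLatch done = new CountDownLatch(1);
        ExecutorService broker = Executors.newSingleThreadExecutor();

        // The "broker" acknowledges the send asynchronously, after a delay.
        broker.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(100); // ack arrives late
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (sessionCommitted.get()) {
                    // Too late: the session was already committed as a success,
                    // so a failed ack here cannot route the data to failure
                    // or trigger a retransfer.
                    System.out.println("ack arrived after commit; failure is lost");
                }
                done.countDown();
            }
        });

        // The processor commits the session without waiting for the ack.
        sessionCommitted.set(true);
        System.out.println("session committed");

        done.await();
        broker.shutdown();
    }
}
{code}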
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)