Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/295#discussion_r57026907
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java ---
@@ -446,444 +336,136 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
                 .build();
     }
-
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        FlowFileMessageBatch batch;
-        while ((batch = completeBatches.poll()) != null) {
-            batch.completeSession();
-        }
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
-        final ProcessSession session = sessionFactory.createSession();
-        final FlowFile flowFile = session.get();
-        if (flowFile != null){
-            Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    doOnTrigger(context, session, flowFile);
-                    return null;
-                }
-            });
-            try {
-                consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                consumptionFuture.cancel(true);
-                Thread.currentThread().interrupt();
-                getLogger().warn("Interrupted while sending messages", e);
-            } catch (ExecutionException e) {
-                throw new IllegalStateException(e);
-            } catch (TimeoutException e) {
-                consumptionFuture.cancel(true);
-                getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e);
-            }
-        } else {
-            context.yield();
+        final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
+                && !validationContext.getProperty(PARTITION).isSet()) {
+            results.add(new ValidationResult.Builder().subject("Partition").valid(false)
+                    .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
+                    .build());
         }
+        return results;
     }
-    private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException {
-        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
-        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
-        if (delimiter != null) {
-            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
-        }
-
-        final Producer<byte[], byte[]> producer = getProducer();
-
-        if (delimiter == null) {
-            // Send the entire FlowFile as a single message.
-            final byte[] value = new byte[(int) flowFile.getSize()];
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    StreamUtils.fillBuffer(in, value);
-                }
-            });
-
-            final Integer partition;
-            try {
-                partition = getPartition(context, flowFile, topic);
-            } catch (final Exception e) {
-                getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                session.commit();
-                return;
-            }
-
-            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value);
-
-            final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
-            messageBatch.setNumMessages(1);
-            activeBatches.add(messageBatch);
-
-            try {
-                producer.send(producerRecord, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception == null) {
-                            // record was successfully sent.
-                            messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset());
-                        } else {
-                            messageBatch.addFailedRange(0L, flowFile.getSize(), exception);
-                        }
-                    }
-                });
-            } catch (final BufferExhaustedException bee) {
-                messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
-                context.yield();
-                return;
-            }
-        } else {
-            final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
-
-            // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see
-            // if it matches some pattern. We can use this to search for the delimiter as we read through
-            // the stream of bytes in the FlowFile
-            final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
-
-            final LongHolder messagesSent = new LongHolder(0L);
-            final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
-            activeBatches.add(messageBatch);
-
-            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-                session.read(flowFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream rawIn) throws IOException {
-                        byte[] data = null; // contents of a single message
-
-                        boolean streamFinished = false;
-
-                        int nextByte;
-                        try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
-                            final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
-
-                            long messageStartOffset = in.getBytesConsumed();
-
-                            // read until we're out of data.
-                            while (!streamFinished) {
-                                nextByte = in.read();
-
-                                if (nextByte > -1) {
-                                    baos.write(nextByte);
-                                }
-
-                                if (nextByte == -1) {
-                                    // we ran out of data. This message is complete.
-                                    data = baos.toByteArray();
-                                    streamFinished = true;
-                                } else if (buffer.addAndCompare((byte) nextByte)) {
-                                    // we matched our delimiter. This message is complete. We want all of the bytes from the
-                                    // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
-                                    // the delimiter itself to be sent.
-                                    data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
-                                }
-
-                                if (data != null) {
-                                    final long messageEndOffset = in.getBytesConsumed();
-
-                                    // If the message has no data, ignore it.
-                                    if (data.length != 0) {
-                                        final Integer partition;
-                                        try {
-                                            partition = getPartition(context, flowFile, topic);
-                                        } catch (final Exception e) {
-                                            messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
-                                            getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
-                                            continue;
-                                        }
-
-                                        final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
-                                        final long rangeStart = messageStartOffset;
-
-                                        try {
-                                            producer.send(producerRecord, new Callback() {
-                                                @Override
-                                                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                                                    if (exception == null) {
-                                                        // record was successfully sent.
-                                                        messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset());
-                                                    } else {
-                                                        messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
-                                                    }
-                                                }
-                                            });
-
-                                            messagesSent.incrementAndGet();
-                                        } catch (final BufferExhaustedException bee) {
-                                            // Not enough room in the buffer. Add from the beginning of this message to the end of the FlowFile as a failed range.
-                                            messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
-                                            context.yield();
-                                            return;
-                                        }
-                                    }
-
-                                    // reset BAOS so that we can start a new message.
-                                    baos.reset();
-                                    data = null;
-                                    messageStartOffset = in.getBytesConsumed();
-                                }
-                            }
-                        }
-                    }
-                });
-
-                messageBatch.setNumMessages(messagesSent.get());
-            }
+    /**
+     * Removes the failure-tracking attributes (failed segments, key, topic, delimiter)
+     * from the FlowFile if they are present.
+     */
+    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) {
+        if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) {
+            flowFile = session.removeAttribute(flowFile, ATTR_FAILED_SEGMENTS);
+            flowFile = session.removeAttribute(flowFile, ATTR_KEY);
+            flowFile = session.removeAttribute(flowFile, ATTR_TOPIC);
+            flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER);
        }
+        return flowFile;
    }
+    /**
+     * Returns the partition value for the given FlowFile, or null when the
+     * User-Defined Partitioning strategy is not in use.
+     */
+    private Object determinePartition(SplittableMessageContext messageContext, ProcessContext context, FlowFile flowFile) {
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        String partitionValue = null;
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
+            partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+        }
+        return partitionValue;
+    }
-    private static class Range {
-        private final long start;
-        private final long end;
-        private final Long kafkaOffset;
-
-        public Range(final long start, final long end, final Long kafkaOffset) {
-            this.start = start;
-            this.end = end;
-            this.kafkaOffset = kafkaOffset;
-        }
-
-        public long getStart() {
-            return start;
-        }
-
-        public long getEnd() {
-            return end;
-        }
-
-        public Long getKafkaOffset() {
-            return kafkaOffset;
-        }
-
-        @Override
-        public String toString() {
-            return "Range[" + start + "-" + end + "]";
+    /**
+     * Builds the attribute map used to annotate a FlowFile whose message
+     * segments could not all be sent.
+     */
+    private Map<String, String> buildFailedFlowFileAttributes(List<Integer> failedSegments,
+            SplittableMessageContext messageContext) {
+        StringBuffer buffer = new StringBuffer();
--- End diff ---
I think we should use StringBuilder rather than StringBuffer here.
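The buffer never escapes this method, so the synchronization that StringBuffer provides buys nothing here and only adds locking overhead. A minimal sketch of the swap (the accumulation loop below is a hypothetical placeholder, not the method's actual body):

```java
// Hypothetical sketch: StringBuilder offers the same append()/toString() API
// as StringBuffer but without per-call locking, which is safe here because
// the builder stays method-local.
StringBuilder buffer = new StringBuilder();
for (final Integer segment : failedSegments) {
    buffer.append(segment).append(',');  // placeholder accumulation logic
}
final String failedSegmentsValue = buffer.toString();
```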