[ https://issues.apache.org/jira/browse/NIFI-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15549944#comment-15549944 ]
ASF GitHub Bot commented on NIFI-2865:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1097#discussion_r82070115
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java ---
@@ -249,242 +241,141 @@
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
+        return PROPERTIES;
     }
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
-                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true)
-                .build();
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName)
+                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+                .dynamic(true)
+                .build();
     }
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
         return KafkaProcessorUtils.validateCommonProperties(validationContext);
     }
-    volatile KafkaPublisher kafkaPublisher;
-
-    /**
-     * This thread-safe operation will delegate to
-     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
-     * checking and creating (if necessary) the Kafka resource, which could be either
-     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and
-     * destroy the underlying Kafka resource upon catching an {@link Exception}
-     * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
-     * After the Kafka resource is destroyed it will be re-created upon the next
-     * invocation of this operation, essentially providing a self-healing
-     * mechanism to deal with a potentially corrupted resource.
-     * <p>
-     * Keep in mind that upon catching an exception the state of this processor
-     * will be set to no longer accept any more tasks until the Kafka resource is
-     * reset. This means that in a multi-threaded situation currently executing
-     * tasks will be given a chance to complete while no new tasks will be
-     * accepted.
-     *
-     * @param context context
-     * @param sessionFactory factory
-     */
-    @Override
-    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted.
-            this.taskCounter.incrementAndGet();
-            final ProcessSession session = sessionFactory.createSession();
-            try {
-                /*
-                 * We can't be doing a double null check here since as a pattern
-                 * it only works for lazy init but not reset, which is what we
-                 * are doing here. In fact the first null check is dangerous
-                 * since 'kafkaPublisher' can become null right after its null
-                 * check passed, causing a subsequent NPE.
-                 */
-                synchronized (this) {
-                    if (this.kafkaPublisher == null) {
-                        this.kafkaPublisher = this.buildKafkaResource(context, session);
-                    }
-                }
-
-                /*
-                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
-                 * - ConsumeKafka - some messages were received from Kafka and 1..N FlowFiles were generated
-                 * - PublishKafka0_10 - some messages were sent to Kafka based on existence of the input FlowFile
-                 */
-                boolean processed = this.rendezvousWithKafka(context, session);
-                session.commit();
-                if (!processed) {
-                    context.yield();
-                }
-            } catch (Throwable e) {
-                this.acceptTask = false;
-                session.rollback(true);
-                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e});
-            } finally {
-                synchronized (this) {
-                    if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
-                        this.close();
-                        this.acceptTask = true;
-                    }
-                }
-            }
-        } else {
-            this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            context.yield();
+    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
+        PublisherPool pool = publisherPool;
+        if (pool != null) {
+            return pool;
         }
+
+        return publisherPool = createPublisherPool(context);
+    }
+
+    protected PublisherPool createPublisherPool(final ProcessContext context) {
+        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
+
+        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
     }
-    /**
-     * Will call {@link Closeable#close()} on the target resource after which
-     * the target resource will be set to null. Should only be called when there
-     * are no more threads being executed on this processor or when it has been
-     * verified that only a single thread remains.
-     *
-     * @see KafkaPublisher
-     * @see KafkaConsumer
-     */
     @OnStopped
-    public void close() {
-        try {
-            if (this.kafkaPublisher != null) {
-                try {
-                    this.kafkaPublisher.close();
-                } catch (Exception e) {
-                    this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e);
-                }
-            }
-        } finally {
-            this.kafkaPublisher = null;
+    public void closePool() {
+        if (publisherPool != null) {
+            publisherPool.close();
         }
+
+        publisherPool = null;
     }
-    /**
-     * Will rendezvous with Kafka if the {@link ProcessSession} contains a
-     * {@link FlowFile}, producing a result {@link FlowFile}.
-     * <br>
-     * A result {@link FlowFile} that is successful is then transferred to
-     * {@link #REL_SUCCESS}.
-     * <br>
-     * A result {@link FlowFile} that is failed is then transferred to
-     * {@link #REL_FAILURE}.
-     *
-     */
-    protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
-        FlowFile flowFile = session.get();
-        if (flowFile != null) {
-            long start = System.nanoTime();
-            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
-            Relationship relationship = REL_SUCCESS;
-            if (!this.isFailedFlowFile(flowFile)) {
-                String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-                long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic);
-                session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration);
-                this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis",
-                        new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration});
-            } else {
-                relationship = REL_FAILURE;
-                flowFile = session.penalize(flowFile);
-            }
-            session.transfer(flowFile, relationship);
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
+
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
+        if (flowFiles.isEmpty()) {
+            return;
         }
-        return flowFile != null;
-    }
-    /**
-     * Builds an instance of {@link KafkaPublisher}.
-     */
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
-        final Map<String, String> kafkaProps = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
-        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
-        this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        final Properties props = new Properties();
-        props.putAll(kafkaProps);
-        KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger());
-        return publisher;
-    }
+        final PublisherPool pool = getPublisherPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }
+
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+
+        final long startTime = System.nanoTime();
+        try (final PublisherLease lease = pool.obtainPublisher()) {
+            // Send each FlowFile to Kafka asynchronously.
+            for (final FlowFile flowFile : flowFiles) {
+                if (!isScheduled()) {
+                    // If stopped, re-queue FlowFile instead of sending it
+                    session.transfer(flowFile);
+                    continue;
+                }
-    /**
-     * Will rendezvous with {@link KafkaPublisher} after building a
-     * {@link PublishingContext} and will produce the resulting
-     * {@link FlowFile}. The resulting FlowFile contains all required
-     * information to determine whether message publishing originating from the
-     * provided FlowFile succeeded fully, partially, or failed completely
-     * (see {@link #isFailedFlowFile(FlowFile)}).
-     */
-    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
-        final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream contentStream) throws IOException {
-                PublishingContext publishingContext = PublishKafka_0_10.this.buildPublishingContext(flowFile, context, contentStream);
-                KafkaPublisher.KafkaPublisherResult result = PublishKafka_0_10.this.kafkaPublisher.publish(publishingContext);
-                publishResultRef.set(result);
+                final byte[] messageKey = getMessageKey(flowFile, context);
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                final byte[] demarcatorBytes;
+                if (useDemarcator) {
+                    demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
+                } else {
+                    demarcatorBytes = null;
+                }
+
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream rawIn) throws IOException {
+                        try (final InputStream in = new BufferedInputStream(rawIn)) {
+                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
+                        }
+                    }
+                });
             }
-        });
-        FlowFile resultFile = publishResultRef.get().isAllAcked()
-                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
-                : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context));
+            // Complete the send
+            final PublishResult publishResult = lease.complete();
-        if (!this.isFailedFlowFile(resultFile)) {
-            resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent()));
-        }
-        return resultFile;
-    }
+            // Transfer any successful FlowFiles.
+            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
-    /**
-     * Builds a {@link PublishingContext} for message(s) to be sent to Kafka.
-     * {@link PublishingContext} contains all contextual information required by
-     * {@link KafkaPublisher} to publish to Kafka. Such information contains
-     * things like topic name, content stream, delimiter, key and the last ACKed
-     * message for cases where the provided FlowFile is being retried (failed in the
-     * past).
-     * <br>
-     * For a clean FlowFile (one that is being sent for the first time), the
-     * PublishingContext will be built from the {@link ProcessContext} associated
-     * with this invocation.
-     * <br>
-     * For a failed FlowFile, the {@link PublishingContext} will be built from
-     * attributes of that FlowFile, which by then will already contain the required
-     * information (e.g., topic, key, delimiter etc.). This is required to
-     * ensure the affinity of the retry in the event that the processor
-     * configuration has changed. However, keep in mind that a failed FlowFile is
-     * only considered a failed FlowFile if it is being re-processed by the same
-     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
-     * {@link #isFailedFlowFile(FlowFile)}). If a failed FlowFile is being sent to
-     * another PublishKafka0_10 processor it is treated as a fresh FlowFile
-     * regardless of whether it has #FAILED* attributes set.
-     */
-    private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        final byte[] keyBytes = getMessageKey(flowFile, context);
-
-        final String topicName;
-        final byte[] delimiterBytes;
-        int lastAckedMessageIndex = -1;
-        if (this.isFailedFlowFile(flowFile)) {
-            lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
-            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
-                    ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-        } else {
-            topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
-                    .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
-        }
+                final int msgCount = publishResult.getSuccessfulMessageCount(success);
+                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+                session.adjustCounter("Messages Sent", msgCount, true);
+
+                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+                session.transfer(success, REL_SUCCESS);
+            }
+
+            // Transfer any failures.
+            for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
+                final int successCount = publishResult.getSuccessfulMessageCount(failure);
+                if (successCount > 0) {
+                    getLogger().error("Failed to send all message for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
--- End diff ---
This logging statement looks like it was meant to include successCount: the
message has 3 placeholders, but only 2 arguments are passed in. It might also
read better if it said "Failed to send some messages for {} to Kafka, but {}
messages were acknowledged" or something like that.
> Address issues of PublishKafka blocking when having trouble communicating
> with Kafka broker and improve performance
> -------------------------------------------------------------------------------------------------------------------
>
> Key: NIFI-2865
> URL: https://issues.apache.org/jira/browse/NIFI-2865
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.1.0
>
>
> When NiFi is unable to communicate properly with the Kafka broker, we see the
> NiFi threads occasionally block. This should be resolvable by calling the
> wakeup() method of the client. Additionally, if Kafka takes too long to
> respond, we should be able to route the FlowFile to failure and move on.
> PublishKafka has a nice feature that allows a demarcated stream to be sent as
> separate messages, so that a large number of messages can be sent as a single
> FlowFile. However, in the case of individual messages per FlowFile, the
> performance could be improved by batching together multiple FlowFiles per
> session.
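
To illustrate the "route the FlowFile to failure and move on" idea from the
description above, here is a minimal sketch against the plain Kafka producer
API, assuming the ack wait is enforced with Future.get(timeout). The class and
method names below are invented for illustration; this is not the
PublisherLease implementation from the PR:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Hypothetical helper: waits for the broker to acknowledge a send, but
    // only up to maxAckWaitMillis, so a processor thread is never blocked
    // indefinitely on a misbehaving broker.
    public final class BoundedAck {

        public static RecordMetadata awaitAck(final Producer<byte[], byte[]> producer,
                final ProducerRecord<byte[], byte[]> record, final long maxAckWaitMillis)
                throws InterruptedException, ExecutionException, TimeoutException {
            // send() is asynchronous; the Future completes when the broker acks.
            final Future<RecordMetadata> future = producer.send(record);

            // If no ack arrives in time, TimeoutException propagates and the
            // caller can route the FlowFile to failure instead of waiting.
            return future.get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
        }

        private BoundedAck() {
        }
    }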
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)