Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1097#discussion_r82070115
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
---
@@ -249,242 +241,141 @@
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
+        return PROPERTIES;
     }

     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
-                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true)
-                .build();
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName)
+                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+                .dynamic(true)
+                .build();
     }

     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
         return KafkaProcessorUtils.validateCommonProperties(validationContext);
     }

-    volatile KafkaPublisher kafkaPublisher;
-
-    /**
-     * This thread-safe operation will delegate to
-     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
-     * checking and creating (if necessary) Kafka resource which could be either
-     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and
-     * destroy the underlying Kafka resource upon catching an {@link Exception}
-     * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
-     * After Kafka resource is destroyed it will be re-created upon the next
-     * invocation of this operation essentially providing a self healing
-     * mechanism to deal with potentially corrupted resource.
-     * <p>
-     * Keep in mind that upon catching an exception the state of this processor
-     * will be set to no longer accept any more tasks, until Kafka resource is
-     * reset. This means that in a multi-threaded situation currently executing
-     * tasks will be given a chance to complete while no new tasks will be
-     * accepted.
-     *
-     * @param context context
-     * @param sessionFactory factory
-     */
-    @Override
-    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted.
-            this.taskCounter.incrementAndGet();
-            final ProcessSession session = sessionFactory.createSession();
-            try {
-                /*
-                 * We can't be doing double null check here since as a pattern
-                 * it only works for lazy init but not reset, which is what we
-                 * are doing here. In fact the first null check is dangerous
-                 * since 'kafkaPublisher' can become null right after its null
-                 * check passed causing subsequent NPE.
-                 */
-                synchronized (this) {
-                    if (this.kafkaPublisher == null) {
-                        this.kafkaPublisher = this.buildKafkaResource(context, session);
-                    }
-                }
-
-                /*
-                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
-                 * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated
-                 * - PublishKafka0_10 - some messages were sent to Kafka based on existence of the input FlowFile
-                 */
-                boolean processed = this.rendezvousWithKafka(context, session);
-                session.commit();
-                if (!processed) {
-                    context.yield();
-                }
-            } catch (Throwable e) {
-                this.acceptTask = false;
-                session.rollback(true);
-                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e});
-            } finally {
-                synchronized (this) {
-                    if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
-                        this.close();
-                        this.acceptTask = true;
-                    }
-                }
-            }
-        } else {
-            this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            context.yield();
+    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
+        PublisherPool pool = publisherPool;
+        if (pool != null) {
+            return pool;
         }
+
+        return publisherPool = createPublisherPool(context);
+    }
+
+    protected PublisherPool createPublisherPool(final ProcessContext context) {
+        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
+
+        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
     }

-    /**
-     * Will call {@link Closeable#close()} on the target resource after which
-     * the target resource will be set to null. Should only be called when there
-     * are no more threads being executed on this processor or when it has been
-     * verified that only a single thread remains.
-     *
-     * @see KafkaPublisher
-     * @see KafkaConsumer
-     */
     @OnStopped
-    public void close() {
-        try {
-            if (this.kafkaPublisher != null) {
-                try {
-                    this.kafkaPublisher.close();
-                } catch (Exception e) {
-                    this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e);
-                }
-            }
-        } finally {
-            this.kafkaPublisher = null;
+    public void closePool() {
+        if (publisherPool != null) {
+            publisherPool.close();
         }
+
+        publisherPool = null;
     }

-    /**
-     * Will rendezvous with Kafka if {@link ProcessSession} contains
-     * {@link FlowFile} producing a result {@link FlowFile}.
-     * <br>
-     * The result {@link FlowFile} that is successful is then transfered to
-     * {@link #REL_SUCCESS}
-     * <br>
-     * The result {@link FlowFile} that is failed is then transfered to
-     * {@link #REL_FAILURE}
-     *
-     */
-    protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
-        FlowFile flowFile = session.get();
-        if (flowFile != null) {
-            long start = System.nanoTime();
-            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
-            Relationship relationship = REL_SUCCESS;
-            if (!this.isFailedFlowFile(flowFile)) {
-                String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-                long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic);
-                session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration);
-                this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis",
-                        new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration});
-            } else {
-                relationship = REL_FAILURE;
-                flowFile = session.penalize(flowFile);
-            }
-            session.transfer(flowFile, relationship);
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
+
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
+        if (flowFiles.isEmpty()) {
+            return;
         }
-        return flowFile != null;
-    }

-    /**
-     * Builds and instance of {@link KafkaPublisher}.
-     */
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
-        final Map<String, String> kafkaProps = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
-        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
-        this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        final Properties props = new Properties();
-        props.putAll(kafkaProps);
-        KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger());
-        return publisher;
-    }
+        final PublisherPool pool = getPublisherPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }
+
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+
+        final long startTime = System.nanoTime();
+        try (final PublisherLease lease = pool.obtainPublisher()) {
+            // Send each FlowFile to Kafka asynchronously.
+            for (final FlowFile flowFile : flowFiles) {
+                if (!isScheduled()) {
+                    // If stopped, re-queue FlowFile instead of sending it
+                    session.transfer(flowFile);
+                    continue;
+                }

-    /**
-     * Will rendezvous with {@link KafkaPublisher} after building
-     * {@link PublishingContext} and will produce the resulting
-     * {@link FlowFile}. The resulting FlowFile contains all required
-     * information to determine if message publishing originated from the
-     * provided FlowFile has actually succeeded fully, partially or failed
-     * completely (see {@link #isFailedFlowFile(FlowFile)}.
-     */
-    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
-        final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream contentStream) throws IOException {
-                PublishingContext publishingContext = PublishKafka_0_10.this.buildPublishingContext(flowFile, context, contentStream);
-                KafkaPublisher.KafkaPublisherResult result = PublishKafka_0_10.this.kafkaPublisher.publish(publishingContext);
-                publishResultRef.set(result);
+                final byte[] messageKey = getMessageKey(flowFile, context);
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                final byte[] demarcatorBytes;
+                if (useDemarcator) {
+                    demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
+                } else {
+                    demarcatorBytes = null;
+                }
+
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream rawIn) throws IOException {
+                        try (final InputStream in = new BufferedInputStream(rawIn)) {
+                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
+                        }
+                    }
+                });
             }
-        });
-        FlowFile resultFile = publishResultRef.get().isAllAcked()
-                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
-                : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context));

+            // Complete the send
+            final PublishResult publishResult = lease.complete();

-        if (!this.isFailedFlowFile(resultFile)) {
-            resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent()));
-        }
-        return resultFile;
-    }

+            // Transfer any successful FlowFiles.
+            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();

-    /**
-     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
-     * {@link PublishingContext} contains all contextual information required by
-     * {@link KafkaPublisher} to publish to Kafka. Such information contains
-     * things like topic name, content stream, delimiter, key and last ACKed
-     * message for cases where provided FlowFile is being retried (failed in the
-     * past).
-     * <br>
-     * For the clean FlowFile (file that has been sent for the first time),
-     * PublishingContext will be built form {@link ProcessContext} associated
-     * with this invocation.
-     * <br>
-     * For the failed FlowFile, {@link PublishingContext} will be built from
-     * attributes of that FlowFile which by then will already contain required
-     * information (e.g., topic, key, delimiter etc.). This is required to
-     * ensure the affinity of the retry in the even where processor
-     * configuration has changed. However keep in mind that failed FlowFile is
-     * only considered a failed FlowFile if it is being re-processed by the same
-     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
-     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to
-     * another PublishKafka0_10 processor it is treated as a fresh FlowFile
-     * regardless if it has #FAILED* attributes set.
-     */
-    private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        final byte[] keyBytes = getMessageKey(flowFile, context);
-
-        final String topicName;
-        final byte[] delimiterBytes;
-        int lastAckedMessageIndex = -1;
-        if (this.isFailedFlowFile(flowFile)) {
-            lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
-            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
-                    ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-        } else {
-            topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
-                    .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
-        }
+                final int msgCount = publishResult.getSuccessfulMessageCount(success);
+                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+                session.adjustCounter("Messages Sent", msgCount, true);
+
+                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+                session.transfer(success, REL_SUCCESS);
+            }
+
+            // Transfer any failures.
+            for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
+                final int successCount = publishResult.getSuccessfulMessageCount(failure);
+                if (successCount > 0) {
+                    getLogger().error("Failed to send all message for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
--- End diff --
This logging statement looks like it was meant to include successCount: the message has 3 placeholders, but only 2 arguments are passed in. It might also read better if it said "Failed to send some messages for {} to Kafka, but {} messages were acknowledged" or something like that.