[ https://issues.apache.org/jira/browse/NIFI-2732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15467593#comment-15467593 ]
ASF GitHub Bot commented on NIFI-2732:
--------------------------------------
Github user joewitt commented on a diff in the pull request:
https://github.com/apache/nifi/pull/987#discussion_r77650137
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java ---
@@ -47,22 +156,244 @@
      * kafka client to collect more data from Kafka before committing the
      * offsets.
      *
-     * @param offsets offsets
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * if false then we didn't do anything and should probably yield if true
+     * then we committed new data
+     *
+     */
+    boolean commit() {
+        if (uncommittedOffsetsMap.isEmpty()) {
+            resetInternalState();
+            return false;
+        }
+        try {
+            /**
+             * Committing the nifi session then the offsets means we have an at
+             * least once guarantee here. If we reversed the order we'd have at
+             * most once.
+             */
+            final Collection<FlowFile> bundledFlowFiles = getBundles();
+            if (!bundledFlowFiles.isEmpty()) {
+                getProcessSession().transfer(getBundles(), REL_SUCCESS);
--- End diff --
No reason, just me being a goofball. Fixed and pushed.
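The ordering argument in the diff comment (commit the NiFi session, then the offsets, for at-least-once) can be illustrated with a small, self-contained Java sketch. It is not the ConsumerLease code from the PR: `durableOutput` and `committedOffsets` are hypothetical stand-ins for the NiFi session and Kafka's committed offsets, and the crash between the two steps is simulated with an exception.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommitOrderingSketch {

    // Hypothetical stand-in for content committed through the NiFi session.
    static final List<String> durableOutput = new ArrayList<>();
    // Hypothetical stand-in for Kafka's committed offsets: partition -> next offset to read.
    static final Map<Integer, Long> committedOffsets = new HashMap<>();

    /**
     * Commits the "session" first and the offsets second. A failure between the
     * two steps leaves the data durable but the offsets uncommitted, so the same
     * records are read again after a restart: at-least-once delivery.
     * Returns false when there was nothing to commit (the caller could yield).
     */
    static boolean commit(List<String> pending, Map<Integer, Long> uncommittedOffsets,
                          boolean failBeforeOffsetCommit) {
        if (uncommittedOffsets.isEmpty()) {
            return false;
        }
        durableOutput.addAll(pending);               // step 1: commit the session
        if (failBeforeOffsetCommit) {
            throw new IllegalStateException("simulated crash before offset commit");
        }
        committedOffsets.putAll(uncommittedOffsets); // step 2: commit the offsets
        return true;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("r0", "r1");
        Map<Integer, Long> offsets = Map.of(0, 2L); // partition 0: next offset to read is 2
        try {
            commit(batch, offsets, true);
        } catch (IllegalStateException e) {
            // Offsets were never committed, so a restarted consumer re-reads r0 and r1.
        }
        commit(batch, offsets, false);
        System.out.println(durableOutput);    // [r0, r1, r0, r1]: duplicated, nothing lost
        System.out.println(committedOffsets); // {0=2}
    }
}
```

Swapping steps 1 and 2 flips the trade-off: a failure after the offset commit but before the session commit would skip r0 and r1 entirely, which is the at-most-once case the comment mentions.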
> ConsumeKafka 0.9 and 0.10 not handling partition reassignment case sufficiently
> -------------------------------------------------------------------------------
>
> Key: NIFI-2732
> URL: https://issues.apache.org/jira/browse/NIFI-2732
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Joseph Witt
> Assignee: Joseph Witt
> Priority: Critical
> Fix For: 1.1.0
>
>
> The new ConsumeKafka clients handle the threading model of the consumer API
> correctly. However, they do not yet handle partition reassignment
> sufficiently, which means we could see avoidable duplication. By
> registering a partition reassignment listener we can handle it correctly.
> Further, the processor loads subsequent polls of messages into memory
> rather than writing them directly to the process session/disk. We could
> write them to disk instead and achieve far better performance and
> efficiency. Early testing easily achieves 100MB/s sustained per thread on a
> simple laptop setup with default settings, which will scale very nicely on
> a properly installed content repository.
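The two changes the issue proposes (a partition reassignment listener, and writing polled records straight out instead of buffering them) can be sketched together in plain Java. This is a hedged, self-contained illustration under stated assumptions, not the PR #987 code and not the real Kafka or NiFi APIs: `RebalanceListener` is a hypothetical mirror of Kafka's `ConsumerRebalanceListener` (whose real callbacks, `onPartitionsRevoked` and `onPartitionsAssigned`, are registered through `KafkaConsumer.subscribe(topics, listener)`), partitions are simplified to ints, and each `ByteArrayOutputStream` stands in for content written through the NiFi process session. `Lease`, `onRecord` and `commitPartitions` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RebalanceAwareLeaseSketch {

    // Hypothetical mirror of Kafka's ConsumerRebalanceListener, with partitions simplified to ints.
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<Integer> partitions);
        void onPartitionsAssigned(Collection<Integer> partitions);
    }

    // Hypothetical lease: one open stream per partition, each record written as it arrives.
    static class Lease implements RebalanceListener {
        final Map<Integer, ByteArrayOutputStream> openStreams = new HashMap<>();
        final Map<Integer, Long> uncommittedOffsets = new HashMap<>();
        final Map<Integer, Long> committedOffsets = new HashMap<>();
        final Map<Integer, String> committedContent = new HashMap<>();

        void onRecord(int partition, long offset, String value) {
            ByteArrayOutputStream out =
                openStreams.computeIfAbsent(partition, p -> new ByteArrayOutputStream());
            if (out.size() > 0) {
                out.write('\n'); // demarcator between records in the same bundle
            }
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
            uncommittedOffsets.put(partition, offset + 1); // Kafka commits the next offset to read
        }

        // Settle the given partitions: content first, then offsets (at-least-once order).
        void commitPartitions(Collection<Integer> partitions) {
            for (Integer p : partitions) {
                ByteArrayOutputStream out = openStreams.remove(p);
                Long next = uncommittedOffsets.remove(p);
                if (out == null || next == null) {
                    continue; // nothing pending for this partition
                }
                committedContent.put(p, new String(out.toByteArray(), StandardCharsets.UTF_8));
                committedOffsets.put(p, next);
            }
        }

        @Override
        public void onPartitionsRevoked(Collection<Integer> partitions) {
            // Called before the partitions move to another consumer, so their pending
            // work is committed here rather than being redelivered to the new owner.
            commitPartitions(partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<Integer> partitions) {
            // Nothing to do: newly assigned partitions resume from their committed offsets.
        }
    }

    public static void main(String[] args) {
        Lease lease = new Lease();
        lease.onRecord(0, 10, "a");
        lease.onRecord(0, 11, "b");
        lease.onRecord(1, 5, "c");
        lease.onPartitionsRevoked(List.of(0)); // a rebalance takes partition 0 away
        System.out.println(lease.committedOffsets);     // {0=12}
        System.out.println(lease.openStreams.keySet()); // [1]
    }
}
```

In the sketch the streams are in memory; in NiFi the equivalent writes would go to the content repository on disk, so polled records do not accumulate on the heap between commits. Partition 1 is untouched by the revocation and keeps its open stream and uncommitted offset.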
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)