[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15609009#comment-15609009 ]
ASF GitHub Bot commented on NIFI-2850:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1115#discussion_r84719151
--- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
@@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
             }
             final ProcessSession session = sessionFactory.createSession();
-            FlowFile flowFile = session.get();
-            if (flowFile == null) {
+            final List<FlowFile> flowFiles = session.get(1000);
+            if (flowFiles.isEmpty()) {
                 break;
             }
-            flowFile = this.preprocessFlowFile(context, session, flowFile);
-
-            String groupId = this.getGroupId(context, flowFile);
-
-            final boolean binned = binManager.offer(groupId, flowFile, session);
-
-            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
-            if (!binned) {
-                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
-                bin.offer(flowFile, session);
-                this.readyBins.add(bin);
+            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
+            for (FlowFile flowFile : flowFiles) {
+                flowFile = this.preprocessFlowFile(context, session, flowFile);
+                final String groupingIdentifier = getGroupId(context, flowFile);
+                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
             }
-            flowFilesBinned++;
+            for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
+                final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
+                for (final FlowFile flowFile : unbinned) {
+                    Bin bin = new Bin(session, 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
+                    bin.offer(flowFile, session);
+                    this.readyBins.add(bin);
+                }
+            }
--- End diff --
After looking at ```BinManager.offer(..)```, I am not sure I understand
what's happening in the inner loop above. Aren't you essentially doing the same
thing in ```BinManager:201```?
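To make the question concrete, here is a simplified, self-contained model of the control flow in the new `binFlowFiles` loop. It pulls a batch, groups the batch by grouping identifier with `computeIfAbsent`, offers each group to a bin manager, and then gives every item the manager hands back its own unbounded bin. `Item`, `SimpleBin`, `SimpleBinManager`, and `binBatch` are hypothetical stand-ins, not NiFi's `FlowFile`/`Bin`/`BinManager` API. The rule that an item is "unbinned" only when it exceeds the per-bin byte limit is an assumption taken from the old code comment ("probably too large by itself").

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BinningSketch {
    // Hypothetical stand-in for a FlowFile: a grouping key and a size in bytes.
    record Item(String group, long size) {}

    // Hypothetical bin that holds items up to a total byte limit.
    static final class SimpleBin {
        final long maxBytes;
        final List<Item> items = new ArrayList<>();
        long bytes = 0;

        SimpleBin(long maxBytes) {
            this.maxBytes = maxBytes;
        }

        boolean offer(Item item) {
            if (bytes + item.size() > maxBytes) {
                return false;
            }
            items.add(item);
            bytes += item.size();
            return true;
        }
    }

    // Hypothetical manager: one open bin per group; a full bin is moved to readyBins.
    static final class SimpleBinManager {
        final long maxBytesPerBin;
        final Map<String, SimpleBin> openBins = new HashMap<>();
        final List<SimpleBin> readyBins;

        SimpleBinManager(long maxBytesPerBin, List<SimpleBin> readyBins) {
            this.maxBytesPerBin = maxBytesPerBin;
            this.readyBins = readyBins;
        }

        // Returns the items that would not fit even in an empty bin.
        List<Item> offer(String groupId, List<Item> items) {
            List<Item> unbinned = new ArrayList<>();
            for (Item item : items) {
                if (item.size() > maxBytesPerBin) {
                    unbinned.add(item);
                    continue;
                }
                SimpleBin bin = openBins.computeIfAbsent(groupId, id -> new SimpleBin(maxBytesPerBin));
                if (!bin.offer(item)) {
                    readyBins.add(bin); // current bin is full: hand it off and start a new one
                    SimpleBin fresh = new SimpleBin(maxBytesPerBin);
                    fresh.offer(item);
                    openBins.put(groupId, fresh);
                }
            }
            return unbinned;
        }
    }

    // Mirrors the new loop: group the batch, offer each group, give oversized items their own bin.
    static void binBatch(List<Item> batch, SimpleBinManager manager) {
        Map<String, List<Item>> groups = new HashMap<>();
        for (Item item : batch) {
            groups.computeIfAbsent(item.group(), id -> new ArrayList<>()).add(item);
        }
        for (Map.Entry<String, List<Item>> entry : groups.entrySet()) {
            for (Item oversized : manager.offer(entry.getKey(), entry.getValue())) {
                SimpleBin own = new SimpleBin(Long.MAX_VALUE); // unbounded, like the Long.MAX_VALUE Bin in the diff
                own.offer(oversized);
                manager.readyBins.add(own);
            }
        }
    }

    public static void main(String[] args) {
        List<SimpleBin> ready = new ArrayList<>();
        SimpleBinManager manager = new SimpleBinManager(100, ready);
        binBatch(List.of(new Item("a", 40), new Item("a", 50), new Item("a", 30),
                new Item("a", 500), new Item("b", 10)), manager);
        System.out.println(ready.size()); // 2: the full [a/40, a/50] bin and the oversized a/500 bin
        System.out.println(manager.openBins.size()); // 2: groups a and b still have open bins
    }
}
```

In this model, the "own bin" fallback could live equally well inside `SimpleBinManager.offer`. The sketch does not show whether the real `BinManager.offer(..)` already does this; that is the point of the review question. Keeping the fallback in exactly one place would avoid doing it twice.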
> Provide ability for a FlowFile to be migrated from one Process Session to another
> ---------------------------------------------------------------------------------
>
> Key: NIFI-2850
> URL: https://issues.apache.org/jira/browse/NIFI-2850
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Joseph Witt
> Fix For: 1.1.0
>
>
> Currently, the MergeContent processor creates a separate ProcessSession for
> each FlowFile that it pulls. This is done so that all Process Sessions can be
> committed together when a bin is full. Unfortunately, this means that
> MergeContent must call ProcessSession.get() once per FlowFile, which adds a
> lot of contention on the FlowFile Queue. If we allow FlowFiles to be migrated
> from one session to another, we can have a session per bin and then use
> ProcessSession.get(100) to greatly reduce lock contention. This will likely
> have benefits in other processors as well.
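The idea in the description can be illustrated with a toy model. `LockedQueue`, `Session`, `migrate`, and `binWithMigration` below are hypothetical and much simpler than NiFi's `ProcessSession`. The sketch only shows two things. First, a batched pull takes the queue lock once, while per-FlowFile pulls take it once per FlowFile. Second, "migrating" FlowFiles moves ownership from an intake session to a per-bin session. Bins are keyed by the first character of each item, an arbitrary choice made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

public class SessionMigrationSketch {
    // Hypothetical FlowFile queue guarded by a single lock; counts lock acquisitions.
    static final class LockedQueue {
        private final Deque<String> items = new ArrayDeque<>();
        private final ReentrantLock lock = new ReentrantLock();
        int lockAcquisitions = 0;

        LockedQueue(Collection<String> initial) {
            items.addAll(initial);
        }

        // Pulls up to max items while holding the lock once.
        List<String> poll(int max) {
            lock.lock();
            try {
                lockAcquisitions++;
                List<String> batch = new ArrayList<>();
                while (batch.size() < max && !items.isEmpty()) {
                    batch.add(items.poll());
                }
                return batch;
            } finally {
                lock.unlock();
            }
        }
    }

    // Hypothetical session: owns items until it migrates (or, in real life, commits) them.
    static final class Session {
        final List<String> owned = new ArrayList<>();

        // Transfers ownership of the given items to another session.
        void migrate(Session newOwner, Collection<String> toMove) {
            if (!owned.containsAll(toMove)) {
                throw new IllegalStateException("items not owned by this session");
            }
            owned.removeAll(toMove);
            newOwner.owned.addAll(toMove);
        }
    }

    // One batched pull into an intake session, then migration into one session per bin.
    static Map<Character, Session> binWithMigration(LockedQueue queue, int batchSize) {
        Session intake = new Session();
        intake.owned.addAll(queue.poll(batchSize));
        Map<Character, List<String>> byBin = new HashMap<>();
        for (String item : intake.owned) {
            byBin.computeIfAbsent(item.charAt(0), k -> new ArrayList<>()).add(item);
        }
        Map<Character, Session> binSessions = new HashMap<>();
        for (Map.Entry<Character, List<String>> e : byBin.entrySet()) {
            Session binSession = binSessions.computeIfAbsent(e.getKey(), k -> new Session());
            intake.migrate(binSession, e.getValue());
        }
        return binSessions;
    }

    public static void main(String[] args) {
        List<String> flowFiles = List.of("a1", "a2", "b1", "a3", "b2");

        // One pull per FlowFile: one lock acquisition each, plus a final empty pull.
        LockedQueue single = new LockedQueue(flowFiles);
        int pulled = 0;
        while (!single.poll(1).isEmpty()) {
            pulled++;
        }
        System.out.println(pulled + " FlowFiles, " + single.lockAcquisitions + " lock acquisitions"); // 5 FlowFiles, 6 lock acquisitions

        // One batched pull, then migration into per-bin sessions.
        LockedQueue batched = new LockedQueue(flowFiles);
        Map<Character, Session> bins = binWithMigration(batched, 100);
        System.out.println(batched.lockAcquisitions); // 1
        System.out.println(bins.get('a').owned); // [a1, a2, a3]
    }
}
```

The design point is that the intake session never has to be committed. Each bin's session ends up owning exactly that bin's FlowFiles, so a full bin can be committed on its own while the queue is still read in batches.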
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)