[
https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628950#comment-15628950
]
ASF GitHub Bot commented on NIFI-2850:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1115#discussion_r86138383
--- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
@@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
     }
     final ProcessSession session = sessionFactory.createSession();
-    FlowFile flowFile = session.get();
-    if (flowFile == null) {
+    final List<FlowFile> flowFiles = session.get(1000);
+    if (flowFiles.isEmpty()) {
         break;
     }
-    flowFile = this.preprocessFlowFile(context, session, flowFile);
-
-    String groupId = this.getGroupId(context, flowFile);
-
-    final boolean binned = binManager.offer(groupId, flowFile, session);
-
-    // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
-    if (!binned) {
-        Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
-        bin.offer(flowFile, session);
-        this.readyBins.add(bin);
+    final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
+    for (FlowFile flowFile : flowFiles) {
+        flowFile = this.preprocessFlowFile(context, session, flowFile);
+        final String groupingIdentifier = getGroupId(context, flowFile);
+        flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
     }
-    flowFilesBinned++;
+    for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
+        final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
+        for (final FlowFile flowFile : unbinned) {
+            Bin bin = new Bin(session, 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
+            bin.offer(flowFile, session);
+            this.readyBins.add(bin);
+        }
+    }
--- End diff --
Not exactly. The loop above says, "If the bin manager didn't bin it for
whatever reason, create our own one-element bin and process it (by adding it
to this.readyBins); nothing else will go in this bin." BinManager:201, by
contrast, says, "If the FlowFile didn't fit in any of the available bins,
create a new bin and add this FlowFile to it. Subsequent FlowFiles may then go
into this bin."
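
To make the distinction concrete, here is a minimal sketch of the two behaviors. It is an illustration only: SimpleBin, SimpleBinManager, and binAll are hypothetical stand-ins and not NiFi's Bin, BinManager, or BinFiles, and it assumes a bin's only limit is a maximum total size. The manager opens a new bin that stays available for later items, while the caller-side fallback builds a one-element bin that goes straight to the ready list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT NiFi's Bin or BinManager.
final class BinningSketch {

    /** A bin that accepts items until their total size would exceed maxSize. */
    static final class SimpleBin {
        private final long maxSize;
        private long size;
        final List<Long> items = new ArrayList<>();

        SimpleBin(long maxSize) {
            this.maxSize = maxSize;
        }

        boolean offer(long itemSize) {
            if (size + itemSize > maxSize) {
                return false;
            }
            items.add(itemSize);
            size += itemSize;
            return true;
        }
    }

    /** Keeps open bins per group; a newly created bin stays open for later items. */
    static final class SimpleBinManager {
        private final long maxSize;
        final Map<String, List<SimpleBin>> openBins = new HashMap<>();

        SimpleBinManager(long maxSize) {
            this.maxSize = maxSize;
        }

        /** Returns false only when the item cannot fit even in an empty bin. */
        boolean offer(String groupId, long itemSize) {
            List<SimpleBin> bins = openBins.computeIfAbsent(groupId, id -> new ArrayList<>());
            for (SimpleBin bin : bins) {
                if (bin.offer(itemSize)) {
                    return true;
                }
            }
            // Nothing fit: open a new bin that subsequent items may also join.
            SimpleBin fresh = new SimpleBin(maxSize);
            if (!fresh.offer(itemSize)) {
                return false;
            }
            bins.add(fresh);
            return true;
        }
    }

    /** Caller-side loop: anything the manager rejects gets a dedicated, already-ready bin. */
    static List<SimpleBin> binAll(SimpleBinManager manager, String groupId, long[] itemSizes) {
        List<SimpleBin> readyBins = new ArrayList<>();
        for (long itemSize : itemSizes) {
            if (!manager.offer(groupId, itemSize)) {
                // One-element bin; nothing else will ever be offered to it.
                SimpleBin single = new SimpleBin(Long.MAX_VALUE);
                single.offer(itemSize);
                readyBins.add(single);
            }
        }
        return readyBins;
    }
}
```

With a maximum bin size of 10 and item sizes 6, 6, 3, 50, the manager ends up with two open bins (the 3 joins the first one), and only the 50 falls through to a one-element ready bin.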
> Provide ability for a FlowFile to be migrated from one Process Session to
> another
> ---------------------------------------------------------------------------------
>
> Key: NIFI-2850
> URL: https://issues.apache.org/jira/browse/NIFI-2850
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Oleg Zhurakousky
> Fix For: 1.1.0
>
>
> Currently, the MergeContent processor creates a separate ProcessSession for
> each FlowFile that it pulls. This is done so that all Process Sessions can be
> committed when a bin is full. Unfortunately, this means that MergeContent must
> call ProcessSession.get() many times, which adds a lot of contention on the
> FlowFile Queue. If we allow FlowFiles to be migrated from one session to
> another, we can have a session per bin, and then use ProcessSession.get(100)
> to greatly reduce lock contention. This will likely have benefits in other
> processors as well.