[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628907#comment-15628907 ]
ASF GitHub Bot commented on NIFI-2850:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r86136287

    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }

                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }

    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
    --- End diff --

    I don't believe so. Using putIfAbsent, we would be creating a new ArrayList every time. By using the computeIfAbsent, it allows us to create the ArrayList only if the key is not already present.

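The point made in the comment above can be illustrated with a small standalone sketch. It is not NiFi code: the class `GroupingSketch`, the `allocations` counter, and the `newList()` helper are hypothetical names introduced only to count how many lists each approach allocates, and plain strings stand in for FlowFiles. Only `Map.putIfAbsent` and `Map.computeIfAbsent` are the real `java.util.Map` methods under discussion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupingSketch {
    // Hypothetical counter: how many lists have been allocated so far.
    static int allocations = 0;

    // Hypothetical helper standing in for "new ArrayList<>()" so allocations can be counted.
    static List<String> newList() {
        allocations++;
        return new ArrayList<>();
    }

    // putIfAbsent: the value argument is evaluated before the call,
    // so a list is allocated for every item, even when the key already exists.
    static Map<String, List<String>> groupWithPutIfAbsent(List<String[]> items) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String[] item : items) {
            groups.putIfAbsent(item[0], newList());
            groups.get(item[0]).add(item[1]);
        }
        return groups;
    }

    // computeIfAbsent: the mapping function runs only when the key is missing,
    // so one list is allocated per distinct key.
    static Map<String, List<String>> groupWithComputeIfAbsent(List<String[]> items) {
        Map<String, List<String>> groups = new HashMap<>();
        for (String[] item : items) {
            groups.computeIfAbsent(item[0], id -> newList()).add(item[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Each entry is {groupId, payload}; three items spread over two groups.
        List<String[]> items = Arrays.asList(
                new String[] {"a", "1"},
                new String[] {"a", "2"},
                new String[] {"b", "3"});

        allocations = 0;
        groupWithPutIfAbsent(items);
        System.out.println("putIfAbsent allocations: " + allocations);      // 3

        allocations = 0;
        groupWithComputeIfAbsent(items);
        System.out.println("computeIfAbsent allocations: " + allocations);  // 2
    }
}
```

Both methods produce the same grouping; the difference is only in how many throwaway lists are created, which grows with the batch size when many FlowFiles share a group.
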
> Provide ability for a FlowFile to be migrated from one Process Session to another
> ---------------------------------------------------------------------------------
>
>                 Key: NIFI-2850
>                 URL: https://issues.apache.org/jira/browse/NIFI-2850
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.1.0
>
>
> Currently, the MergeContent processor creates a separate ProcessSession for
> each FlowFile that it pulls. This is done so that we can ensure that we can
> commit all Process Sessions when a bin is full. Unfortunately, this means
> that MergeContent is required to call ProcessSession.get() many times, which
> adds a lot of contention on the FlowFile Queue. If we allow FlowFiles to be
> migrated from 1 session to another, we can have a session per bin, and then
> use ProcessSession.get(100) to greatly reduce lock contention. This will
> likely have benefits in other processors as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)