[ https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628950#comment-15628950 ]

ASF GitHub Bot commented on NIFI-2850:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r86138383
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto
                 }
     
                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }
     
    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, session);
    -
    -            // could not be added to a bin -- probably too large by itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, flowFile);
    +                final String groupingIdentifier = getGroupId(context, flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> new ArrayList<>()).add(flowFile);
                 }
     
    -            flowFilesBinned++;
    +            for (final Map.Entry<String, List<FlowFile>> entry : flowFileGroups.entrySet()) {
    +                final Set<FlowFile> unbinned = binManager.offer(entry.getKey(), entry.getValue(), session, sessionFactory);
    +                for (final FlowFile flowFile : unbinned) {
    +                    Bin bin = new Bin(session, 0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, null);
    +                    bin.offer(flowFile, session);
    +                    this.readyBins.add(bin);
    +                }
    +            }
    --- End diff --
    
    Not exactly. The loop over unbinned in the diff above says: "if the bin
manager didn't bin it for whatever reason, create our own one-element bin and
process it (by adding it to this.readyBins); nothing else will go in this
bin." At BinManager:201, by contrast, the code says: "if the FlowFile didn't
fit in any of the available bins, create a new bin and add this FlowFile to
it. Subsequent FlowFiles may then go into this bin."
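To make the distinction between the two paths concrete, here is a toy model, assuming (hypothetically) that a bin is limited only by total byte size and that the bin manager hands back anything larger than a whole bin. SimpleBin, SimpleBinManager, and maxBinBytes are illustrative names, not NiFi's Bin or BinManager, and the real BinManager may decline to bin a FlowFile for other reasons.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BinningSketch {

    // Hypothetical bin with a byte-size ceiling; not NiFi's Bin class.
    static final class SimpleBin {
        final long maxBytes;
        final List<Long> contents = new ArrayList<>();
        long totalBytes = 0;

        SimpleBin(long maxBytes) {
            this.maxBytes = maxBytes;
        }

        boolean offer(long sizeBytes) {
            if (totalBytes + sizeBytes > maxBytes) {
                return false;
            }
            contents.add(sizeBytes);
            totalBytes += sizeBytes;
            return true;
        }
    }

    // Hypothetical bin manager: keeps open bins per group, all with the same ceiling.
    static final class SimpleBinManager {
        final long maxBinBytes;
        final Map<String, List<SimpleBin>> openBins = new HashMap<>();

        SimpleBinManager(long maxBinBytes) {
            this.maxBinBytes = maxBinBytes;
        }

        // Returns the sizes it could not bin at all.
        List<Long> offer(String groupId, List<Long> sizes) {
            List<Long> unbinned = new ArrayList<>();
            List<SimpleBin> bins = openBins.computeIfAbsent(groupId, id -> new ArrayList<>());
            for (long size : sizes) {
                if (size > maxBinBytes) {
                    unbinned.add(size); // could never fit, even in an empty bin
                    continue;
                }
                boolean placed = false;
                for (SimpleBin bin : bins) {
                    if (bin.offer(size)) {
                        placed = true;
                        break;
                    }
                }
                if (!placed) {
                    // Path 2: open a new shared bin that later items may also join.
                    SimpleBin fresh = new SimpleBin(maxBinBytes);
                    fresh.offer(size);
                    bins.add(fresh);
                }
            }
            return unbinned;
        }
    }

    public static void main(String[] args) {
        SimpleBinManager manager = new SimpleBinManager(100);
        List<SimpleBin> readyBins = new ArrayList<>();
        List<Long> unbinned = manager.offer("group-a", List.of(40L, 50L, 30L, 500L));
        for (long size : unbinned) {
            // Path 1: a dedicated one-element bin that is ready immediately.
            SimpleBin single = new SimpleBin(Long.MAX_VALUE);
            single.offer(size);
            readyBins.add(single);
        }
        System.out.println(manager.openBins.get("group-a").size()); // prints 2
        System.out.println(readyBins.size()); // prints 1
    }
}
```

With a 100-byte limit, 40 and 50 share the first bin, 30 opens a second bin that later items could still join, and 500 is returned as unbinned and gets its own one-element ready bin.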


> Provide ability for a FlowFile to be migrated from one Process Session to another
> ---------------------------------------------------------------------------------
>
>                 Key: NIFI-2850
>                 URL: https://issues.apache.org/jira/browse/NIFI-2850
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.1.0
>
>
> Currently, the MergeContent processor creates a separate ProcessSession for
> each FlowFile that it pulls. This is done so that all of the Process Sessions
> for a bin can be committed together when that bin is full. Unfortunately, it
> also means that MergeContent must call ProcessSession.get() many times, which
> adds a lot of contention on the FlowFile Queue. If we allow FlowFiles to be
> migrated from one session to another, we can have one session per bin and
> then use ProcessSession.get(100) to greatly reduce lock contention. This will
> likely benefit other processors as well.
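The session-per-bin idea in the description can be sketched with a toy session class. Everything here is hypothetical: ToySession, its get/migrate/commit methods, and the convention that an item's first character names its bin are illustrative assumptions, not NiFi's ProcessSession API. The sketch shows one batched get() taking the queue lock once, then each item being migrated into its bin's session so a full bin is committed through a single session.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SessionPerBinSketch {

    // Toy session: owns items until it commits them. Not NiFi's ProcessSession.
    static final class ToySession {
        final List<String> owned = new ArrayList<>();
        final List<String> committed = new ArrayList<>();

        // One lock acquisition on the shared queue for up to maxResults items.
        List<String> get(Deque<String> queue, int maxResults) {
            List<String> batch = new ArrayList<>();
            synchronized (queue) {
                while (batch.size() < maxResults && !queue.isEmpty()) {
                    batch.add(queue.poll());
                }
            }
            owned.addAll(batch);
            return batch;
        }

        // Hypothetical migrate: transfers ownership of items to another session.
        void migrate(ToySession newOwner, Collection<String> items) {
            if (!owned.containsAll(items)) {
                throw new IllegalArgumentException("session does not own all items");
            }
            owned.removeAll(items);
            newOwner.owned.addAll(items);
        }

        void commit() {
            committed.addAll(owned);
            owned.clear();
        }
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("a-1", "b-1", "a-2"));
        ToySession pullSession = new ToySession();
        Map<String, ToySession> binSessions = new HashMap<>();

        // Pull one batch, then hand each item to the session of its bin.
        List<String> batch = pullSession.get(queue, 100);
        for (String item : batch) {
            String binKey = item.substring(0, 1);
            ToySession binSession = binSessions.computeIfAbsent(binKey, k -> new ToySession());
            pullSession.migrate(binSession, List.of(item));
        }

        // When bin "a" is full, committing its single session commits all of its items.
        ToySession binA = binSessions.get("a");
        binA.commit();
        System.out.println(binA.committed); // prints [a-1, a-2]
    }
}
```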



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
