[ 
https://issues.apache.org/jira/browse/NIFI-2850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15628907#comment-15628907
 ] 

ASF GitHub Bot commented on NIFI-2850:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1115#discussion_r86136287
  
    --- Diff: 
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
 ---
    @@ -273,25 +262,26 @@ private int binFlowFiles(final ProcessContext 
context, final ProcessSessionFacto
                 }
     
                 final ProcessSession session = sessionFactory.createSession();
    -            FlowFile flowFile = session.get();
    -            if (flowFile == null) {
    +            final List<FlowFile> flowFiles = session.get(1000);
    +            if (flowFiles.isEmpty()) {
                     break;
                 }
     
    -            flowFile = this.preprocessFlowFile(context, session, flowFile);
    -
    -            String groupId = this.getGroupId(context, flowFile);
    -
    -            final boolean binned = binManager.offer(groupId, flowFile, 
session);
    -
    -            // could not be added to a bin -- probably too large by 
itself, so create a separate bin for just this guy.
    -            if (!binned) {
    -                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, 
null);
    -                bin.offer(flowFile, session);
    -                this.readyBins.add(bin);
    +            final Map<String, List<FlowFile>> flowFileGroups = new 
HashMap<>();
    +            for (FlowFile flowFile : flowFiles) {
    +                flowFile = this.preprocessFlowFile(context, session, 
flowFile);
    +                final String groupingIdentifier = getGroupId(context, 
flowFile);
    +                flowFileGroups.computeIfAbsent(groupingIdentifier, id -> 
new ArrayList<>()).add(flowFile);
    --- End diff --
    
    I don't believe so. Using putIfAbsent, we would be creating a new ArrayList 
every time. By using the computeIfAbsent, it allows us to create the ArrayList 
only if the key is not already present.


> Provide ability for a FlowFile to be migrated from one Process Session to 
> another
> ---------------------------------------------------------------------------------
>
>                 Key: NIFI-2850
>                 URL: https://issues.apache.org/jira/browse/NIFI-2850
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Oleg Zhurakousky
>             Fix For: 1.1.0
>
>
> Currently, the MergeContent processor creates a separate ProcessSession for 
> each FlowFile that it pulls. This is done so that we can ensure that we can 
> commit all Process Sessions when a bin is full. Unfortunately, this means 
> that MergeContent is required to call ProcessSession.get() many times, which 
> adds a lot of contention on the FlowFile Queue. If we allow FlowFiles to be 
> migrated from 1 session to another, we can have a session per bin, and then 
> use ProcessSession.get(100) to greatly reduce lock contention. This will 
> likely have benefits in other processors as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to