Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-250 13fb1a758 -> 6b91546d9


NIFI-305: Refactoring superclass BinFiles from MergeContent


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/98afcce0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/98afcce0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/98afcce0

Branch: refs/heads/NIFI-250
Commit: 98afcce0dc3672fe04e2130ac72b9a201e17ba1a
Parents: 6b560b9
Author: gresockj <[email protected]>
Authored: Tue Jan 27 17:32:38 2015 -0500
Committer: gresockj <[email protected]>
Committed: Tue Jan 27 17:32:38 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/BinFiles.java      | 388 ++++++++++++++++++
 .../nifi/processors/standard/MergeContent.java  | 400 ++++---------------
 2 files changed, 473 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/98afcce0/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
new file mode 100644
index 0000000..7846c7d
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/BinFiles.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.util.Bin;
+import org.apache.nifi.processors.standard.util.BinManager;
+import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
+
+/**
+ * Abstract base class for processors that group FlowFiles into bins; extended by MergeContent.
+ *
+ */
+public abstract class BinFiles extends AbstractSessionFactoryProcessor {
+
+    public static final PropertyDescriptor MIN_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Minimum Group Size")
+            .description("The minimum size of for the bundle")
+            .required(true)
+            .defaultValue("0 B")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MAX_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Maximum Group Size")
+            .description("The maximum size for the bundle. If not specified, 
there is no maximum.")
+            .required(false)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MIN_ENTRIES = new 
PropertyDescriptor.Builder()
+            .name("Minimum Number of Entries")
+            .description("The minimum number of files to include in a bundle")
+            .required(true)
+            .defaultValue("1")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MAX_ENTRIES = new 
PropertyDescriptor.Builder()
+            .name("Maximum Number of Entries")
+            .description("The maximum number of files to include in a bundle. 
If not specified, there is no maximum.")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BIN_COUNT = new 
PropertyDescriptor.Builder()
+            .name("Maximum number of Bins")
+            .description("Specifies the maximum number of bins that can be 
held in memory at any one time")
+            .defaultValue("100")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MAX_BIN_AGE = new 
PropertyDescriptor.Builder()
+            .name("Max Bin Age")
+            .description("The maximum age of a Bin that will trigger a Bin to 
be complete. Expected format is <duration> <time unit> where <duration> is a 
positive integer and time unit is one of seconds, minutes, hours")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original").description("The FlowFiles that were 
used to create the bundle").build();
+    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("If the bundle cannot be 
created, all FlowFiles that would have been used to created the bundle will be 
transferred to failure").build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+    private final BinManager binManager = new BinManager();
+
+    private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+
+       final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
+        Set<Relationship> additionalRelationships = 
defineAdditionalRelationships();
+        if (additionalRelationships != null) {
+               relationships.addAll(additionalRelationships);
+        }
+        this.relationships = Collections.unmodifiableSet(relationships);
+
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(MIN_ENTRIES);
+        descriptors.add(MAX_ENTRIES);
+        descriptors.add(MIN_SIZE);
+        descriptors.add(MAX_SIZE);
+        descriptors.add(MAX_BIN_AGE);
+        descriptors.add(MAX_BIN_COUNT);
+        List<PropertyDescriptor> additionalPropertyDescriptors = 
this.defineAdditionalPropertyDescriptors();
+        if (additionalPropertyDescriptors != null) {
+               descriptors.addAll(additionalPropertyDescriptors);
+        }
+
+        this.descriptors = Collections.unmodifiableList(descriptors);
+    }
+
+    @OnStopped
+    public void resetState() {
+        binManager.purge();
+
+        Bin bin;
+        while ((bin = readyBins.poll()) != null) {
+            for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
+                wrapper.getSession().rollback();
+            }
+        }
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Allows any additional relationships to be defined.
+     * @return Relationships to be added in the init() method
+     */ 
+    protected abstract Set<Relationship> defineAdditionalRelationships();
+    
+    /**
+     * Allows any additional property descriptors to be defined.
+     * @return Properties to be added in the init() method
+     */
+    protected abstract List<PropertyDescriptor> 
defineAdditionalPropertyDescriptors();
+
+       /**
+        * Allows general pre-processing of a flow file before it is offered to 
a
+        * bin. This is called before getGroupId().
+        * 
+        * @param context
+        * @param session
+        * @param flowFile
+        * @return The flow file, possibly altered
+        */
+    protected abstract FlowFile preprocessFlowFile(final ProcessContext 
context, final ProcessSession session, final FlowFile flowFile);
+    
+    /**
+     * Returns a group ID representing a bin.  This allows flow files to be
+     * binned into like groups.
+     * @param context 
+     * @param flowFile
+     * @return The appropriate group ID
+     */
+    protected abstract String getGroupId(final ProcessContext context, final 
FlowFile flowFile);
+
+    /**
+     * Performs any additional setup of the bin manager.  Called during the
+     * OnScheduled phase.
+     * @param binManager The bin manager
+     * @param context
+     */
+    protected abstract void setUpBinManager(BinManager binManager, 
ProcessContext context);
+    
+    /**
+        * Processes a single bin. The implementation must commit the sessions of
+        * the binned flow files itself only when it returns true (see @return)
+        * 
+        * @param unmodifiableBin
+        *            A reference to a single bin of flow file/session wrappers
+        * @param binContents
+        *            A copy of the contents of the bin
+        * @param context
+        *            The context
+        * @param session
+        *            A session newly created for processing this bin
+        * @param logger
+        *            The logger
+        * @return Return true if the input bin was already committed. E.g., in 
case of a
+        * failure, the implementation may choose to transfer all binned files
+        * to Failure and commit their sessions.  If false, the 
+        * processBins() method will transfer the files to Original and commit
+        * the sessions
+        * @throws Exception
+        *             This will be handled appropriately, and all flow files 
in the
+        *             bin will be transferred to failure and the session rolled
+        *             back
+        */
+       protected abstract boolean processBin(Bin unmodifiableBin,
+                       List<FlowFileSessionWrapper> binContents, 
ProcessContext context,
+                       ProcessSession session, ProcessorLog logger) throws 
Exception;
+
+    @Override
+    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        int binsAdded = binFlowFiles(context, sessionFactory);
+        getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
+        
+        if (!isScheduled()) {
+            return;
+        }
+
+        binsAdded += migrateBins(context);
+
+        final int binsProcessed = processBins(context, sessionFactory);
+        if (binsProcessed == 0 && binsAdded == 0) {
+            context.yield();
+        }
+    }
+
+    private int migrateBins(final ProcessContext context) {
+        int added = 0;
+        for (final Bin bin : binManager.removeReadyBins(true)) {
+            this.readyBins.add(bin);
+            added++;
+        }
+
+        // if we have created all of the bins that are allowed, go ahead and 
remove the oldest one. If we don't do
+        // this, then we will simply wait for it to expire because we can't 
get any more FlowFiles into the
+        // bins. So we may as well expire it now.
+        if (added == 0 && binManager.getBinCount() >= 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
+            final Bin bin = binManager.removeOldestBin();
+            if (bin != null) {
+                added++;
+                this.readyBins.add(bin);
+            }
+        }
+
+        return added;
+    }
+
+    private int processBins(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+        final Bin bin = readyBins.poll();
+        if (bin == null) {
+            return 0;
+        }
+
+        final List<Bin> bins = new ArrayList<>();
+        bins.add(bin);
+
+        final ProcessorLog logger = getLogger();
+        final ProcessSession session = sessionFactory.createSession();
+
+        final List<FlowFileSessionWrapper> binCopy = new 
ArrayList<>(bin.getContents());
+
+       boolean binAlreadyCommitted = false;
+        try {
+               binAlreadyCommitted = this.processBin(bin, binCopy, context, 
session, logger);
+        } catch (final Exception e) {
+            logger.error("Failed to process bundle of {} files due to {}", new 
Object[]{binCopy.size(), e});
+
+            for (final FlowFileSessionWrapper wrapper : binCopy) {
+                wrapper.getSession().transfer(wrapper.getFlowFile(), 
REL_FAILURE);
+                wrapper.getSession().commit();
+            }
+            session.rollback();
+            return 1;
+        }
+
+        // we first commit the bundle's session before the originals' sessions 
because if we are restarted or crash
+        // between commits, we favor data redundancy over data loss. Since we 
have no Distributed Transaction capability
+        // across multiple sessions, we cannot guarantee atomicity across the 
sessions
+        session.commit();
+        // If this bin's session has been committed, move on.
+        if ( !binAlreadyCommitted ) {
+            for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
+                wrapper.getSession().transfer(wrapper.getFlowFile(), 
REL_ORIGINAL);
+                wrapper.getSession().commit();
+            }
+        }
+
+        return 1;
+    }
+    
+       private int binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+        int binsAdded = 0;
+        while (binManager.getBinCount() < 
context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
+            if (!isScheduled()) {
+                return binsAdded;
+            }
+
+            final ProcessSession session = sessionFactory.createSession();
+            FlowFile flowFile = session.get();
+            if (flowFile == null) {
+                return binsAdded;
+            }
+
+            flowFile = this.preprocessFlowFile(context, session, flowFile);
+
+            String groupId = this.getGroupId(context, flowFile);
+
+            final boolean binned = binManager.offer(groupId, flowFile, 
session);
+
+            // could not be added to a bin -- probably too large by itself, so 
create a separate bin for just this guy.
+            if (!binned) {
+                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, 
null);
+                bin.offer(flowFile, session);
+                this.readyBins.add(bin);
+            }
+
+            binsAdded++;
+        }
+
+        return binsAdded;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
+
+        if (context.getProperty(MAX_BIN_AGE).isSet() ) {
+            
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
+        } else {
+            binManager.setMaxBinAge(Integer.MAX_VALUE);
+        }
+        
+        if ( context.getProperty(MAX_SIZE).isSet() ) {
+            
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
+        } else {
+            binManager.setMaximumSize(Long.MAX_VALUE);
+        }
+        
+        
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
+
+        if ( context.getProperty(MAX_ENTRIES).isSet() ) {
+            
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
+        } else {
+            binManager.setMaximumEntries(Integer.MAX_VALUE);
+        }
+
+        this.setUpBinManager(binManager, context);
+    }
+    
+       @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext context) {
+        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
+
+        final long minBytes = 
context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+        final Double maxBytes = 
context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+
+        if (maxBytes != null && maxBytes.longValue() < minBytes) {
+            problems.add(new 
ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
+                    
context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size 
must be less than or equal to Max Size").build());
+        }
+
+        final Long min = context.getProperty(MIN_ENTRIES).asLong();
+        final Long max = context.getProperty(MAX_ENTRIES).asLong();
+
+        if (min != null && max != null) {
+            if (min > max) {
+                problems.add(new 
ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min
 Entries must be less than or equal to Max Entries").build());
+            }
+        }
+
+        return problems;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/98afcce0/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index f2e4a8d..73cb5a6 100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -23,7 +23,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -31,59 +30,47 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
-import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.standard.util.Bin;
 import org.apache.nifi.processors.standard.util.BinManager;
 import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FlowFilePackager;
 import org.apache.nifi.util.FlowFilePackagerV1;
 import org.apache.nifi.util.FlowFilePackagerV2;
 import org.apache.nifi.util.FlowFilePackagerV3;
 import org.apache.nifi.util.ObjectHolder;
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 
 @SideEffectFree
 @TriggerWhenEmpty
 @Tags({"merge", "content", "correlation", "tar", "zip", "stream", 
"concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
 @CapabilityDescription("Merges a Group of FlowFiles together based on a 
user-defined strategy and packages them into a single FlowFile. It is 
recommended that the Processor be configured with only a single incoming 
connection, as Group of FlowFiles will not be created from FlowFiles in 
different connections. This processor updates the mime.type attribute as 
appropriate.")
-public class MergeContent extends AbstractSessionFactoryProcessor {
+public class MergeContent extends BinFiles {
 
     // preferred attributes
     public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
@@ -207,160 +194,82 @@ public class MergeContent extends 
AbstractSessionFactoryProcessor {
             .defaultValue("false")
             .build();
 
-    public static final PropertyDescriptor MIN_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Minimum Group Size")
-            .description("The minimum size of for the bundle")
-            .required(true)
-            .defaultValue("0 B")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Maximum Group Size")
-            .description("The maximum size for the bundle. If not specified, 
there is no maximum.")
-            .required(false)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MIN_ENTRIES = new 
PropertyDescriptor.Builder()
-            .name("Minimum Number of Entries")
-            .description("The minimum number of files to include in a bundle")
-            .required(true)
-            .defaultValue("1")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor MAX_ENTRIES = new 
PropertyDescriptor.Builder()
-            .name("Maximum Number of Entries")
-            .description("The maximum number of files to include in a bundle. 
If not specified, there is no maximum.")
-            .required(false)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_BIN_COUNT = new 
PropertyDescriptor.Builder()
-            .name("Maximum number of Bins")
-            .description("Specifies the maximum number of bins that can be 
held in memory at any one time")
-            .defaultValue("100")
-            .required(true)
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .build();
-
-    public static final PropertyDescriptor MAX_BIN_AGE = new 
PropertyDescriptor.Builder()
-            .name("Max Bin Age")
-            .description("The maximum age of a Bin that will trigger a Bin to 
be complete. Expected format is <duration> <time unit> where <duration> is a 
positive integer and time unit is one of seconds, minutes, hours")
-            .required(false)
-            .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
-            .build();
-
-    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original").description("The FlowFiles that were 
used to create the bundle").build();
     public static final Relationship REL_MERGED = new 
Relationship.Builder().name("merged").description("The FlowFile containing the 
merged content").build();
-    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("If the bundle cannot be 
created, all FlowFiles that would have been used to created the bundle will be 
transferred to failure").build();
 
     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
 
-    private Set<Relationship> relationships;
-    private List<PropertyDescriptor> descriptors;
-    private final BinManager binManager = new BinManager();
-
-    private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
+       @Override
+       protected Set<Relationship> defineAdditionalRelationships() {
         final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_ORIGINAL);
         relationships.add(REL_MERGED);
-        relationships.add(REL_FAILURE);
-        this.relationships = Collections.unmodifiableSet(relationships);
+        
+        return relationships;
+    }
 
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+       @Override
+       protected List<PropertyDescriptor> 
defineAdditionalPropertyDescriptors() {
+               final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(MERGE_STRATEGY);
         descriptors.add(MERGE_FORMAT);
         descriptors.add(ATTRIBUTE_STRATEGY);
         descriptors.add(CORRELATION_ATTRIBUTE_NAME);
-        descriptors.add(MIN_ENTRIES);
-        descriptors.add(MAX_ENTRIES);
-        descriptors.add(MIN_SIZE);
-        descriptors.add(MAX_SIZE);
-        descriptors.add(MAX_BIN_AGE);
-        descriptors.add(MAX_BIN_COUNT);
         descriptors.add(HEADER);
         descriptors.add(FOOTER);
         descriptors.add(DEMARCATOR);
         descriptors.add(COMPRESSION_LEVEL);
         descriptors.add(KEEP_PATH);
-
-        this.descriptors = Collections.unmodifiableList(descriptors);
-    }
-
-    @OnStopped
-    public void resetState() {
-        binManager.purge();
-
-        Bin bin;
-        while ((bin = readyBins.poll()) != null) {
-            for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
-                wrapper.getSession().rollback();
-            }
-        }
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        
         return descriptors;
-    }
+       }
 
     private byte[] readContent(final String filename) throws IOException {
         return Files.readAllBytes(Paths.get(filename));
     }
 
-    
-    @Override
-    public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
-        int binsAdded = binFlowFiles(context, sessionFactory);
-        getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
-        
-        if (!isScheduled()) {
-            return;
+
+       @Override
+       protected FlowFile preprocessFlowFile(ProcessContext context,
+                       ProcessSession session, FlowFile flowFile) {
+               
+        // handle backward compatibility with old segment attributes
+        if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+            flowFile = session.putAttribute(flowFile, 
FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+        }
+        if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+            flowFile = session.putAttribute(flowFile, 
FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
         }
+        if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+            flowFile = session.putAttribute(flowFile, FRAGMENT_ID_ATTRIBUTE, 
flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+        }
+        
+        return flowFile;
+       }
 
-        binsAdded += migrateBins(context);
+       @Override
+       protected String getGroupId(ProcessContext context, FlowFile flowFile) {
 
-        final int binsProcessed = processBins(context, sessionFactory);
-        if (binsProcessed == 0 && binsAdded == 0) {
-            context.yield();
-        }
-    }
-    
-
-    private int migrateBins(final ProcessContext context) {
-        int added = 0;
-        for (final Bin bin : binManager.removeReadyBins(true)) {
-            this.readyBins.add(bin);
-            added++;
-        }
-
-        // if we have created all of the bins that are allowed, go ahead and 
remove the oldest one. If we don't do
-        // this, then we will simply wait for it to expire because we can't 
get any more FlowFiles into the
-        // bins. So we may as well expire it now.
-        if (added == 0 && binManager.getBinCount() >= 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
-            final Bin bin = binManager.removeOldestBin();
-            if (bin != null) {
-                added++;
-                this.readyBins.add(bin);
-            }
+        final String correlationAttributeName = 
context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
+        String groupId = (correlationAttributeName == null) ? null : 
flowFile.getAttribute(correlationAttributeName);
+
+        // when MERGE_STRATEGY is Defragment and correlationAttributeName is 
null then bin by fragment.identifier
+        if (groupId == null && 
MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue()))
 {
+            groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
         }
 
-        return added;
-    }
+        return groupId;
+       }
 
-    private int processBins(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
-        final Bin bin = readyBins.poll();
-        if (bin == null) {
-            return 0;
+       @Override
+       protected void setUpBinManager(BinManager binManager, ProcessContext 
context) {
+               if 
(MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue()))
 {
+            binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
         }
+       }
+
+       @Override
+       protected boolean processBin(Bin unmodifiableBin,
+                       List<FlowFileSessionWrapper> binCopy, ProcessContext 
context,
+                       ProcessSession session, ProcessorLog logger) throws 
Exception {
 
         final String mergeFormat = 
context.getProperty(MERGE_FORMAT).getValue();
         MergeBin merger;
@@ -398,130 +307,45 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
                 break;
         }
 
-        final List<Bin> bins = new ArrayList<>();
-        bins.add(bin);
-
-        final ProcessorLog logger = getLogger();
-        final ProcessSession session = sessionFactory.createSession();
-
-        final Set<Bin> committedBins = new HashSet<>();
-        
-        for (final Bin unmodifiableBin : bins) {
-            final List<FlowFileSessionWrapper> binCopy = new 
ArrayList<>(unmodifiableBin.getContents());
-
-            if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-                final String error = getDefragmentValidationError(binCopy);
-                if (error != null) {
-                    final String binDescription = binCopy.size() <= 10 ? 
binCopy.toString() : binCopy.size() + " FlowFiles";
-                    logger.error(error + "; routing {} to failure", new 
Object[]{binDescription});
-                    for ( final FlowFileSessionWrapper wrapper : binCopy ) {
-                        wrapper.getSession().transfer(wrapper.getFlowFile(), 
REL_FAILURE);
-                        wrapper.getSession().commit();
-                        committedBins.add(unmodifiableBin);
-                    }
-                    
-                    continue;
-                }
-
-                Collections.sort(binCopy, new FragmentComparator());
-            }
-
-            FlowFile bundle = null;
-            try {
-                bundle = merger.merge(context, session, binCopy);
-
-                // keep the filename, as it is added to the bundle.
-                final String filename = 
bundle.getAttribute(CoreAttributes.FILENAME.key());
-
-                // merge all of the attributes
-                final Map<String, String> bundleAttributes = 
attributeStrategy.getMergedAttributes(binCopy);
-                bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), 
merger.getMergedContentType());
-                // restore the filename of the bundle
-                bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
-                bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, 
Integer.toString(binCopy.size()));
-                bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, 
Long.toString(bin.getBinAge()));
-
-                bundle = session.putAllAttributes(bundle, bundleAttributes);
-
-                final String inputDescription = (binCopy.size() < 10) ? 
binCopy.toString() : binCopy.size() + " FlowFiles";
-                logger.info("Merged {} into {}", new 
Object[]{inputDescription, bundle});
-            } catch (final Exception e) {
-                logger.error("Failed to process bundle of {} files due to {}", 
new Object[]{binCopy.size(), e});
 
-                for (final FlowFileSessionWrapper wrapper : binCopy) {
+        if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
+            final String error = getDefragmentValidationError(binCopy);
+            
+            // Fail the flow files and commit them
+            if (error != null) {
+                final String binDescription = binCopy.size() <= 10 ? binCopy.toString() : binCopy.size() + " FlowFiles";
+                logger.error(error + "; routing {} to failure", new Object[]{binDescription});
+                for ( final FlowFileSessionWrapper wrapper : binCopy ) {
                     wrapper.getSession().transfer(wrapper.getFlowFile(), REL_FAILURE);
                     wrapper.getSession().commit();
                 }
-                session.rollback();
-                return 1;
-            }
-            session.transfer(bundle, REL_MERGED);
-        }
-
-        // we first commit the bundle's session before the originals' sessions 
because if we are restarted or crash
-        // between commits, we favor data redundancy over data loss. Since we 
have no Distributed Transaction capability
-        // across multiple sessions, we cannot guarantee atomicity across the 
sessions
-        session.commit();
-        for (final Bin unmodifiableBin : bins) {
-            // If this bin's session has been committed, move on.
-            if ( committedBins.contains(unmodifiableBin) ) {
-                continue;
-            }
-            
-            for (final FlowFileSessionWrapper wrapper : 
unmodifiableBin.getContents()) {
-                wrapper.getSession().transfer(wrapper.getFlowFile(), 
REL_ORIGINAL);
-                wrapper.getSession().commit();
+                
+                return true;
             }
+            Collections.sort(binCopy, new FragmentComparator());
         }
 
-        return 1;
-    }
-
-    private int binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
-        int binsAdded = 0;
-        while (binManager.getBinCount() < 
context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
-            if (!isScheduled()) {
-                return binsAdded;
-            }
-
-            final ProcessSession session = sessionFactory.createSession();
-            FlowFile flowFile = session.get();
-            if (flowFile == null) {
-                return binsAdded;
-            }
+        FlowFile bundle = merger.merge(context, session, binCopy);
 
-            // handle backward compatibility with old segment attributes
-            if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
-                flowFile = session.putAttribute(flowFile, 
FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
-            }
-            if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
-                flowFile = session.putAttribute(flowFile, 
FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
-            }
-            if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
-                flowFile = session.putAttribute(flowFile, 
FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
-            }
+        // keep the filename, as it is added to the bundle.
+        final String filename = bundle.getAttribute(CoreAttributes.FILENAME.key());
 
-            final String correlationAttributeName = 
context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
-            String groupId = (correlationAttributeName == null) ? null : 
flowFile.getAttribute(correlationAttributeName);
+        // merge all of the attributes
+        final Map<String, String> bundleAttributes = attributeStrategy.getMergedAttributes(binCopy);
+        bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), merger.getMergedContentType());
+        // restore the filename of the bundle
+        bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, Integer.toString(binCopy.size()));
+        bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, Long.toString(unmodifiableBin.getBinAge()));
 
-            // when MERGE_STRATEGY is Defragment and correlationAttributeName 
is null then bin by fragment.identifier
-            if (groupId == null && MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-                groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
-            }
+        bundle = session.putAllAttributes(bundle, bundleAttributes);
 
-            final boolean binned = binManager.offer(groupId, flowFile, 
session);
+        final String inputDescription = (binCopy.size() < 10) ? binCopy.toString() : binCopy.size() + " FlowFiles";
+        logger.info("Merged {} into {}", new Object[]{inputDescription, bundle});
+        session.transfer(bundle, REL_MERGED);
 
-            // could not be added to a bin -- probably too large by itself, so 
create a separate bin for just this guy.
-            if (!binned) {
-                Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, 
null);
-                bin.offer(flowFile, session);
-                this.readyBins.add(bin);
-            }
-
-            binsAdded++;
-        }
-
-        return binsAdded;
+        // We haven't committed anything, parent will take care of it
+        return false;
     }
 
     private String getDefragmentValidationError(final List<FlowFileSessionWrapper> bin) {
@@ -578,60 +402,6 @@ public class MergeContent extends AbstractSessionFactoryProcessor {
         return NUMBER_PATTERN.matcher(value).matches();
     }
 
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException {
-        
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
-
-        if (context.getProperty(MAX_BIN_AGE).isSet() ) {
-            
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
-        } else {
-            binManager.setMaxBinAge(Integer.MAX_VALUE);
-        }
-        
-        if ( context.getProperty(MAX_SIZE).isSet() ) {
-            
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
-        } else {
-            binManager.setMaximumSize(Long.MAX_VALUE);
-        }
-        
-        if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) {
-            binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
-        } else {
-            
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
-
-            if ( context.getProperty(MAX_ENTRIES).isSet() ) {
-                
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
-            } else {
-                binManager.setMaximumEntries(Integer.MAX_VALUE);
-            }
-        }
-
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext context) {
-        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
-
-        final long minBytes = 
context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
-        final Double maxBytes = 
context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
-
-        if (maxBytes != null && maxBytes.longValue() < minBytes) {
-            problems.add(new 
ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
-                    
context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size 
must be less than or equal to Max Size").build());
-        }
-
-        final Long min = context.getProperty(MIN_ENTRIES).asLong();
-        final Long max = context.getProperty(MAX_ENTRIES).asLong();
-
-        if (min != null && max != null) {
-            if (min > max) {
-                problems.add(new ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min Entries must be less than or equal to Max Entries").build());
-            }
-        }
-
-        return problems;
-    }
-
     private class BinaryConcatenationMerge implements MergeBin {
 
         private String mimeType = "application/octet-stream";

Reply via email to