http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 0000000,9311ee9..f2e4a8d
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@@ -1,0 -1,1015 +1,1015 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.util.ArrayList;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Queue;
+ import java.util.Set;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.regex.Pattern;
+ import java.util.zip.ZipEntry;
+ import java.util.zip.ZipOutputStream;
+ 
+ import org.apache.nifi.components.AllowableValue;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.components.ValidationContext;
+ import org.apache.nifi.components.ValidationResult;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.stream.io.BufferedInputStream;
+ import org.apache.nifi.stream.io.BufferedOutputStream;
+ import org.apache.nifi.stream.io.NonCloseableOutputStream;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.ProcessorInitializationContext;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.OnScheduled;
 -import org.apache.nifi.processor.annotation.OnStopped;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.Tags;
 -import org.apache.nifi.processor.annotation.TriggerWhenEmpty;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.documentation.Tags;
++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.InputStreamCallback;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.processors.standard.util.Bin;
+ import org.apache.nifi.processors.standard.util.BinManager;
+ import org.apache.nifi.processors.standard.util.FlowFileSessionWrapper;
+ import org.apache.nifi.util.FlowFilePackager;
+ import org.apache.nifi.util.FlowFilePackagerV1;
+ import org.apache.nifi.util.FlowFilePackagerV2;
+ import org.apache.nifi.util.FlowFilePackagerV3;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+ import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+ 
+ @SideEffectFree
+ @TriggerWhenEmpty
+ @Tags({"merge", "content", "correlation", "tar", "zip", "stream", 
"concatenation", "archive", "flowfile-stream", "flowfile-stream-v3"})
+ @CapabilityDescription("Merges a Group of FlowFiles together based on a 
user-defined strategy and packages them into a single FlowFile. It is 
recommended that the Processor be configured with only a single incoming 
connection, as Group of FlowFiles will not be created from FlowFiles in 
different connections. This processor updates the mime.type attribute as 
appropriate.")
+ public class MergeContent extends AbstractSessionFactoryProcessor {
+ 
+     // preferred attributes
+     public static final String FRAGMENT_ID_ATTRIBUTE = "fragment.identifier";
+     public static final String FRAGMENT_INDEX_ATTRIBUTE = "fragment.index";
+     public static final String FRAGMENT_COUNT_ATTRIBUTE = "fragment.count";
+ 
+     // old style attributes
+     public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier";
+     public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index";
+     public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
+     public static final String SEGMENT_ORIGINAL_FILENAME = 
"segment.original.filename";
+ 
+     public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new 
AllowableValue(
+             "Bin-Packing Algorithm",
+             "Bin-Packing Algorithm",
+             "Generates 'bins' of FlowFiles and fills each bin as full as 
possible. FlowFiles are placed into a bin based on their size and optionally 
their attributes (if the <Correlation Attribute> property is set)");
+     public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new 
AllowableValue(
+             "Defragment",
+             "Defragment",
+             "Combines fragments that are associated by attributes back into a 
single cohesive FlowFile. If using this strategy, all FlowFiles must have the 
attributes <fragment.identifier>, <fragment.count>, and <fragment.index> or 
alternatively (for backward compatibility purposes) <segment.identifier>, 
<segment.count>, and <segment.index>");
+ 
+     public static final String MERGE_FORMAT_TAR_VALUE = "TAR";
+     public static final String MERGE_FORMAT_ZIP_VALUE = "ZIP";
+     public static final String MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE = 
"FlowFile Stream, v3";
+     public static final String MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE = 
"FlowFile Stream, v2";
+     public static final String MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE = "FlowFile 
Tar, v1";
+     public static final String MERGE_FORMAT_CONCAT_VALUE = "Binary 
Concatenation";
+ 
+     public static final AllowableValue MERGE_FORMAT_TAR = new AllowableValue(
+             MERGE_FORMAT_TAR_VALUE,
+             MERGE_FORMAT_TAR_VALUE,
+             "A bin of FlowFiles will be combined into a single TAR file. The 
FlowFiles' <path> attribute will be used to create a directory in the TAR file 
if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be 
added at the root of the TAR file. If a FlowFile has an attribute named 
<tar.permissions> that is 3 characters, each between 0-7, that attribute will 
be used as the TAR entry's 'mode'.");
+     public static final AllowableValue MERGE_FORMAT_ZIP = new AllowableValue(
+             MERGE_FORMAT_ZIP_VALUE,
+             MERGE_FORMAT_ZIP_VALUE,
+             "A bin of FlowFiles will be combined into a single ZIP file. The 
FlowFiles' <path> attribute will be used to create a directory in the ZIP file 
if the <Keep Paths> property is set to true; otherwise, all FlowFiles will be 
added at the root of the ZIP file. The <Compression Level> property indicates 
the ZIP compression to use.");
+     public static final AllowableValue MERGE_FORMAT_FLOWFILE_STREAM_V3 = new 
AllowableValue(
+             MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE,
+             MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE,
+             "A bin of FlowFiles will be combined into a single Version 3 
FlowFile Stream");
+     public static final AllowableValue MERGE_FORMAT_FLOWFILE_STREAM_V2 = new 
AllowableValue(
+             MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE,
+             MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE,
+             "A bin of FlowFiles will be combined into a single Version 2 
FlowFile Stream");
+     public static final AllowableValue MERGE_FORMAT_FLOWFILE_TAR_V1 = new 
AllowableValue(
+             MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE,
+             MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE,
+             "A bin of FlowFiles will be combined into a single Version 1 
FlowFile Package");
+     public static final AllowableValue MERGE_FORMAT_CONCAT = new 
AllowableValue(
+             MERGE_FORMAT_CONCAT_VALUE,
+             MERGE_FORMAT_CONCAT_VALUE,
+             "The contents of all FlowFiles will be concatenated together into 
a single FlowFile");
+ 
+     public static final String ATTRIBUTE_STRATEGY_ALL_COMMON = "Keep Only 
Common Attributes";
+     public static final String ATTRIBUTE_STRATEGY_ALL_UNIQUE = "Keep All 
Unique Attributes";
+ 
+     public static final String TAR_PERMISSIONS_ATTRIBUTE = "tar.permissions";
+     public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
+     public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
+ 
+     public static final PropertyDescriptor MERGE_STRATEGY = new 
PropertyDescriptor.Builder()
+             .name("Merge Strategy")
+             .description("Specifies the algorithm used to merge content. The 
'Defragment' algorithm combines fragments that are associated by attributes 
back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a 
FlowFile populated by arbitrarily chosen FlowFiles")
+             .required(true)
+             .allowableValues(MERGE_STRATEGY_BIN_PACK, 
MERGE_STRATEGY_DEFRAGMENT)
+             .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
+             .build();
+     public static final PropertyDescriptor MERGE_FORMAT = new 
PropertyDescriptor.Builder()
+             .required(true)
+             .name("Merge Format")
+             .description("Determines the format that will be used to merge 
the content.")
+             .allowableValues(MERGE_FORMAT_TAR, MERGE_FORMAT_ZIP, 
MERGE_FORMAT_FLOWFILE_STREAM_V3, MERGE_FORMAT_FLOWFILE_STREAM_V2, 
MERGE_FORMAT_FLOWFILE_TAR_V1, MERGE_FORMAT_CONCAT)
+             .defaultValue(MERGE_FORMAT_CONCAT.getValue())
+             .build();
+     public static final PropertyDescriptor ATTRIBUTE_STRATEGY = new 
PropertyDescriptor.Builder()
+             .required(true)
+             .name("Attribute Strategy")
+             .description("Determines which FlowFile attributes should be 
added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute 
on any FlowFile that gets bundled will be kept unless its value conflicts with 
the value from another FlowFile. If 'Keep Only Common Attributes' is selected, 
only the attributes that exist on all FlowFiles in the bundle, with the same 
value, will be preserved.")
+             .allowableValues(ATTRIBUTE_STRATEGY_ALL_COMMON, 
ATTRIBUTE_STRATEGY_ALL_UNIQUE)
+             .defaultValue(ATTRIBUTE_STRATEGY_ALL_COMMON)
+             .build();
+ 
+     public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new 
PropertyDescriptor.Builder()
+             .name("Correlation Attribute Name")
+             .description("If specified, like FlowFiles will be binned 
together, where 'like FlowFiles' means FlowFiles that have the same value for 
this Attribute. If not specified, FlowFiles are bundled by the order in which 
they are pulled from the queue.")
+             .required(false)
+             .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+             .defaultValue(null)
+             .build();
+ 
+     public static final PropertyDescriptor HEADER = new 
PropertyDescriptor.Builder()
+             .name("Header File")
+             .description("Filename specifying the header to use. If not 
specified, no header is supplied. This property is valid only when using the 
binary-concatenation merge strategy; otherwise, it is ignored.")
+             .required(false)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor FOOTER = new 
PropertyDescriptor.Builder()
+             .name("Footer File")
+             .description("Filename specifying the footer to use. If not 
specified, no footer is supplied. This property is valid only when using the 
binary-concatenation merge strategy; otherwise, it is ignored.")
+             .required(false)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor DEMARCATOR = new 
PropertyDescriptor.Builder()
+             .name("Demarcator File")
+             .description("Filename specifying the demarcator to use. If not 
specified, no demarcator is supplied. This property is valid only when using 
the binary-concatenation merge strategy; otherwise, it is ignored.")
+             .required(false)
+             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor COMPRESSION_LEVEL = new 
PropertyDescriptor.Builder()
+             .name("Compression Level")
+             .description("Specifies the compression level to use when using 
the Zip Merge Format; if not using the Zip Merge Format, this value is ignored")
+             .required(true)
+             .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
+             .defaultValue("1")
+             .build();
+     public static final PropertyDescriptor KEEP_PATH = new 
PropertyDescriptor.Builder()
+             .name("Keep Path")
+             .description("If using the Zip or Tar Merge Format, specifies 
whether or not the FlowFiles' paths should be included in their entry names; if 
using other merge strategy, this value is ignored")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+ 
+     public static final PropertyDescriptor MIN_SIZE = new 
PropertyDescriptor.Builder()
+             .name("Minimum Group Size")
+             .description("The minimum size of for the bundle")
+             .required(true)
+             .defaultValue("0 B")
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor MAX_SIZE = new 
PropertyDescriptor.Builder()
+             .name("Maximum Group Size")
+             .description("The maximum size for the bundle. If not specified, 
there is no maximum.")
+             .required(false)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor MIN_ENTRIES = new 
PropertyDescriptor.Builder()
+             .name("Minimum Number of Entries")
+             .description("The minimum number of files to include in a bundle")
+             .required(true)
+             .defaultValue("1")
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+     public static final PropertyDescriptor MAX_ENTRIES = new 
PropertyDescriptor.Builder()
+             .name("Maximum Number of Entries")
+             .description("The maximum number of files to include in a bundle. 
If not specified, there is no maximum.")
+             .required(false)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor MAX_BIN_COUNT = new 
PropertyDescriptor.Builder()
+             .name("Maximum number of Bins")
+             .description("Specifies the maximum number of bins that can be 
held in memory at any one time")
+             .defaultValue("100")
+             .required(true)
+             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+             .build();
+ 
+     public static final PropertyDescriptor MAX_BIN_AGE = new 
PropertyDescriptor.Builder()
+             .name("Max Bin Age")
+             .description("The maximum age of a Bin that will trigger a Bin to 
be complete. Expected format is <duration> <time unit> where <duration> is a 
positive integer and time unit is one of seconds, minutes, hours")
+             .required(false)
+             .addValidator(StandardValidators.createTimePeriodValidator(1, 
TimeUnit.SECONDS, Integer.MAX_VALUE, TimeUnit.SECONDS))
+             .build();
+ 
+     public static final Relationship REL_ORIGINAL = new 
Relationship.Builder().name("original").description("The FlowFiles that were 
used to create the bundle").build();
+     public static final Relationship REL_MERGED = new 
Relationship.Builder().name("merged").description("The FlowFile containing the 
merged content").build();
+     public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("If the bundle cannot be 
created, all FlowFiles that would have been used to created the bundle will be 
transferred to failure").build();
+ 
+     public static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+ 
+     private Set<Relationship> relationships;
+     private List<PropertyDescriptor> descriptors;
+     private final BinManager binManager = new BinManager();
+ 
+     private final Queue<Bin> readyBins = new LinkedBlockingQueue<>();
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_ORIGINAL);
+         relationships.add(REL_MERGED);
+         relationships.add(REL_FAILURE);
+         this.relationships = Collections.unmodifiableSet(relationships);
+ 
+         final List<PropertyDescriptor> descriptors = new ArrayList<>();
+         descriptors.add(MERGE_STRATEGY);
+         descriptors.add(MERGE_FORMAT);
+         descriptors.add(ATTRIBUTE_STRATEGY);
+         descriptors.add(CORRELATION_ATTRIBUTE_NAME);
+         descriptors.add(MIN_ENTRIES);
+         descriptors.add(MAX_ENTRIES);
+         descriptors.add(MIN_SIZE);
+         descriptors.add(MAX_SIZE);
+         descriptors.add(MAX_BIN_AGE);
+         descriptors.add(MAX_BIN_COUNT);
+         descriptors.add(HEADER);
+         descriptors.add(FOOTER);
+         descriptors.add(DEMARCATOR);
+         descriptors.add(COMPRESSION_LEVEL);
+         descriptors.add(KEEP_PATH);
+ 
+         this.descriptors = Collections.unmodifiableList(descriptors);
+     }
+ 
+     @OnStopped
+     public void resetState() {
+         binManager.purge();
+ 
+         Bin bin;
+         while ((bin = readyBins.poll()) != null) {
+             for (final FlowFileSessionWrapper wrapper : bin.getContents()) {
+                 wrapper.getSession().rollback();
+             }
+         }
+     }
+ 
+     /** @return the unmodifiable set of relationships built in init() (original, merged, failure) */
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     /** @return the unmodifiable list of supported property descriptors built in init() */
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return descriptors;
+     }
+ 
+     private byte[] readContent(final String filename) throws IOException {
+         return Files.readAllBytes(Paths.get(filename));
+     }
+ 
+     
+     @Override
+     public void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+         int binsAdded = binFlowFiles(context, sessionFactory);
+         getLogger().debug("Binned {} FlowFiles", new Object[] {binsAdded});
+         
+         if (!isScheduled()) {
+             return;
+         }
+ 
+         binsAdded += migrateBins(context);
+ 
+         final int binsProcessed = processBins(context, sessionFactory);
+         if (binsProcessed == 0 && binsAdded == 0) {
+             context.yield();
+         }
+     }
+     
+ 
+     private int migrateBins(final ProcessContext context) {
+         int added = 0;
+         for (final Bin bin : binManager.removeReadyBins(true)) {
+             this.readyBins.add(bin);
+             added++;
+         }
+ 
+         // if we have created all of the bins that are allowed, go ahead and 
remove the oldest one. If we don't do
+         // this, then we will simply wait for it to expire because we can't 
get any more FlowFiles into the
+         // bins. So we may as well expire it now.
+         if (added == 0 && binManager.getBinCount() >= 
context.getProperty(MAX_BIN_COUNT).asInteger()) {
+             final Bin bin = binManager.removeOldestBin();
+             if (bin != null) {
+                 added++;
+                 this.readyBins.add(bin);
+             }
+         }
+ 
+         return added;
+     }
+ 
+     private int processBins(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+         final Bin bin = readyBins.poll();
+         if (bin == null) {
+             return 0;
+         }
+ 
+         final String mergeFormat = 
context.getProperty(MERGE_FORMAT).getValue();
+         MergeBin merger;
+         switch (mergeFormat) {
+             case MERGE_FORMAT_TAR_VALUE:
+                 merger = new TarMerge();
+                 break;
+             case MERGE_FORMAT_ZIP_VALUE:
+                 merger = new 
ZipMerge(context.getProperty(COMPRESSION_LEVEL).asInteger());
+                 break;
+             case MERGE_FORMAT_FLOWFILE_STREAM_V3_VALUE:
+                 merger = new FlowFileStreamMerger(new FlowFilePackagerV3(), 
"application/flowfile-v3");
+                 break;
+             case MERGE_FORMAT_FLOWFILE_STREAM_V2_VALUE:
+                 merger = new FlowFileStreamMerger(new FlowFilePackagerV2(), 
"application/flowfile-v2");
+                 break;
+             case MERGE_FORMAT_FLOWFILE_TAR_V1_VALUE:
+                 merger = new FlowFileStreamMerger(new FlowFilePackagerV1(), 
"application/flowfile-v1");
+                 break;
+             case MERGE_FORMAT_CONCAT_VALUE:
+                 merger = new BinaryConcatenationMerge();
+                 break;
+             default:
+                 throw new AssertionError();
+         }
+ 
+         final AttributeStrategy attributeStrategy;
+         switch (context.getProperty(ATTRIBUTE_STRATEGY).getValue()) {
+             case ATTRIBUTE_STRATEGY_ALL_UNIQUE:
+                 attributeStrategy = new KeepUniqueAttributeStrategy();
+                 break;
+             case ATTRIBUTE_STRATEGY_ALL_COMMON:
+             default:
+                 attributeStrategy = new KeepCommonAttributeStrategy();
+                 break;
+         }
+ 
+         final List<Bin> bins = new ArrayList<>();
+         bins.add(bin);
+ 
+         final ProcessorLog logger = getLogger();
+         final ProcessSession session = sessionFactory.createSession();
+ 
+         final Set<Bin> committedBins = new HashSet<>();
+         
+         for (final Bin unmodifiableBin : bins) {
+             final List<FlowFileSessionWrapper> binCopy = new 
ArrayList<>(unmodifiableBin.getContents());
+ 
+             if 
(MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue()))
 {
+                 final String error = getDefragmentValidationError(binCopy);
+                 if (error != null) {
+                     final String binDescription = binCopy.size() <= 10 ? 
binCopy.toString() : binCopy.size() + " FlowFiles";
+                     logger.error(error + "; routing {} to failure", new 
Object[]{binDescription});
+                     for ( final FlowFileSessionWrapper wrapper : binCopy ) {
+                         wrapper.getSession().transfer(wrapper.getFlowFile(), 
REL_FAILURE);
+                         wrapper.getSession().commit();
+                         committedBins.add(unmodifiableBin);
+                     }
+                     
+                     continue;
+                 }
+ 
+                 Collections.sort(binCopy, new FragmentComparator());
+             }
+ 
+             FlowFile bundle = null;
+             try {
+                 bundle = merger.merge(context, session, binCopy);
+ 
+                 // keep the filename, as it is added to the bundle.
+                 final String filename = 
bundle.getAttribute(CoreAttributes.FILENAME.key());
+ 
+                 // merge all of the attributes
+                 final Map<String, String> bundleAttributes = 
attributeStrategy.getMergedAttributes(binCopy);
+                 bundleAttributes.put(CoreAttributes.MIME_TYPE.key(), 
merger.getMergedContentType());
+                 // restore the filename of the bundle
+                 bundleAttributes.put(CoreAttributes.FILENAME.key(), filename);
+                 bundleAttributes.put(MERGE_COUNT_ATTRIBUTE, 
Integer.toString(binCopy.size()));
+                 bundleAttributes.put(MERGE_BIN_AGE_ATTRIBUTE, 
Long.toString(bin.getBinAge()));
+ 
+                 bundle = session.putAllAttributes(bundle, bundleAttributes);
+ 
+                 final String inputDescription = (binCopy.size() < 10) ? 
binCopy.toString() : binCopy.size() + " FlowFiles";
+                 logger.info("Merged {} into {}", new 
Object[]{inputDescription, bundle});
+             } catch (final Exception e) {
+                 logger.error("Failed to process bundle of {} files due to 
{}", new Object[]{binCopy.size(), e});
+ 
+                 for (final FlowFileSessionWrapper wrapper : binCopy) {
+                     wrapper.getSession().transfer(wrapper.getFlowFile(), 
REL_FAILURE);
+                     wrapper.getSession().commit();
+                 }
+                 session.rollback();
+                 return 1;
+             }
+             session.transfer(bundle, REL_MERGED);
+         }
+ 
+         // we first commit the bundle's session before the originals' 
sessions because if we are restarted or crash
+         // between commits, we favor data redundancy over data loss. Since we 
have no Distributed Transaction capability
+         // across multiple sessions, we cannot guarantee atomicity across the 
sessions
+         session.commit();
+         for (final Bin unmodifiableBin : bins) {
+             // If this bin's session has been committed, move on.
+             if ( committedBins.contains(unmodifiableBin) ) {
+                 continue;
+             }
+             
+             for (final FlowFileSessionWrapper wrapper : 
unmodifiableBin.getContents()) {
+                 wrapper.getSession().transfer(wrapper.getFlowFile(), 
REL_ORIGINAL);
+                 wrapper.getSession().commit();
+             }
+         }
+ 
+         return 1;
+     }
+ 
+     private int binFlowFiles(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) {
+         int binsAdded = 0;
+         while (binManager.getBinCount() < 
context.getProperty(MAX_BIN_COUNT).asInteger().intValue()) {
+             if (!isScheduled()) {
+                 return binsAdded;
+             }
+ 
+             final ProcessSession session = sessionFactory.createSession();
+             FlowFile flowFile = session.get();
+             if (flowFile == null) {
+                 return binsAdded;
+             }
+ 
+             // handle backward compatibility with old segment attributes
+             if (flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
+                 flowFile = session.putAttribute(flowFile, 
FRAGMENT_COUNT_ATTRIBUTE, flowFile.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
+             }
+             if (flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
+                 flowFile = session.putAttribute(flowFile, 
FRAGMENT_INDEX_ATTRIBUTE, flowFile.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
+             }
+             if (flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null && 
flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
+                 flowFile = session.putAttribute(flowFile, 
FRAGMENT_ID_ATTRIBUTE, flowFile.getAttribute(SEGMENT_ID_ATTRIBUTE));
+             }
+ 
+             final String correlationAttributeName = 
context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
+             String groupId = (correlationAttributeName == null) ? null : 
flowFile.getAttribute(correlationAttributeName);
+ 
+             // when MERGE_STRATEGY is Defragment and correlationAttributeName 
is null then bin by fragment.identifier
+             if (groupId == null && 
MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue()))
 {
+                 groupId = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
+             }
+ 
+             final boolean binned = binManager.offer(groupId, flowFile, 
session);
+ 
+             // could not be added to a bin -- probably too large by itself, 
so create a separate bin for just this guy.
+             if (!binned) {
+                 Bin bin = new Bin(0, Long.MAX_VALUE, 0, Integer.MAX_VALUE, 
null);
+                 bin.offer(flowFile, session);
+                 this.readyBins.add(bin);
+             }
+ 
+             binsAdded++;
+         }
+ 
+         return binsAdded;
+     }
+ 
+     private String getDefragmentValidationError(final 
List<FlowFileSessionWrapper> bin) {
+         if (bin.isEmpty()) {
+             return null;
+         }
+ 
+         // If we are defragmenting, all fragments must have the appropriate 
attributes.
+         String decidedFragmentCount = null;
+         String fragmentIdentifier = null;
+         for (final FlowFileSessionWrapper flowFileWrapper : bin) {
+             final FlowFile flowFile = flowFileWrapper.getFlowFile();
+ 
+             final String fragmentIndex = 
flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE);
+             if (!isNumber(fragmentIndex)) {
+                 return "Cannot Defragment " + flowFile + " because it does 
not have an integer value for the " + FRAGMENT_INDEX_ATTRIBUTE + " attribute";
+             }
+             
+             fragmentIdentifier = flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
+ 
+             final String fragmentCount = 
flowFile.getAttribute(FRAGMENT_COUNT_ATTRIBUTE);
+             if (!isNumber(fragmentCount)) {
+                 return "Cannot Defragment " + flowFile + " because it does 
not have an integer value for the " + FRAGMENT_COUNT_ATTRIBUTE + " attribute";
+             } else if (decidedFragmentCount == null) {
+                 decidedFragmentCount = fragmentCount;
+             } else if (!decidedFragmentCount.equals(fragmentCount)) {
+                 return "Cannot Defragment " + flowFile + " because it is 
grouped with another FlowFile, and the two have differing values for the " + 
FRAGMENT_COUNT_ATTRIBUTE + " attribute: " + decidedFragmentCount + " and " + 
fragmentCount;
+             }
+         }
+         
+         final int numericFragmentCount;
+         try {
+             numericFragmentCount = Integer.parseInt(decidedFragmentCount);
+         } catch (final NumberFormatException nfe) {
+             return "Cannot Defragment FlowFiles with Fragment Identifier " + 
fragmentIdentifier + " because the " + FRAGMENT_COUNT_ATTRIBUTE + " has a 
non-integer value of " + decidedFragmentCount;
+         }
+         
+         if ( bin.size() < numericFragmentCount ) {
+             return "Cannot Defragment FlowFiles with Fragment Identifier " + 
fragmentIdentifier + " because the expected number of fragments is " + 
decidedFragmentCount + " but found only " + bin.size() + " fragments";
+         }
+         
+         if ( bin.size() > numericFragmentCount ) {
+             return "Cannot Defragment FlowFiles with Fragment Identifier " + 
fragmentIdentifier + " because the expected number of fragments is " + 
decidedFragmentCount + " but found " + bin.size() + " fragments for this 
identifier";
+         }
+ 
+         return null;
+     }
+ 
+     private boolean isNumber(final String value) {
+         if (value == null) {
+             return false;
+         }
+ 
+         return NUMBER_PATTERN.matcher(value).matches();
+     }
+ 
+     @OnScheduled
+     public void onScheduled(final ProcessContext context) throws IOException {
+         
binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue());
+ 
+         if (context.getProperty(MAX_BIN_AGE).isSet() ) {
+             
binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue());
+         } else {
+             binManager.setMaxBinAge(Integer.MAX_VALUE);
+         }
+         
+         if ( context.getProperty(MAX_SIZE).isSet() ) {
+             
binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue());
+         } else {
+             binManager.setMaximumSize(Long.MAX_VALUE);
+         }
+         
+         if 
(MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue()))
 {
+             binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE);
+         } else {
+             
binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger());
+ 
+             if ( context.getProperty(MAX_ENTRIES).isSet() ) {
+                 
binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue());
+             } else {
+                 binManager.setMaximumEntries(Integer.MAX_VALUE);
+             }
+         }
+ 
+     }
+ 
+     @Override
+     protected Collection<ValidationResult> customValidate(final 
ValidationContext context) {
+         final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
+ 
+         final long minBytes = 
context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
+         final Double maxBytes = 
context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+ 
+         if (maxBytes != null && maxBytes.longValue() < minBytes) {
+             problems.add(new 
ValidationResult.Builder().subject(MIN_SIZE.getName()).input(
+                     
context.getProperty(MIN_SIZE).getValue()).valid(false).explanation("Min Size 
must be less than or equal to Max Size").build());
+         }
+ 
+         final Long min = context.getProperty(MIN_ENTRIES).asLong();
+         final Long max = context.getProperty(MAX_ENTRIES).asLong();
+ 
+         if (min != null && max != null) {
+             if (min > max) {
+                 problems.add(new 
ValidationResult.Builder().subject(MIN_ENTRIES.getName()).input(context.getProperty(MIN_ENTRIES).getValue()).valid(false).explanation("Min
 Entries must be less than or equal to Max Entries").build());
+             }
+         }
+ 
+         return problems;
+     }
+ 
+     private class BinaryConcatenationMerge implements MergeBin {
+ 
+         private String mimeType = "application/octet-stream";
+ 
+         public BinaryConcatenationMerge() {
+         }
+ 
+         @Override
+         public FlowFile merge(final ProcessContext context, final 
ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
+             final Set<FlowFile> parentFlowFiles = new HashSet<>();
+             for (final FlowFileSessionWrapper wrapper : wrappers) {
+                 parentFlowFiles.add(wrapper.getFlowFile());
+             }
+ 
+             FlowFile bundle = session.create(parentFlowFiles);
+             final ObjectHolder<String> bundleMimeTypeRef = new 
ObjectHolder<>(null);
+             bundle = session.write(bundle, new OutputStreamCallback() {
+                 @Override
+                 public void process(final OutputStream out) throws 
IOException {
+                     final byte[] header = getDescriptorFileContent(context, 
wrappers, HEADER);
+                     if (header != null) {
+                         out.write(header);
+                     }
+ 
+                     boolean isFirst = true;
+                     final Iterator<FlowFileSessionWrapper> itr = 
wrappers.iterator();
+                     while (itr.hasNext()) {
+                         final FlowFileSessionWrapper wrapper = itr.next();
+                         wrapper.getSession().read(wrapper.getFlowFile(), new 
InputStreamCallback() {
+                             @Override
+                             public void process(final InputStream in) throws 
IOException {
+                                 StreamUtils.copy(in, out);
+                             }
+                         });
+ 
+                         if (itr.hasNext()) {
+                             final byte[] demarcator = 
getDescriptorFileContent(context, wrappers, DEMARCATOR);
+                             if (demarcator != null) {
+                                 out.write(demarcator);
+                             }
+                         }
+ 
+                         final String flowFileMimeType = 
wrapper.getFlowFile().getAttribute(CoreAttributes.MIME_TYPE.key());
+                         if (isFirst) {
+                             bundleMimeTypeRef.set(flowFileMimeType);
+                             isFirst = false;
+                         } else {
+                             if (bundleMimeTypeRef.get() != null && 
!bundleMimeTypeRef.get().equals(flowFileMimeType)) {
+                                 bundleMimeTypeRef.set(null);
+                             }
+                         }
+                     }
+ 
+                     final byte[] footer = getDescriptorFileContent(context, 
wrappers, FOOTER);
+                     if (footer != null) {
+                         out.write(footer);
+                     }
+                 }
+             });
+ 
+             session.getProvenanceReporter().join(getFlowFiles(wrappers), 
bundle);
+             bundle = session.putAttribute(bundle, 
CoreAttributes.FILENAME.key(), createFilename(wrappers));
+             if (bundleMimeTypeRef.get() != null) {
+                 this.mimeType = bundleMimeTypeRef.get();
+             }
+ 
+             return bundle;
+         }
+ 
+         private byte[] getDescriptorFileContent(final ProcessContext context, 
final List<FlowFileSessionWrapper> wrappers, final PropertyDescriptor 
descriptor)
+                 throws IOException {
+             byte[] property = null;
+             final String descriptorFile = 
context.getProperty(descriptor).getValue();
+             if (descriptorFile != null && wrappers != null && wrappers.size() 
> 0) {
+                 final String content = new 
String(readContent(descriptorFile));
+                 final FlowFileSessionWrapper wrapper = wrappers.get(0);
+                 if (wrapper != null && content != null) {
+                     final FlowFile flowFile = wrapper.getFlowFile();
+                     if (flowFile != null) {
+                         final PropertyValue propVal = 
context.newPropertyValue(content).evaluateAttributeExpressions(flowFile);
+                         property = propVal.getValue().getBytes();
+                     }
+                 }
+             }
+             return property;
+         }
+ 
+         @Override
+         public String getMergedContentType() {
+             return mimeType;
+         }
+     }
+ 
+     private List<FlowFile> getFlowFiles(final List<FlowFileSessionWrapper> 
sessionWrappers) {
+         final List<FlowFile> flowFiles = new ArrayList<>();
+         for (final FlowFileSessionWrapper wrapper : sessionWrappers) {
+             flowFiles.add(wrapper.getFlowFile());
+         }
+         return flowFiles;
+     }
+ 
+     private String getPath(final FlowFile flowFile) {
+         Path path = 
Paths.get(flowFile.getAttribute(CoreAttributes.PATH.key()));
+         if (path.getNameCount() == 0) {
+             return "";
+         }
+ 
+         if (".".equals(path.getName(0).toString())) {
+             path = (path.getNameCount() == 1) ? null : path.subpath(1, 
path.getNameCount());
+         }
+ 
+         return (path == null) ? "" : path.toString() + "/";
+     }
+ 
+     private String createFilename(final List<FlowFileSessionWrapper> 
wrappers) {
+         if (wrappers.size() == 1) {
+             return 
wrappers.get(0).getFlowFile().getAttribute(CoreAttributes.FILENAME.key());
+         } else {
+             FlowFile ff = wrappers.get(0).getFlowFile();
+             String origFilename = ff.getAttribute(SEGMENT_ORIGINAL_FILENAME);
+             if (origFilename != null) {
+                 return origFilename;
+             } else {
+                 return String.valueOf(System.nanoTime());
+             }
+         }
+     }
+ 
    /**
     * MergeBin implementation that packages each FlowFile in the bin as an
     * entry of a TAR archive, using GNU long-filename handling. The merged
     * FlowFile is named after the bin with a ".tar" suffix.
     */
    private class TarMerge implements MergeBin {

        @Override
        public FlowFile merge(final ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
            final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();
            FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions

            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(wrappers) + ".tar");
            bundle = session.write(bundle, new OutputStreamCallback() {
                @Override
                public void process(final OutputStream rawOut) throws IOException {
                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                            final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) {
                        // GNU mode so entry names longer than the classic 100-char tar limit don't fail
                        out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
                        for (final FlowFileSessionWrapper wrapper : wrappers) {
                            final FlowFile flowFile = wrapper.getFlowFile();

                            // Entry name is the (optional) path attribute prefix plus the filename attribute.
                            final String path = keepPath ? getPath(flowFile) : "";
                            final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());

                            final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName);
                            tarEntry.setSize(flowFile.getSize());
                            final String permissionsVal = flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE);
                            if (permissionsVal != null) {
                                try {
                                    // NOTE(review): parsed as a DECIMAL int, but tar modes are
                                    // conventionally octal (e.g. "644" = rw-r--r--); confirm the
                                    // expected attribute format before relying on the stored mode.
                                    tarEntry.setMode(Integer.parseInt(permissionsVal));
                                } catch (final Exception e) {
                                    // Unparseable permissions are ignored rather than failing the merge.
                                    getLogger().debug("Attribute {} of {} is set to {}; expected 3 digits between 0-7, so ignoring",
                                            new Object[]{TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal});
                                }
                            }

                            out.putArchiveEntry(tarEntry);

                            // Stream the content through the session that owns this FlowFile.
                            wrapper.getSession().exportTo(flowFile, out);
                            out.closeArchiveEntry();
                        }
                    }
                }
            });

            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
            return bundle;
        }

        @Override
        public String getMergedContentType() {
            return "application/tar";
        }
    }
+ 
    /**
     * MergeBin implementation that serializes each FlowFile (content plus
     * attributes) into a NiFi FlowFile package stream via the configured
     * {@link FlowFilePackager}. The packager's format determines the reported
     * MIME type; the merged FlowFile is named after the bin with ".pkg".
     */
    private class FlowFileStreamMerger implements MergeBin {

        private final FlowFilePackager packager;
        private final String mimeType;

        public FlowFileStreamMerger(final FlowFilePackager packager, final String mimeType) {
            this.packager = packager;
            this.mimeType = mimeType;
        }

        @Override
        public FlowFile merge(final ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
            FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions

            bundle = session.write(bundle, new OutputStreamCallback() {
                @Override
                public void process(final OutputStream rawOut) throws IOException {
                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut)) {
                        // we don't want the packager closing the stream. V1 creates a TAR Output Stream, which then gets
                        // closed, which in turn closes the underlying OutputStream, and we want to protect ourselves against that.
                        final OutputStream out = new NonCloseableOutputStream(bufferedOut);

                        for (final FlowFileSessionWrapper wrapper : wrappers) {
                            final FlowFile flowFile = wrapper.getFlowFile();
                            // Read each FlowFile's content via the session that owns it.
                            wrapper.getSession().read(flowFile, new InputStreamCallback() {
                                @Override
                                public void process(final InputStream rawIn) throws IOException {
                                    try (final InputStream in = new BufferedInputStream(rawIn)) {
                                        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());

                                        // for backward compatibility purposes, we add the "legacy" NiFi attributes
                                        attributes.put("nf.file.name", attributes.get(CoreAttributes.FILENAME.key()));
                                        attributes.put("nf.file.path", attributes.get(CoreAttributes.PATH.key()));
                                        if (attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
                                            attributes.put("content-type", attributes.get(CoreAttributes.MIME_TYPE.key()));
                                        }

                                        packager.packageFlowFile(in, out, attributes, flowFile.getSize());
                                    }
                                }
                            });
                        }
                    }
                }
            });

            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(wrappers) + ".pkg");
            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
            return bundle;
        }

        @Override
        public String getMergedContentType() {
            return mimeType;
        }
    }
+ 
    /**
     * MergeBin implementation that packages each FlowFile in the bin as an
     * entry of a ZIP archive at the configured compression level. The merged
     * FlowFile is named after the bin with a ".zip" suffix.
     */
    private class ZipMerge implements MergeBin {

        // Deflate level passed to ZipOutputStream#setLevel (0-9).
        private final int compressionLevel;

        public ZipMerge(final int compressionLevel) {
            this.compressionLevel = compressionLevel;
        }

        @Override
        public FlowFile merge(final ProcessContext context, final ProcessSession session, final List<FlowFileSessionWrapper> wrappers) {
            final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean();

            FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions

            bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(wrappers) + ".zip");
            bundle = session.write(bundle, new OutputStreamCallback() {
                @Override
                public void process(final OutputStream rawOut) throws IOException {
                    try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut);
                            final ZipOutputStream out = new ZipOutputStream(bufferedOut)) {
                        out.setLevel(compressionLevel);
                        for (final FlowFileSessionWrapper wrapper : wrappers) {
                            final FlowFile flowFile = wrapper.getFlowFile();

                            // Entry name is the (optional) path attribute prefix plus the filename attribute.
                            final String path = keepPath ? getPath(flowFile) : "";
                            final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key());
                            final ZipEntry zipEntry = new ZipEntry(entryName);
                            zipEntry.setSize(flowFile.getSize());
                            out.putNextEntry(zipEntry);

                            // Stream the content through the session that owns this FlowFile.
                            wrapper.getSession().exportTo(flowFile, out);
                            out.closeEntry();
                        }

                        // Write the central directory before the try-with-resources close.
                        out.finish();
                        out.flush();
                    }
                }
            });

            session.getProvenanceReporter().join(getFlowFiles(wrappers), bundle);
            return bundle;
        }

        @Override
        public String getMergedContentType() {
            return "application/zip";
        }
    }
+ 
+     private static class KeepUniqueAttributeStrategy implements 
AttributeStrategy {
+ 
+         @Override
+         public Map<String, String> getMergedAttributes(final 
List<FlowFileSessionWrapper> flowFiles) {
+             final Map<String, String> newAttributes = new HashMap<>();
+             final Set<String> conflicting = new HashSet<>();
+ 
+             for (final FlowFileSessionWrapper wrapper : flowFiles) {
+                 final FlowFile flowFile = wrapper.getFlowFile();
+ 
+                 for (final Map.Entry<String, String> attributeEntry : 
flowFile.getAttributes().entrySet()) {
+                     final String name = attributeEntry.getKey();
+                     final String value = attributeEntry.getValue();
+ 
+                     final String existingValue = newAttributes.get(name);
+                     if (existingValue != null && 
!existingValue.equals(value)) {
+                         conflicting.add(name);
+                     } else {
+                         newAttributes.put(name, value);
+                     }
+                 }
+             }
+ 
+             for (final String attributeToRemove : conflicting) {
+                 newAttributes.remove(attributeToRemove);
+             }
+ 
+             // Never copy the UUID from the parents - which could happen if 
we don't remove it and there is only 1 parent.
+             newAttributes.remove(CoreAttributes.UUID.key());
+             return newAttributes;
+         }
+     }
+ 
+     private static class KeepCommonAttributeStrategy implements 
AttributeStrategy {
+ 
+         @Override
+         public Map<String, String> getMergedAttributes(final 
List<FlowFileSessionWrapper> flowFiles) {
+             final Map<String, String> result = new HashMap<>();
+ 
+             //trivial cases
+             if (flowFiles == null || flowFiles.isEmpty()) {
+                 return result;
+             } else if (flowFiles.size() == 1) {
+                 
result.putAll(flowFiles.iterator().next().getFlowFile().getAttributes());
+             }
+ 
+             /*
+              * Start with the first attribute map and only put an entry to the
+              * resultant map if it is common to every map.
+              */
+             final Map<String, String> firstMap = 
flowFiles.iterator().next().getFlowFile().getAttributes();
+ 
+             outer:
+             for (final Map.Entry<String, String> mapEntry : 
firstMap.entrySet()) {
+                 final String key = mapEntry.getKey();
+                 final String value = mapEntry.getValue();
+ 
+                 for (final FlowFileSessionWrapper flowFileWrapper : 
flowFiles) {
+                     final Map<String, String> currMap = 
flowFileWrapper.getFlowFile().getAttributes();
+                     final String curVal = currMap.get(key);
+                     if (curVal == null || !curVal.equals(value)) {
+                         continue outer;
+                     }
+                 }
+                 result.put(key, value);
+             }
+ 
+             // Never copy the UUID from the parents - which could happen if 
we don't remove it and there is only 1 parent.
+             result.remove(CoreAttributes.UUID.key());
+             return result;
+         }
+     }
+ 
+     private static class FragmentComparator implements 
Comparator<FlowFileSessionWrapper> {
+ 
+         @Override
+         public int compare(final FlowFileSessionWrapper o1, final 
FlowFileSessionWrapper o2) {
+             final int fragmentIndex1 = 
Integer.parseInt(o1.getFlowFile().getAttribute(FRAGMENT_INDEX_ATTRIBUTE));
+             final int fragmentIndex2 = 
Integer.parseInt(o2.getFlowFile().getAttribute(FRAGMENT_INDEX_ATTRIBUTE));
+             return Integer.compare(fragmentIndex1, fragmentIndex2);
+         }
+     }
+ 
    /**
     * Strategy for combining the FlowFiles of a completed bin into a single
     * output FlowFile.
     */
    private interface MergeBin {

        /**
         * Merges the given FlowFiles (each paired with the session that owns
         * it) into one FlowFile created on the supplied session.
         */
        FlowFile merge(ProcessContext context, ProcessSession session, List<FlowFileSessionWrapper> flowFiles);

        /** Returns the MIME type to set on the merged FlowFile. */
        String getMergedContentType();
    }
+ 
    /**
     * Strategy for deciding which attributes the merged FlowFile inherits from
     * the FlowFiles of its bin.
     */
    private interface AttributeStrategy {

        /** Computes the attribute map to apply to the merged FlowFile. */
        Map<String, String> getMergedAttributes(List<FlowFileSessionWrapper> flowFiles);
    }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
index 0000000,a8b190d..ccebb46
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
@@@ -1,0 -1,134 +1,134 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.stream.io.StreamUtils;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.processor.AbstractProcessor;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
 -import org.apache.nifi.processor.annotation.CapabilityDescription;
 -import org.apache.nifi.processor.annotation.EventDriven;
 -import org.apache.nifi.processor.annotation.SideEffectFree;
 -import org.apache.nifi.processor.annotation.Tags;
++import org.apache.nifi.annotation.documentation.CapabilityDescription;
++import org.apache.nifi.annotation.behavior.EventDriven;
++import org.apache.nifi.annotation.behavior.SideEffectFree;
++import org.apache.nifi.annotation.documentation.Tags;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.processor.io.OutputStreamCallback;
+ import org.apache.nifi.processor.io.StreamCallback;
+ import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
+ 
+ @EventDriven
+ @SideEffectFree
+ @Tags({"binary", "discard", "keep"})
+ @CapabilityDescription("Keep or discard bytes range from a binary file.")
+ public class ModifyBytes extends AbstractProcessor {
+ 
+     // Relationships
+     public static final Relationship REL_SUCCESS = new Relationship.Builder()
+             .name("success")
+             .description("Processed flowfiles.")
+             .build();
+     //
+     private final Set<Relationship> relationships;
+     // Properties
+     public static final PropertyDescriptor START_OFFSET = new 
PropertyDescriptor.Builder()
+             .name("Start Offset")
+             .description("Number of bytes removed at the beginning of the 
file.")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("0 B")
+             .build();
+     public static final PropertyDescriptor END_OFFSET = new 
PropertyDescriptor.Builder()
+             .name("End Offset")
+             .description("Number of bytes removed at the end of the file.")
+             .required(true)
+             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+             .defaultValue("0 B")
+             .build();
+     // 
+     private final List<PropertyDescriptor> propDescriptors;
+ 
+     public ModifyBytes() {
+         HashSet<Relationship> r = new HashSet<>();
+         r.add(REL_SUCCESS);
+         relationships = Collections.unmodifiableSet(r);
+ 
+         ArrayList<PropertyDescriptor> pds = new ArrayList<>();
+         pds.add(START_OFFSET);
+         pds.add(END_OFFSET);
+         propDescriptors = Collections.unmodifiableList(pds);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return propDescriptors;
+     }
+ 
+     @Override
+     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+         FlowFile ff = session.get();
+         if (null == ff) {
+             return;
+         }
+ 
+         final ProcessorLog logger = getLogger();
+ 
+         final int startOffset = 
context.getProperty(START_OFFSET).asDataSize(DataUnit.B).intValue();
+         final int endOffset = 
context.getProperty(END_OFFSET).asDataSize(DataUnit.B).intValue();
+         final int newFileSize = (int) ff.getSize() - startOffset - endOffset;
+ 
+         final StopWatch stopWatch = new StopWatch(true);
+         if (newFileSize <= 0) {
+             ff = session.write(ff, new OutputStreamCallback() {
+                 @Override
+                 public void process(final OutputStream out) throws 
IOException {
+                     out.write(new byte[0]);
+                 }
+             });
+         } else {
+             ff = session.write(ff, new StreamCallback() {
+                 @Override
+                 public void process(final InputStream in, final OutputStream 
out) throws IOException {
+                     in.skip(startOffset);
+                     StreamUtils.copy(in, out, newFileSize);
+                 }
+             });
+         }
+ 
+         logger.info("Transferred {} to 'success'", new Object[]{ff});
+         session.getProvenanceReporter().modifyContent(ff, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+         session.transfer(ff, REL_SUCCESS);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
----------------------------------------------------------------------
diff --cc 
nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
index 0000000,115f234..c5fce3c
mode 000000,100644..100644
--- 
a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
+++ 
b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java
@@@ -1,0 -1,206 +1,206 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processors.standard;
+ 
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+ 
+ @SideEffectFree
+ @TriggerSerially
+ @TriggerWhenEmpty
+ @Tags({"monitor", "flow", "active", "inactive", "activity", "detection"})
+ @CapabilityDescription("Monitors the flow for activity and sends out an 
indicator when the flow has not had any data for "
+         + "some specified amount of time and again when the flow's activity 
is restored")
+ public class MonitorActivity extends AbstractProcessor {
+ 
+     public static final PropertyDescriptor THRESHOLD = new 
PropertyDescriptor.Builder()
+             .name("Threshold Duration")
+             .description("Determines how much time must elapse before 
considering the flow to be inactive")
+             .required(true)
+             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+             .defaultValue("5 min")
+             .build();
+     public static final PropertyDescriptor CONTINUALLY_SEND_MESSAGES = new 
PropertyDescriptor.Builder()
+             .name("Continually Send Messages")
+             .description("If true, will send inactivity indicator continually 
every Threshold Duration amount of time until activity is restored; if false, 
will send an indicator only when the flow first becomes inactive")
+             .required(true)
+             .allowableValues("true", "false")
+             .defaultValue("false")
+             .build();
+     public static final PropertyDescriptor ACTIVITY_RESTORED_MESSAGE = new 
PropertyDescriptor.Builder()
+             .name("Activity Restored Message")
+             .description("The message that will be the content of FlowFiles 
that are sent to 'activity.restored' relationship")
+             .required(true)
+             .expressionLanguageSupported(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .defaultValue("Activity restored at time: 
${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for 
${inactivityDurationMillis:toNumber():divide(60000)} minutes")
+             .build();
+     public static final PropertyDescriptor INACTIVITY_MESSAGE = new 
PropertyDescriptor.Builder()
+             .name("Inactivity Message")
+             .description("The message that will be the content of FlowFiles 
that are sent to the 'inactive' relationship")
+             .required(true)
+             .expressionLanguageSupported(true)
+             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+             .defaultValue("Lacking activity as of time: 
${now():format('yyyy/MM/dd HH:mm:ss')}; flow has been inactive for 
${inactivityDurationMillis:toNumber():divide(60000)} minutes")
+             .build();
+     public static final PropertyDescriptor COPY_ATTRIBUTES = new 
PropertyDescriptor.Builder()
+             .name("Copy Attributes")
+             .description("If true, will copy all flow file attributes from 
the flow file that resumed activity to the newly created indicator flow file")
+             .required(false)
+             .allowableValues("true", "false")
+             .defaultValue("false").build();
+ 
+     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("All incoming FlowFiles are 
routed to success").build();
+     public static final Relationship REL_INACTIVE = new 
Relationship.Builder().name("inactive").description("This relationship is used 
to transfer an Inactivity indicator when no FlowFiles are routed to 'success' 
for Threshold Duration amount of time").build();
+     public static final Relationship REL_ACTIVITY_RESTORED = new 
Relationship.Builder().name("activity.restored").description("This relationship 
is used to transfer an Activity Restored indicator when FlowFiles are routing 
to 'success' following a period of inactivity").build();
+     public static final Charset UTF8 = Charset.forName("UTF-8");
+ 
+     private List<PropertyDescriptor> properties;
+     private Set<Relationship> relationships;
+ 
+     private final AtomicLong latestSuccessTransfer = new 
AtomicLong(System.currentTimeMillis());
+     private final AtomicBoolean inactive = new AtomicBoolean(false);
+     private final AtomicLong lastInactiveMessage = new 
AtomicLong(System.currentTimeMillis());
+ 
+     @Override
+     protected void init(final ProcessorInitializationContext context) {
+         final List<PropertyDescriptor> properties = new ArrayList<>();
+         properties.add(THRESHOLD);
+         properties.add(CONTINUALLY_SEND_MESSAGES);
+         properties.add(INACTIVITY_MESSAGE);
+         properties.add(ACTIVITY_RESTORED_MESSAGE);
+         properties.add(COPY_ATTRIBUTES);
+         this.properties = Collections.unmodifiableList(properties);
+ 
+         final Set<Relationship> relationships = new HashSet<>();
+         relationships.add(REL_SUCCESS);
+         relationships.add(REL_INACTIVE);
+         relationships.add(REL_ACTIVITY_RESTORED);
+         this.relationships = Collections.unmodifiableSet(relationships);
+     }
+ 
+     @Override
+     public Set<Relationship> getRelationships() {
+         return relationships;
+     }
+ 
+     @Override
+     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+         return properties;
+     }
+ 
+     @Override
+     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+         final long thresholdMillis = 
context.getProperty(THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
+         final long now = System.currentTimeMillis();
+ 
+         final ProcessorLog logger = getLogger();
+         final List<FlowFile> flowFiles = session.get(50);
+         if (flowFiles.isEmpty()) {
+             final long previousSuccessMillis = latestSuccessTransfer.get();
+             boolean sendInactiveMarker = false;
+ 
+             if (now >= previousSuccessMillis + thresholdMillis) {
+                 final boolean continual = 
context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
+                 sendInactiveMarker = !inactive.getAndSet(true) || (continual 
&& (now > lastInactiveMessage.get() + thresholdMillis));
+             }
+ 
+             if (sendInactiveMarker) {
+                 lastInactiveMessage.set(System.currentTimeMillis());
+ 
+                 FlowFile inactiveFlowFile = session.create();
+                 inactiveFlowFile = session.putAttribute(inactiveFlowFile, 
"inactivityStartMillis", String.valueOf(previousSuccessMillis));
+                 inactiveFlowFile = session.putAttribute(inactiveFlowFile, 
"inactivityDurationMillis", String.valueOf(now - previousSuccessMillis));
+ 
+                 final byte[] outBytes = 
context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8);
+                 inactiveFlowFile = session.write(inactiveFlowFile, new 
OutputStreamCallback() {
+                     @Override
+                     public void process(final OutputStream out) throws 
IOException {
+                         out.write(outBytes);
+                     }
+                 });
+ 
+                 session.getProvenanceReporter().create(inactiveFlowFile);
+                 session.transfer(inactiveFlowFile, REL_INACTIVE);
+                 logger.info("Transferred {} to 'inactive'", new 
Object[]{inactiveFlowFile});
+             } else {
+                 context.yield();    // no need to dominate CPU checking 
times; let other processors run for a bit.
+             }
+         } else {
+             session.transfer(flowFiles, REL_SUCCESS);
+             logger.info("Transferred {} FlowFiles to 'success'", new 
Object[]{flowFiles.size()});
+ 
+             final long inactivityStartMillis = 
latestSuccessTransfer.getAndSet(now);
+             if (inactive.getAndSet(false)) {
+                 FlowFile activityRestoredFlowFile = session.create();
+ 
+                 final boolean copyAttributes = 
context.getProperty(COPY_ATTRIBUTES).asBoolean();
+                 if (copyAttributes) {
+ 
+                     // copy attributes from the first flow file in the list
+                     Map<String, String> attributes = new 
HashMap<>(flowFiles.get(0).getAttributes());
+                     // don't copy the UUID
+                     attributes.remove(CoreAttributes.UUID.key());
+                     activityRestoredFlowFile = 
session.putAllAttributes(activityRestoredFlowFile, attributes);
+                 }
+ 
+                 activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", 
String.valueOf(inactivityStartMillis));
+                 activityRestoredFlowFile = 
session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", 
String.valueOf(now - inactivityStartMillis));
+ 
+                 final byte[] outBytes = 
context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8);
+                 activityRestoredFlowFile = 
session.write(activityRestoredFlowFile, new OutputStreamCallback() {
+                     @Override
+                     public void process(final OutputStream out) throws 
IOException {
+                         out.write(outBytes);
+                     }
+                 });
+ 
+                 
session.getProvenanceReporter().create(activityRestoredFlowFile);
+                 session.transfer(activityRestoredFlowFile, 
REL_ACTIVITY_RESTORED);
+                 logger.info("Transferred {} to 'activity.restored'", new 
Object[]{activityRestoredFlowFile});
+             }
+         }
+     }
+ }

Reply via email to