Repository: nifi
Updated Branches:
  refs/heads/master 3b74d2dda -> 1fc1d38fd


NIFI-4562: This closes #2331. If an Exception is thrown when merging FlowFiles,
ensure that we remove the 'bundle' FlowFile before exiting the method.

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1fc1d38f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1fc1d38f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1fc1d38f

Branch: refs/heads/master
Commit: 1fc1d38fd8bec39d7358c7e04e024d563fc26492
Parents: 3b74d2d
Author: Mark Payne <[email protected]>
Authored: Fri Dec 8 13:09:00 2017 -0500
Committer: joewitt <[email protected]>
Committed: Fri Dec 8 15:35:36 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/MergeContent.java  | 399 ++++++++++---------
 1 file changed, 212 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/1fc1d38f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 79a69a7..e78cd4f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -578,49 +578,54 @@ public class MergeContent extends BinFiles {
             final ProcessSession session = bin.getSession();
             FlowFile bundle = session.create(bin.getContents());
             final AtomicReference<String> bundleMimeTypeRef = new 
AtomicReference<>(null);
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    final byte[] header = getDelimiterContent(context, 
contents, HEADER);
-                    if (header != null) {
-                        out.write(header);
-                    }
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        final byte[] header = getDelimiterContent(context, 
contents, HEADER);
+                        if (header != null) {
+                            out.write(header);
+                        }
 
-                    boolean isFirst = true;
-                    final Iterator<FlowFile> itr = contents.iterator();
-                    while (itr.hasNext()) {
-                        final FlowFile flowFile = itr.next();
-                        bin.getSession().read(flowFile, false, new 
InputStreamCallback() {
-                            @Override
-                            public void process(final InputStream in) throws 
IOException {
-                                StreamUtils.copy(in, out);
-                            }
-                        });
+                        boolean isFirst = true;
+                        final Iterator<FlowFile> itr = contents.iterator();
+                        while (itr.hasNext()) {
+                            final FlowFile flowFile = itr.next();
+                            bin.getSession().read(flowFile, false, new 
InputStreamCallback() {
+                                @Override
+                                public void process(final InputStream in) 
throws IOException {
+                                    StreamUtils.copy(in, out);
+                                }
+                            });
 
-                        if (itr.hasNext()) {
-                            final byte[] demarcator = 
getDelimiterContent(context, contents, DEMARCATOR);
-                            if (demarcator != null) {
-                                out.write(demarcator);
+                            if (itr.hasNext()) {
+                                final byte[] demarcator = 
getDelimiterContent(context, contents, DEMARCATOR);
+                                if (demarcator != null) {
+                                    out.write(demarcator);
+                                }
                             }
-                        }
 
-                        final String flowFileMimeType = 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
-                        if (isFirst) {
-                            bundleMimeTypeRef.set(flowFileMimeType);
-                            isFirst = false;
-                        } else {
-                            if (bundleMimeTypeRef.get() != null && 
!bundleMimeTypeRef.get().equals(flowFileMimeType)) {
-                                bundleMimeTypeRef.set(null);
+                            final String flowFileMimeType = 
flowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+                            if (isFirst) {
+                                bundleMimeTypeRef.set(flowFileMimeType);
+                                isFirst = false;
+                            } else {
+                                if (bundleMimeTypeRef.get() != null && 
!bundleMimeTypeRef.get().equals(flowFileMimeType)) {
+                                    bundleMimeTypeRef.set(null);
+                                }
                             }
                         }
-                    }
 
-                    final byte[] footer = getDelimiterContent(context, 
contents, FOOTER);
-                    if (footer != null) {
-                        out.write(footer);
+                        final byte[] footer = getDelimiterContent(context, 
contents, FOOTER);
+                        if (footer != null) {
+                            out.write(footer);
+                        }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             session.getProvenanceReporter().join(contents, bundle);
             bundle = session.putAttribute(bundle, 
CoreAttributes.FILENAME.key(), createFilename(contents));
@@ -719,48 +724,53 @@ public class MergeContent extends BinFiles {
             final boolean keepPath = 
context.getProperty(KEEP_PATH).asBoolean();
             FlowFile bundle = session.create(); // we don't pass the parents 
to the #create method because the parents belong to different sessions
 
-            bundle = session.putAttribute(bundle, 
CoreAttributes.FILENAME.key(), createFilename(contents) + ".tar");
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws 
IOException {
-                    try (final OutputStream bufferedOut = new 
BufferedOutputStream(rawOut);
+            try {
+                bundle = session.putAttribute(bundle, 
CoreAttributes.FILENAME.key(), createFilename(contents) + ".tar");
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws 
IOException {
+                        try (final OutputStream bufferedOut = new 
BufferedOutputStream(rawOut);
                             final TarArchiveOutputStream out = new 
TarArchiveOutputStream(bufferedOut)) {
-                        
out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
-                        for (final FlowFile flowFile : contents) {
-                            final String path = keepPath ? getPath(flowFile) : 
"";
-                            final String entryName = path + 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
-
-                            final TarArchiveEntry tarEntry = new 
TarArchiveEntry(entryName);
-                            tarEntry.setSize(flowFile.getSize());
-                            final String permissionsVal = 
flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE);
-                            if (permissionsVal != null) {
-                                try {
-                                    
tarEntry.setMode(Integer.parseInt(permissionsVal));
-                                } catch (final Exception e) {
-                                    getLogger().debug("Attribute {} of {} is 
set to {}; expected 3 digits between 0-7, so ignoring",
-                                            new 
Object[]{TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal});
+                            
out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+                            for (final FlowFile flowFile : contents) {
+                                final String path = keepPath ? 
getPath(flowFile) : "";
+                                final String entryName = path + 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+                                final TarArchiveEntry tarEntry = new 
TarArchiveEntry(entryName);
+                                tarEntry.setSize(flowFile.getSize());
+                                final String permissionsVal = 
flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE);
+                                if (permissionsVal != null) {
+                                    try {
+                                        
tarEntry.setMode(Integer.parseInt(permissionsVal));
+                                    } catch (final Exception e) {
+                                        getLogger().debug("Attribute {} of {} 
is set to {}; expected 3 digits between 0-7, so ignoring",
+                                            new Object[] 
{TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal});
+                                    }
                                 }
-                            }
 
-                            final String modTime = 
context.getProperty(TAR_MODIFIED_TIME)
+                                final String modTime = 
context.getProperty(TAR_MODIFIED_TIME)
                                     
.evaluateAttributeExpressions(flowFile).getValue();
-                            if (StringUtils.isNotBlank(modTime)) {
-                                try {
-                                    
tarEntry.setModTime(Instant.parse(modTime).toEpochMilli());
-                                } catch (final Exception e) {
-                                    getLogger().debug("Attribute {} of {} is 
set to {}; expected ISO8601 format, so ignoring",
-                                            new Object[]{TAR_MODIFIED_TIME, 
flowFile, modTime});
+                                if (StringUtils.isNotBlank(modTime)) {
+                                    try {
+                                        
tarEntry.setModTime(Instant.parse(modTime).toEpochMilli());
+                                    } catch (final Exception e) {
+                                        getLogger().debug("Attribute {} of {} 
is set to {}; expected ISO8601 format, so ignoring",
+                                            new Object[] {TAR_MODIFIED_TIME, 
flowFile, modTime});
+                                    }
                                 }
-                            }
 
-                            out.putArchiveEntry(tarEntry);
+                                out.putArchiveEntry(tarEntry);
 
-                            bin.getSession().exportTo(flowFile, out);
-                            out.closeArchiveEntry();
+                                bin.getSession().exportTo(flowFile, out);
+                                out.closeArchiveEntry();
+                            }
                         }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             bin.getSession().getProvenanceReporter().join(contents, bundle);
             return bundle;
@@ -794,35 +804,40 @@ public class MergeContent extends BinFiles {
 
             FlowFile bundle = session.create(contents);
 
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws 
IOException {
-                    try (final OutputStream bufferedOut = new 
BufferedOutputStream(rawOut)) {
-                        // we don't want the packager closing the stream. V1 
creates a TAR Output Stream, which then gets
-                        // closed, which in turn closes the underlying 
OutputStream, and we want to protect ourselves against that.
-                        final OutputStream out = new 
NonCloseableOutputStream(bufferedOut);
-
-                        for (final FlowFile flowFile : contents) {
-                            bin.getSession().read(flowFile, false, new 
InputStreamCallback() {
-                                @Override
-                                public void process(final InputStream rawIn) 
throws IOException {
-                                    try (final InputStream in = new 
BufferedInputStream(rawIn)) {
-                                        final Map<String, String> attributes = 
new HashMap<>(flowFile.getAttributes());
-
-                                        // for backward compatibility 
purposes, we add the "legacy" NiFi attributes
-                                        attributes.put("nf.file.name", 
attributes.get(CoreAttributes.FILENAME.key()));
-                                        attributes.put("nf.file.path", 
attributes.get(CoreAttributes.PATH.key()));
-                                        if 
(attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
-                                            attributes.put("content-type", 
attributes.get(CoreAttributes.MIME_TYPE.key()));
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws 
IOException {
+                        try (final OutputStream bufferedOut = new 
BufferedOutputStream(rawOut)) {
+                            // we don't want the packager closing the stream. 
V1 creates a TAR Output Stream, which then gets
+                            // closed, which in turn closes the underlying 
OutputStream, and we want to protect ourselves against that.
+                            final OutputStream out = new 
NonCloseableOutputStream(bufferedOut);
+
+                            for (final FlowFile flowFile : contents) {
+                                bin.getSession().read(flowFile, false, new 
InputStreamCallback() {
+                                    @Override
+                                    public void process(final InputStream 
rawIn) throws IOException {
+                                        try (final InputStream in = new 
BufferedInputStream(rawIn)) {
+                                            final Map<String, String> 
attributes = new HashMap<>(flowFile.getAttributes());
+
+                                            // for backward compatibility 
purposes, we add the "legacy" NiFi attributes
+                                            attributes.put("nf.file.name", 
attributes.get(CoreAttributes.FILENAME.key()));
+                                            attributes.put("nf.file.path", 
attributes.get(CoreAttributes.PATH.key()));
+                                            if 
(attributes.containsKey(CoreAttributes.MIME_TYPE.key())) {
+                                                attributes.put("content-type", 
attributes.get(CoreAttributes.MIME_TYPE.key()));
+                                            }
+                                            packager.packageFlowFile(in, out, 
attributes, flowFile.getSize());
                                         }
-                                        packager.packageFlowFile(in, out, 
attributes, flowFile.getSize());
                                     }
-                                }
-                            });
+                                });
+                            }
                         }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             bundle = session.putAttribute(bundle, 
CoreAttributes.FILENAME.key(), createFilename(contents) + ".pkg");
             session.getProvenanceReporter().join(contents, bundle);
@@ -860,34 +875,39 @@ public class MergeContent extends BinFiles {
 
             FlowFile bundle = session.create(contents);
 
-            bundle = session.putAttribute(bundle, 
CoreAttributes.FILENAME.key(), createFilename(contents) + ".zip");
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws 
IOException {
-                    try (final OutputStream bufferedOut = new 
BufferedOutputStream(rawOut);
+            try {
+                bundle = session.putAttribute(bundle, 
CoreAttributes.FILENAME.key(), createFilename(contents) + ".zip");
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws 
IOException {
+                        try (final OutputStream bufferedOut = new 
BufferedOutputStream(rawOut);
                             final ZipOutputStream out = new 
ZipOutputStream(bufferedOut)) {
-                        out.setLevel(compressionLevel);
-                        for (final FlowFile flowFile : contents) {
-                            final String path = keepPath ? getPath(flowFile) : 
"";
-                            final String entryName = path + 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
-                            final ZipEntry zipEntry = new ZipEntry(entryName);
-                            zipEntry.setSize(flowFile.getSize());
-                            try {
-                                out.putNextEntry(zipEntry);
+                            out.setLevel(compressionLevel);
+                            for (final FlowFile flowFile : contents) {
+                                final String path = keepPath ? 
getPath(flowFile) : "";
+                                final String entryName = path + 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+                                final ZipEntry zipEntry = new 
ZipEntry(entryName);
+                                zipEntry.setSize(flowFile.getSize());
+                                try {
+                                    out.putNextEntry(zipEntry);
 
-                                bin.getSession().exportTo(flowFile, out);
-                                out.closeEntry();
-                                unmerged.remove(flowFile);
-                            } catch (ZipException e) {
-                                getLogger().error("Encountered exception 
merging {}", new Object[]{flowFile}, e);
+                                    bin.getSession().exportTo(flowFile, out);
+                                    out.closeEntry();
+                                    unmerged.remove(flowFile);
+                                } catch (ZipException e) {
+                                    getLogger().error("Encountered exception 
merging {}", new Object[] {flowFile}, e);
+                                }
                             }
-                        }
 
-                        out.finish();
-                        out.flush();
+                            out.finish();
+                            out.flush();
+                        }
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             session.getProvenanceReporter().join(contents, bundle);
             return bundle;
@@ -921,92 +941,97 @@ public class MergeContent extends BinFiles {
 
             // we don't pass the parents to the #create method because the 
parents belong to different sessions
             FlowFile bundle = session.create(contents);
-            bundle = session.write(bundle, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws 
IOException {
-                    try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
-                        for (final FlowFile flowFile : contents) {
-                            bin.getSession().read(flowFile, false, new 
InputStreamCallback() {
-                                @Override
-                                public void process(InputStream in) throws 
IOException {
-                                    boolean canMerge = true;
-                                    try (DataFileStream<GenericRecord> reader 
= new DataFileStream<>(in,
+            try {
+                bundle = session.write(bundle, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream rawOut) throws 
IOException {
+                        try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
+                            for (final FlowFile flowFile : contents) {
+                                bin.getSession().read(flowFile, false, new 
InputStreamCallback() {
+                                    @Override
+                                    public void process(InputStream in) throws 
IOException {
+                                        boolean canMerge = true;
+                                        try (DataFileStream<GenericRecord> 
reader = new DataFileStream<>(in,
                                             new 
GenericDatumReader<GenericRecord>())) {
-                                        if (schema.get() == null) {
-                                            // this is the first file - set up 
the writer, and store the
-                                            // Schema & metadata we'll use.
-                                            schema.set(reader.getSchema());
-                                            if 
(!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) {
-                                                for (String key : 
reader.getMetaKeys()) {
-                                                    if 
(!DataFileWriter.isReservedMeta(key)) {
-                                                        byte[] metadatum = 
reader.getMeta(key);
-                                                        metadata.put(key, 
metadatum);
-                                                        writer.setMeta(key, 
metadatum);
+                                            if (schema.get() == null) {
+                                                // this is the first file - 
set up the writer, and store the
+                                                // Schema & metadata we'll use.
+                                                schema.set(reader.getSchema());
+                                                if 
(!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) {
+                                                    for (String key : 
reader.getMetaKeys()) {
+                                                        if 
(!DataFileWriter.isReservedMeta(key)) {
+                                                            byte[] metadatum = 
reader.getMeta(key);
+                                                            metadata.put(key, 
metadatum);
+                                                            
writer.setMeta(key, metadatum);
+                                                        }
                                                     }
                                                 }
-                                            }
-                                            
inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
-                                            if (inputCodec.get() == null) {
-                                                
inputCodec.set(DataFileConstants.NULL_CODEC);
-                                            }
-                                            
writer.setCodec(CodecFactory.fromString(inputCodec.get()));
-                                            writer.create(schema.get(), out);
-                                        } else {
-                                            // check that we're appending to 
the same schema
-                                            if 
(!schema.get().equals(reader.getSchema())) {
-                                                getLogger().debug("Input file 
{} has different schema - {}, not merging",
-                                                        new 
Object[]{flowFile.getId(), reader.getSchema().getName()});
-                                                canMerge = false;
-                                                unmerged.add(flowFile);
-                                            }
+                                                
inputCodec.set(reader.getMetaString(DataFileConstants.CODEC));
+                                                if (inputCodec.get() == null) {
+                                                    
inputCodec.set(DataFileConstants.NULL_CODEC);
+                                                }
+                                                
writer.setCodec(CodecFactory.fromString(inputCodec.get()));
+                                                writer.create(schema.get(), 
out);
+                                            } else {
+                                                // check that we're appending 
to the same schema
+                                                if 
(!schema.get().equals(reader.getSchema())) {
+                                                    getLogger().debug("Input 
file {} has different schema - {}, not merging",
+                                                        new Object[] 
{flowFile.getId(), reader.getSchema().getName()});
+                                                    canMerge = false;
+                                                    unmerged.add(flowFile);
+                                                }
 
-                                            if 
(METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
+                                                if 
(METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy)
                                                     || 
METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)) {
-                                                // check that we're appending 
to the same metadata
-                                                for (String key : 
reader.getMetaKeys()) {
-                                                    if 
(!DataFileWriter.isReservedMeta(key)) {
-                                                        byte[] metadatum = 
reader.getMeta(key);
-                                                        byte[] 
writersMetadatum = metadata.get(key);
-                                                        if 
(!Arrays.equals(metadatum, writersMetadatum)) {
-                                                            // Ignore 
additional metadata if ALL_COMMON is the strategy, otherwise don't merge
-                                                            if 
(!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || 
writersMetadatum != null) {
-                                                                
getLogger().debug("Input file {} has different non-reserved metadata, not 
merging",
-                                                                        new 
Object[]{flowFile.getId()});
-                                                                canMerge = 
false;
-                                                                
unmerged.add(flowFile);
+                                                    // check that we're 
appending to the same metadata
+                                                    for (String key : 
reader.getMetaKeys()) {
+                                                        if 
(!DataFileWriter.isReservedMeta(key)) {
+                                                            byte[] metadatum = 
reader.getMeta(key);
+                                                            byte[] 
writersMetadatum = metadata.get(key);
+                                                            if 
(!Arrays.equals(metadatum, writersMetadatum)) {
+                                                                // Ignore 
additional metadata if ALL_COMMON is the strategy, otherwise don't merge
+                                                                if 
(!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || 
writersMetadatum != null) {
+                                                                    
getLogger().debug("Input file {} has different non-reserved metadata, not 
merging",
+                                                                        new 
Object[] {flowFile.getId()});
+                                                                    canMerge = 
false;
+                                                                    
unmerged.add(flowFile);
+                                                                }
                                                             }
                                                         }
                                                     }
-                                                }
-                                            } // else the metadata in the 
first FlowFile was either ignored or retained in the if-clause above
+                                                } // else the metadata in the 
first FlowFile was either ignored or retained in the if-clause above
 
-                                            // check that we're appending to 
the same codec
-                                            String thisCodec = 
reader.getMetaString(DataFileConstants.CODEC);
-                                            if (thisCodec == null) {
-                                                thisCodec = 
DataFileConstants.NULL_CODEC;
-                                            }
-                                            if 
(!inputCodec.get().equals(thisCodec)) {
-                                                getLogger().debug("Input file 
{} has different codec, not merging",
-                                                        new 
Object[]{flowFile.getId()});
-                                                canMerge = false;
-                                                unmerged.add(flowFile);
+                                                // check that we're appending 
to the same codec
+                                                String thisCodec = 
reader.getMetaString(DataFileConstants.CODEC);
+                                                if (thisCodec == null) {
+                                                    thisCodec = 
DataFileConstants.NULL_CODEC;
+                                                }
+                                                if 
(!inputCodec.get().equals(thisCodec)) {
+                                                    getLogger().debug("Input 
file {} has different codec, not merging",
+                                                        new Object[] 
{flowFile.getId()});
+                                                    canMerge = false;
+                                                    unmerged.add(flowFile);
+                                                }
                                             }
-                                        }
 
-                                        // write the Avro content from the 
current FlowFile to the merged OutputStream
-                                        if (canMerge) {
-                                            writer.appendAllFrom(reader, 
false);
+                                            // write the Avro content from the 
current FlowFile to the merged OutputStream
+                                            if (canMerge) {
+                                                writer.appendAllFrom(reader, 
false);
+                                            }
                                         }
                                     }
-                                }
-                            });
+                                });
+                            }
+                            writer.flush();
+                        } finally {
+                            writer.close();
                         }
-                        writer.flush();
-                    } finally {
-                        writer.close();
                     }
-                }
-            });
+                });
+            } catch (final Exception e) {
+                session.remove(bundle);
+                throw e;
+            }
 
             final Collection<FlowFile> parents;
             if (unmerged.isEmpty()) {

Reply via email to