Repository: nifi Updated Branches: refs/heads/master 3b74d2dda -> 1fc1d38fd
NIFI-4562: This closes #2331. If an Exception is thrown when merging FlowFiles, ensure that we remove the 'bundle' flowfile before exiting the method Signed-off-by: joewitt <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1fc1d38f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1fc1d38f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1fc1d38f Branch: refs/heads/master Commit: 1fc1d38fd8bec39d7358c7e04e024d563fc26492 Parents: 3b74d2d Author: Mark Payne <[email protected]> Authored: Fri Dec 8 13:09:00 2017 -0500 Committer: joewitt <[email protected]> Committed: Fri Dec 8 15:35:36 2017 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/MergeContent.java | 399 ++++++++++--------- 1 file changed, 212 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1fc1d38f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index 79a69a7..e78cd4f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -578,49 +578,54 @@ public class MergeContent extends BinFiles { final ProcessSession session = bin.getSession(); FlowFile bundle = session.create(bin.getContents()); final AtomicReference<String> bundleMimeTypeRef = new AtomicReference<>(null); - bundle = session.write(bundle, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - final byte[] header = getDelimiterContent(context, contents, HEADER); - if (header != null) { - out.write(header); - } + try { + bundle = session.write(bundle, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + final byte[] header = getDelimiterContent(context, contents, HEADER); + if (header != null) { + out.write(header); + } - boolean isFirst = true; - final Iterator<FlowFile> itr = contents.iterator(); - while (itr.hasNext()) { - final FlowFile flowFile = itr.next(); - bin.getSession().read(flowFile, false, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.copy(in, out); - } - }); + boolean isFirst = true; + final Iterator<FlowFile> itr = contents.iterator(); + while (itr.hasNext()) { + final FlowFile flowFile = itr.next(); + bin.getSession().read(flowFile, false, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.copy(in, out); + } + }); - if (itr.hasNext()) { - final byte[] demarcator = getDelimiterContent(context, contents, DEMARCATOR); - if (demarcator != null) { - out.write(demarcator); + if (itr.hasNext()) { + final byte[] demarcator = getDelimiterContent(context, contents, DEMARCATOR); + if (demarcator != null) { + out.write(demarcator); + } } - } - final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); - if (isFirst) { - bundleMimeTypeRef.set(flowFileMimeType); - isFirst = false; - } else { - if (bundleMimeTypeRef.get() != null && !bundleMimeTypeRef.get().equals(flowFileMimeType)) { - bundleMimeTypeRef.set(null); + final String flowFileMimeType = flowFile.getAttribute(CoreAttributes.MIME_TYPE.key()); + if (isFirst) { + bundleMimeTypeRef.set(flowFileMimeType); + isFirst = false; + } else { + if (bundleMimeTypeRef.get() != null && !bundleMimeTypeRef.get().equals(flowFileMimeType)) { + bundleMimeTypeRef.set(null); + } } } - } - final byte[] footer = getDelimiterContent(context, contents, FOOTER); - if (footer != null) { - out.write(footer); + final byte[] footer = getDelimiterContent(context, contents, FOOTER); + if (footer != null) { + out.write(footer); + } } - } - }); + }); + } catch (final Exception e) { + session.remove(bundle); + throw e; + } session.getProvenanceReporter().join(contents, bundle); bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents)); @@ -719,48 +724,53 @@ public class MergeContent extends BinFiles { final boolean keepPath = context.getProperty(KEEP_PATH).asBoolean(); FlowFile bundle = session.create(); // we don't pass the parents to the #create method because the parents belong to different sessions - bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".tar"); - bundle = session.write(bundle, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut); + try { + bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".tar"); + bundle = session.write(bundle, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut); final TarArchiveOutputStream out = new TarArchiveOutputStream(bufferedOut)) { - out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU); - for (final FlowFile flowFile : contents) { - final String path = keepPath ? getPath(flowFile) : ""; - final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key()); - - final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName); - tarEntry.setSize(flowFile.getSize()); - final String permissionsVal = flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE); - if (permissionsVal != null) { - try { - tarEntry.setMode(Integer.parseInt(permissionsVal)); - } catch (final Exception e) { - getLogger().debug("Attribute {} of {} is set to {}; expected 3 digits between 0-7, so ignoring", - new Object[]{TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal}); + out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU); + for (final FlowFile flowFile : contents) { + final String path = keepPath ? getPath(flowFile) : ""; + final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key()); + + final TarArchiveEntry tarEntry = new TarArchiveEntry(entryName); + tarEntry.setSize(flowFile.getSize()); + final String permissionsVal = flowFile.getAttribute(TAR_PERMISSIONS_ATTRIBUTE); + if (permissionsVal != null) { + try { + tarEntry.setMode(Integer.parseInt(permissionsVal)); + } catch (final Exception e) { + getLogger().debug("Attribute {} of {} is set to {}; expected 3 digits between 0-7, so ignoring", + new Object[] {TAR_PERMISSIONS_ATTRIBUTE, flowFile, permissionsVal}); + } } - } - final String modTime = context.getProperty(TAR_MODIFIED_TIME) + final String modTime = context.getProperty(TAR_MODIFIED_TIME) .evaluateAttributeExpressions(flowFile).getValue(); - if (StringUtils.isNotBlank(modTime)) { - try { - tarEntry.setModTime(Instant.parse(modTime).toEpochMilli()); - } catch (final Exception e) { - getLogger().debug("Attribute {} of {} is set to {}; expected ISO8601 format, so ignoring", - new Object[]{TAR_MODIFIED_TIME, flowFile, modTime}); + if (StringUtils.isNotBlank(modTime)) { + try { + tarEntry.setModTime(Instant.parse(modTime).toEpochMilli()); + } catch (final Exception e) { + getLogger().debug("Attribute {} of {} is set to {}; expected ISO8601 format, so ignoring", + new Object[] {TAR_MODIFIED_TIME, flowFile, modTime}); + } } - } - out.putArchiveEntry(tarEntry); + out.putArchiveEntry(tarEntry); - bin.getSession().exportTo(flowFile, out); - out.closeArchiveEntry(); + bin.getSession().exportTo(flowFile, out); + out.closeArchiveEntry(); + } } } - } - }); + }); + } catch (final Exception e) { + session.remove(bundle); + throw e; + } bin.getSession().getProvenanceReporter().join(contents, bundle); return bundle; @@ -794,35 +804,40 @@ public class MergeContent extends BinFiles { FlowFile bundle = session.create(contents); - bundle = session.write(bundle, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut)) { - // we don't want the packager closing the stream. V1 creates a TAR Output Stream, which then gets - // closed, which in turn closes the underlying OutputStream, and we want to protect ourselves against that. - final OutputStream out = new NonCloseableOutputStream(bufferedOut); - - for (final FlowFile flowFile : contents) { - bin.getSession().read(flowFile, false, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes()); - - // for backward compatibility purposes, we add the "legacy" NiFi attributes - attributes.put("nf.file.name", attributes.get(CoreAttributes.FILENAME.key())); - attributes.put("nf.file.path", attributes.get(CoreAttributes.PATH.key())); - if (attributes.containsKey(CoreAttributes.MIME_TYPE.key())) { - attributes.put("content-type", attributes.get(CoreAttributes.MIME_TYPE.key())); + try { + bundle = session.write(bundle, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut)) { + // we don't want the packager closing the stream. V1 creates a TAR Output Stream, which then gets + // closed, which in turn closes the underlying OutputStream, and we want to protect ourselves against that. + final OutputStream out = new NonCloseableOutputStream(bufferedOut); + + for (final FlowFile flowFile : contents) { + bin.getSession().read(flowFile, false, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes()); + + // for backward compatibility purposes, we add the "legacy" NiFi attributes + attributes.put("nf.file.name", attributes.get(CoreAttributes.FILENAME.key())); + attributes.put("nf.file.path", attributes.get(CoreAttributes.PATH.key())); + if (attributes.containsKey(CoreAttributes.MIME_TYPE.key())) { + attributes.put("content-type", attributes.get(CoreAttributes.MIME_TYPE.key())); + } + packager.packageFlowFile(in, out, attributes, flowFile.getSize()); } - packager.packageFlowFile(in, out, attributes, flowFile.getSize()); } - } - }); + }); + } } } - } - }); + }); + } catch (final Exception e) { + session.remove(bundle); + throw e; + } bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".pkg"); session.getProvenanceReporter().join(contents, bundle); @@ -860,34 +875,39 @@ public class MergeContent extends BinFiles { FlowFile bundle = session.create(contents); - bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".zip"); - bundle = session.write(bundle, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut); + try { + bundle = session.putAttribute(bundle, CoreAttributes.FILENAME.key(), createFilename(contents) + ".zip"); + bundle = session.write(bundle, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream bufferedOut = new BufferedOutputStream(rawOut); final ZipOutputStream out = new ZipOutputStream(bufferedOut)) { - out.setLevel(compressionLevel); - for (final FlowFile flowFile : contents) { - final String path = keepPath ? getPath(flowFile) : ""; - final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key()); - final ZipEntry zipEntry = new ZipEntry(entryName); - zipEntry.setSize(flowFile.getSize()); - try { - out.putNextEntry(zipEntry); + out.setLevel(compressionLevel); + for (final FlowFile flowFile : contents) { + final String path = keepPath ? getPath(flowFile) : ""; + final String entryName = path + flowFile.getAttribute(CoreAttributes.FILENAME.key()); + final ZipEntry zipEntry = new ZipEntry(entryName); + zipEntry.setSize(flowFile.getSize()); + try { + out.putNextEntry(zipEntry); - bin.getSession().exportTo(flowFile, out); - out.closeEntry(); - unmerged.remove(flowFile); - } catch (ZipException e) { - getLogger().error("Encountered exception merging {}", new Object[]{flowFile}, e); + bin.getSession().exportTo(flowFile, out); + out.closeEntry(); + unmerged.remove(flowFile); + } catch (ZipException e) { + getLogger().error("Encountered exception merging {}", new Object[] {flowFile}, e); + } } - } - out.finish(); - out.flush(); + out.finish(); + out.flush(); + } } - } - }); + }); + } catch (final Exception e) { + session.remove(bundle); + throw e; + } session.getProvenanceReporter().join(contents, bundle); return bundle; @@ -921,92 +941,97 @@ public class MergeContent extends BinFiles { // we don't pass the parents to the #create method because the parents belong to different sessions FlowFile bundle = session.create(contents); - bundle = session.write(bundle, new OutputStreamCallback() { - @Override - public void process(final OutputStream rawOut) throws IOException { - try (final OutputStream out = new BufferedOutputStream(rawOut)) { - for (final FlowFile flowFile : contents) { - bin.getSession().read(flowFile, false, new InputStreamCallback() { - @Override - public void process(InputStream in) throws IOException { - boolean canMerge = true; - try (DataFileStream<GenericRecord> reader = new DataFileStream<>(in, + try { + bundle = session.write(bundle, new OutputStreamCallback() { + @Override + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { + for (final FlowFile flowFile : contents) { + bin.getSession().read(flowFile, false, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + boolean canMerge = true; + try (DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) { - if (schema.get() == null) { - // this is the first file - set up the writer, and store the - // Schema & metadata we'll use. - schema.set(reader.getSchema()); - if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) { - for (String key : reader.getMetaKeys()) { - if (!DataFileWriter.isReservedMeta(key)) { - byte[] metadatum = reader.getMeta(key); - metadata.put(key, metadatum); - writer.setMeta(key, metadatum); + if (schema.get() == null) { + // this is the first file - set up the writer, and store the + // Schema & metadata we'll use. + schema.set(reader.getSchema()); + if (!METADATA_STRATEGY_IGNORE.getValue().equals(metadataStrategy)) { + for (String key : reader.getMetaKeys()) { + if (!DataFileWriter.isReservedMeta(key)) { + byte[] metadatum = reader.getMeta(key); + metadata.put(key, metadatum); + writer.setMeta(key, metadatum); + } } } - } - inputCodec.set(reader.getMetaString(DataFileConstants.CODEC)); - if (inputCodec.get() == null) { - inputCodec.set(DataFileConstants.NULL_CODEC); - } - writer.setCodec(CodecFactory.fromString(inputCodec.get())); - writer.create(schema.get(), out); - } else { - // check that we're appending to the same schema - if (!schema.get().equals(reader.getSchema())) { - getLogger().debug("Input file {} has different schema - {}, not merging", - new Object[]{flowFile.getId(), reader.getSchema().getName()}); - canMerge = false; - unmerged.add(flowFile); - } + inputCodec.set(reader.getMetaString(DataFileConstants.CODEC)); + if (inputCodec.get() == null) { + inputCodec.set(DataFileConstants.NULL_CODEC); + } + writer.setCodec(CodecFactory.fromString(inputCodec.get())); + writer.create(schema.get(), out); + } else { + // check that we're appending to the same schema + if (!schema.get().equals(reader.getSchema())) { + getLogger().debug("Input file {} has different schema - {}, not merging", + new Object[] {flowFile.getId(), reader.getSchema().getName()}); + canMerge = false; + unmerged.add(flowFile); + } - if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy) + if (METADATA_STRATEGY_DO_NOT_MERGE.getValue().equals(metadataStrategy) || METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy)) { - // check that we're appending to the same metadata - for (String key : reader.getMetaKeys()) { - if (!DataFileWriter.isReservedMeta(key)) { - byte[] metadatum = reader.getMeta(key); - byte[] writersMetadatum = metadata.get(key); - if (!Arrays.equals(metadatum, writersMetadatum)) { - // Ignore additional metadata if ALL_COMMON is the strategy, otherwise don't merge - if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || writersMetadatum != null) { - getLogger().debug("Input file {} has different non-reserved metadata, not merging", - new Object[]{flowFile.getId()}); - canMerge = false; - unmerged.add(flowFile); + // check that we're appending to the same metadata + for (String key : reader.getMetaKeys()) { + if (!DataFileWriter.isReservedMeta(key)) { + byte[] metadatum = reader.getMeta(key); + byte[] writersMetadatum = metadata.get(key); + if (!Arrays.equals(metadatum, writersMetadatum)) { + // Ignore additional metadata if ALL_COMMON is the strategy, otherwise don't merge + if (!METADATA_STRATEGY_ALL_COMMON.getValue().equals(metadataStrategy) || writersMetadatum != null) { + getLogger().debug("Input file {} has different non-reserved metadata, not merging", + new Object[] {flowFile.getId()}); + canMerge = false; + unmerged.add(flowFile); + } } } } - } - } // else the metadata in the first FlowFile was either ignored or retained in the if-clause above + } // else the metadata in the first FlowFile was either ignored or retained in the if-clause above - // check that we're appending to the same codec - String thisCodec = reader.getMetaString(DataFileConstants.CODEC); - if (thisCodec == null) { - thisCodec = DataFileConstants.NULL_CODEC; - } - if (!inputCodec.get().equals(thisCodec)) { - getLogger().debug("Input file {} has different codec, not merging", - new Object[]{flowFile.getId()}); - canMerge = false; - unmerged.add(flowFile); + // check that we're appending to the same codec + String thisCodec = reader.getMetaString(DataFileConstants.CODEC); + if (thisCodec == null) { + thisCodec = DataFileConstants.NULL_CODEC; + } + if (!inputCodec.get().equals(thisCodec)) { + getLogger().debug("Input file {} has different codec, not merging", + new Object[] {flowFile.getId()}); + canMerge = false; + unmerged.add(flowFile); + } } - } - // write the Avro content from the current FlowFile to the merged OutputStream - if (canMerge) { - writer.appendAllFrom(reader, false); + // write the Avro content from the current FlowFile to the merged OutputStream + if (canMerge) { + writer.appendAllFrom(reader, false); + } } } - } - }); + }); + } + writer.flush(); + } finally { + writer.close(); } - writer.flush(); - } finally { - writer.close(); } - } - }); + }); + } catch (final Exception e) { + session.remove(bundle); + throw e; + } final Collection<FlowFile> parents; if (unmerged.isEmpty()) {
