Repository: nifi Updated Branches: refs/heads/master ec868362f -> ded18b94d
NIFI-3255 removed dependency on session.merge from SplitText NIFI-3255 addressed PR comments NIFI-3255 fixed linkage for Split creation This closes #1394 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ded18b94 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ded18b94 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ded18b94 Branch: refs/heads/master Commit: ded18b94dbcaa91dcad2d6ff995e0a9c1282323b Parents: ec86836 Author: Oleg Zhurakousky <[email protected]> Authored: Wed Jan 4 15:37:30 2017 -0500 Committer: Oleg Zhurakousky <[email protected]> Committed: Fri Feb 17 12:31:21 2017 -0500 ---------------------------------------------------------------------- .../nifi/processors/standard/SplitText.java | 38 ++++++++++++++++---- 1 file changed, 32 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/ded18b94/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java index e57841f..4e62993 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -32,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -56,6 +58,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.util.TextLineDemarcator; import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo; @@ -282,8 +285,7 @@ public class SplitText extends AbstractProcessor { * it signifies the header information and its contents will be included in * each and every computed split. */ - private List<FlowFile> generateSplitFlowFiles(final String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo, - List<SplitInfo> computedSplitsInfo, ProcessSession processSession){ + private List<FlowFile> generateSplitFlowFiles(String fragmentId, FlowFile sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, ProcessSession processSession){ List<FlowFile> splitFlowFiles = new ArrayList<>(); FlowFile headerFlowFile = null; long headerCrlfLength = 0; @@ -295,19 +297,19 @@ public class SplitText extends AbstractProcessor { if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) { FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); - splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(), + splitFlowFile = this.updateAttributes(processSession, splitFlowFile, 0, splitFlowFile.getSize(), fragmentId, fragmentIndex++, 0, sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); splitFlowFiles.add(splitFlowFile); } else { for (SplitInfo computedSplitInfo : computedSplitsInfo) { - long length = SplitText.this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length; + long length = this.removeTrailingNewLines ? computedSplitInfo.trimmedLength : computedSplitInfo.length; boolean proceedWithClone = headerFlowFile != null || length > 0; if (proceedWithClone) { FlowFile splitFlowFile = null; if (headerFlowFile != null) { if (length > 0) { splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length); - splitFlowFile = processSession.merge( Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile); + splitFlowFile = this.concatenateContents(sourceFlowFile, processSession, headerFlowFile, splitFlowFile); } else { splitFlowFile = processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - headerCrlfLength); // trim the last CRLF if split consists of only HEADER } @@ -315,7 +317,7 @@ public class SplitText extends AbstractProcessor { splitFlowFile = processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length); } - splitFlowFile = SplitText.this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++, + splitFlowFile = this.updateAttributes(processSession, splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, fragmentIndex++, computedSplitsInfo.size(), sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key())); splitFlowFiles.add(splitFlowFile); } @@ -329,6 +331,30 @@ public class SplitText extends AbstractProcessor { return splitFlowFiles; } + /** + * Will concatenate the contents of the provided array of {@link FlowFile}s + * into a single {@link FlowFile}. While this operation is as general as it + * is described in the previous sentence, in the context of this processor + * there can only be two {@link FlowFile}s with the first {@link FlowFile} + * representing the header content of the split and the second + * {@link FlowFile} represents the split itself. + */ + private FlowFile concatenateContents(FlowFile sourceFlowFile, ProcessSession session, FlowFile... flowFiles) { + FlowFile mergedFlowFile = session.create(sourceFlowFile); + for (FlowFile flowFile : flowFiles) { + mergedFlowFile = session.append(mergedFlowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + try (InputStream is = session.read(flowFile)) { + IOUtils.copy(is, out); + } + } + }); + } + session.remove(flowFiles[1]); // in current usage we always have 2 files + return mergedFlowFile; + } + private FlowFile updateAttributes(ProcessSession processSession, FlowFile splitFlowFile, long splitLineCount, long splitFlowFileSize, String splitId, int splitIndex, int splitCount, String origFileName) { Map<String, String> attributes = new HashMap<>();
