Repository: nifi
Updated Branches:
  refs/heads/master ec868362f -> ded18b94d


NIFI-3255 removed dependency on session.merge from SplitText

NIFI-3255 addressed PR comments

NIFI-3255 fixed linkage for Split creation

This closes #1394


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ded18b94
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ded18b94
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ded18b94

Branch: refs/heads/master
Commit: ded18b94dbcaa91dcad2d6ff995e0a9c1282323b
Parents: ec86836
Author: Oleg Zhurakousky <[email protected]>
Authored: Wed Jan 4 15:37:30 2017 -0500
Committer: Oleg Zhurakousky <[email protected]>
Committed: Fri Feb 17 12:31:21 2017 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/SplitText.java     | 38 ++++++++++++++++----
 1 file changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ded18b94/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
index e57841f..4e62993 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitText.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +33,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -56,6 +58,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.util.TextLineDemarcator;
 import org.apache.nifi.stream.io.util.TextLineDemarcator.OffsetInfo;
@@ -282,8 +285,7 @@ public class SplitText extends AbstractProcessor {
      * it signifies the header information and its contents will be included in
      * each and every computed split.
      */
-    private List<FlowFile> generateSplitFlowFiles(final String fragmentId, 
FlowFile sourceFlowFile, SplitInfo splitInfo,
-                                                  List<SplitInfo> 
computedSplitsInfo, ProcessSession processSession){
+    private List<FlowFile> generateSplitFlowFiles(String fragmentId, FlowFile 
sourceFlowFile, SplitInfo splitInfo, List<SplitInfo> computedSplitsInfo, 
ProcessSession processSession){
         List<FlowFile> splitFlowFiles = new ArrayList<>();
         FlowFile headerFlowFile = null;
         long headerCrlfLength = 0;
@@ -295,19 +297,19 @@ public class SplitText extends AbstractProcessor {
 
         if ((computedSplitsInfo.size() == 0) && (headerFlowFile != null)) {
             FlowFile splitFlowFile = processSession.clone(sourceFlowFile, 0, 
headerFlowFile.getSize() - headerCrlfLength);
-            splitFlowFile = SplitText.this.updateAttributes(processSession, 
splitFlowFile, 0, splitFlowFile.getSize(),
+            splitFlowFile = this.updateAttributes(processSession, 
splitFlowFile, 0, splitFlowFile.getSize(),
                     fragmentId, fragmentIndex++, 0, 
sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
             splitFlowFiles.add(splitFlowFile);
         } else {
             for (SplitInfo computedSplitInfo : computedSplitsInfo) {
-                long length = SplitText.this.removeTrailingNewLines ? 
computedSplitInfo.trimmedLength : computedSplitInfo.length;
+                long length = this.removeTrailingNewLines ? 
computedSplitInfo.trimmedLength : computedSplitInfo.length;
                 boolean proceedWithClone = headerFlowFile != null || length > 
0;
                 if (proceedWithClone) {
                     FlowFile splitFlowFile = null;
                     if (headerFlowFile != null) {
                         if (length > 0) {
                             splitFlowFile = 
processSession.clone(sourceFlowFile, computedSplitInfo.startOffset, length);
-                            splitFlowFile = processSession.merge( 
Arrays.asList(new FlowFile[] { headerFlowFile, splitFlowFile }), splitFlowFile);
+                            splitFlowFile = 
this.concatenateContents(sourceFlowFile, processSession, headerFlowFile, 
splitFlowFile);
                         } else {
                             splitFlowFile = 
processSession.clone(sourceFlowFile, 0, headerFlowFile.getSize() - 
headerCrlfLength); // trim the last CRLF if split consists of only HEADER
                         }
@@ -315,7 +317,7 @@ public class SplitText extends AbstractProcessor {
                         splitFlowFile = processSession.clone(sourceFlowFile, 
computedSplitInfo.startOffset, length);
                     }
 
-                    splitFlowFile = 
SplitText.this.updateAttributes(processSession, splitFlowFile, 
computedSplitInfo.lineCount, splitFlowFile.getSize(), fragmentId, 
fragmentIndex++,
+                    splitFlowFile = this.updateAttributes(processSession, 
splitFlowFile, computedSplitInfo.lineCount, splitFlowFile.getSize(), 
fragmentId, fragmentIndex++,
                             computedSplitsInfo.size(), 
sourceFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
                     splitFlowFiles.add(splitFlowFile);
                 }
@@ -329,6 +331,30 @@ public class SplitText extends AbstractProcessor {
         return splitFlowFiles;
     }
 
+    /**
+     * Will concatenate the contents of the provided array of {@link FlowFile}s
+     * into a single {@link FlowFile}. While this operation is as general as it
+     * is described in the previous sentence, in the context of this processor
+     * there can only be two {@link FlowFile}s with the first {@link FlowFile}
+     * representing the header content of the split and the second
+     * {@link FlowFile} representing the split itself.
+     */
+    private FlowFile concatenateContents(FlowFile sourceFlowFile, 
ProcessSession session, FlowFile... flowFiles) {
+        FlowFile mergedFlowFile = session.create(sourceFlowFile);
+        for (FlowFile flowFile : flowFiles) {
+            mergedFlowFile = session.append(mergedFlowFile, new 
OutputStreamCallback() {
+                @Override
+                public void process(OutputStream out) throws IOException {
+                    try (InputStream is = session.read(flowFile)) {
+                        IOUtils.copy(is, out);
+                    }
+                }
+            });
+        }
+        session.remove(flowFiles[1]); // in current usage we always have 2 
files
+        return mergedFlowFile;
+    }
+
     private FlowFile updateAttributes(ProcessSession processSession, FlowFile 
splitFlowFile, long splitLineCount, long splitFlowFileSize,
             String splitId, int splitIndex, int splitCount, String 
origFileName) {
         Map<String, String> attributes = new HashMap<>();

Reply via email to