This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 572799a201 NIFI-10273 Supported file entries larger than 8.5GB for TAR 
in MergeContent
572799a201 is described below

commit 572799a201df4f440db18aee6585e731dee5bb34
Author: Michael 81877 <[email protected]>
AuthorDate: Tue Sep 6 15:10:32 2022 +0100

    NIFI-10273 Supported file entries larger than 8.5GB for TAR in MergeContent
    
    This closes #6369
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/processors/standard/MergeContent.java    | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index 8b223fadb4..a4602de2cf 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.archivers.tar.TarConstants;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -91,6 +92,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -771,7 +773,6 @@ public class MergeContent extends BinFiles {
         public FlowFile merge(final Bin bin, final ProcessContext context) {
             final List<FlowFile> contents = bin.getContents();
             final ProcessSession session = bin.getSession();
-
             final boolean keepPath = 
context.getProperty(KEEP_PATH).asBoolean();
             FlowFile bundle = session.create(); // we don't pass the parents 
to the #create method because the parents belong to different sessions
 
@@ -782,7 +783,12 @@ public class MergeContent extends BinFiles {
                     public void process(final OutputStream rawOut) throws 
IOException {
                         try (final OutputStream bufferedOut = new 
BufferedOutputStream(rawOut);
                             final TarArchiveOutputStream out = new 
TarArchiveOutputStream(bufferedOut)) {
+
                             
out.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+                            // if any one of the FlowFiles is larger than the 
default maximum tar entry size, then we set bigNumberMode to handle it
+                            if (getMaxEntrySize(contents) >= 
TarConstants.MAXSIZE) {
+                                
out.setBigNumberMode(TarArchiveOutputStream.BIGNUMBER_POSIX);
+                            }
                             for (final FlowFile flowFile : contents) {
                                 final String path = keepPath ? 
getPath(flowFile) : "";
                                 final String entryName = path + 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
@@ -827,6 +833,14 @@ public class MergeContent extends BinFiles {
             return bundle;
         }
 
+        private long getMaxEntrySize(final List<FlowFile> contents) {
+            final OptionalLong maxSize = contents.stream()
+                .parallel()
+                .mapToLong(ff -> ff.getSize())
+                .max();
+            return maxSize.orElse(0L);
+        }
+
         @Override
         public String getMergedContentType() {
             return "application/tar";

Reply via email to