This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d51c675cb NIFI-12709 Added New FlowFile Attributes for Zip and Tar in 
UnpackContent
9d51c675cb is described below

commit 9d51c675cbfd20e6ecbf2660afd2250ed8752f69
Author: dan-s1 <[email protected]>
AuthorDate: Fri Jul 26 17:10:22 2024 +0000

    NIFI-12709 Added New FlowFile Attributes for Zip and Tar in UnpackContent
    
    This closes #9122
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/processors/standard/UnpackContent.java    | 138 ++++++++++++++++-----
 .../processors/standard/TestUnpackContent.java     |  38 ++++--
 2 files changed, 137 insertions(+), 39 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 492c488114..b831751b6e 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -67,9 +67,12 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.nio.file.Path;
+import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -99,12 +102,17 @@ import java.util.regex.Pattern;
     @WritesAttribute(attribute = "fragment.count", description = "The number 
of unpacked FlowFiles generated from the parent FlowFile"),
     @WritesAttribute(attribute = "segment.original.filename ", description = 
"The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are 
removed because "
             + "the MergeContent processor automatically adds those extensions 
if it is used to rebuild the original FlowFile"),
-    @WritesAttribute(attribute = "file.lastModifiedTime", description = "The 
date and time that the unpacked file was last modified (tar only)."),
-    @WritesAttribute(attribute = "file.creationTime", description = "The date 
and time that the file was created. This attribute holds always the same value 
as file.lastModifiedTime (tar only)."),
-    @WritesAttribute(attribute = "file.owner", description = "The owner of the 
unpacked file (tar only)"),
-    @WritesAttribute(attribute = "file.group", description = "The group owner 
of the unpacked file (tar only)"),
-    @WritesAttribute(attribute = "file.permissions", description = "The 
read/write/execute permissions of the unpacked file (tar only)"),
-    @WritesAttribute(attribute = "file.encryptionMethod", description = "The 
encryption method for entries in Zip archives")})
+    @WritesAttribute(attribute = 
UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE, description = "The date and 
time that the unpacked file was last modified (tar and zip only)."),
+    @WritesAttribute(attribute = UnpackContent.FILE_CREATION_TIME_ATTRIBUTE, 
description = "The date and time that the file was created. For encrypted zip 
files this attribute" +
+            " always holds the same value as " + 
UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + ". For tar and unencrypted 
zip files if available it will be returned otherwise" +
+            " this will be the same value as" + 
UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + "."),
+    @WritesAttribute(attribute = 
UnpackContent.FILE_LAST_METADATA_CHANGE_ATTRIBUTE, description = "The date and 
time the file's metadata changed (tar only)."),
+    @WritesAttribute(attribute = 
UnpackContent.FILE_LAST_ACCESS_TIME_ATTRIBUTE, description = "The date and time 
the file was last accessed (tar and unencrypted zip files only)"),
+    @WritesAttribute(attribute = UnpackContent.FILE_OWNER_ATTRIBUTE, 
description = "The owner of the unpacked file (tar only)"),
+    @WritesAttribute(attribute = UnpackContent.FILE_GROUP_ATTRIBUTE, 
description = "The group owner of the unpacked file (tar only)"),
+    @WritesAttribute(attribute = UnpackContent.FILE_SIZE_ATTRIBUTE, 
description = "The uncompressed size of the unpacked file (tar and zip only)"),
+    @WritesAttribute(attribute = UnpackContent.FILE_PERMISSIONS_ATTRIBUTE, 
description = "The read/write/execute permissions of the unpacked file (tar and 
unencrypted zip files only)"),
+    @WritesAttribute(attribute = 
UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, description = "The encryption 
method for entries in Zip archives")})
 @SeeAlso(MergeContent.class)
 @UseCase(
     description = "Unpack Zip containing filenames with special characters, 
created on Windows with filename charset 'Cp437' or 'IBM437'.",
@@ -131,8 +139,11 @@ public class UnpackContent extends AbstractProcessor {
 
     public static final String FILE_LAST_MODIFIED_TIME_ATTRIBUTE = 
"file.lastModifiedTime";
     public static final String FILE_CREATION_TIME_ATTRIBUTE = 
"file.creationTime";
+    public static final String FILE_LAST_METADATA_CHANGE_ATTRIBUTE = 
"file.lastMetadataChange";
+    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = 
"file.lastAccessTime";
     public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
     public static final String FILE_GROUP_ATTRIBUTE = "file.group";
+    public static final String FILE_SIZE_ATTRIBUTE = "file.size";
     public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
     public static final String FILE_ENCRYPTION_METHOD_ATTRIBUTE = 
"file.encryptionMethod";
 
@@ -373,6 +384,7 @@ public class UnpackContent extends AbstractProcessor {
         @Override
         public void unpack(final ProcessSession session, final FlowFile 
source, final List<FlowFile> unpacked) {
             final String fragmentId = UUID.randomUUID().toString();
+            final Map<String, String> attributes = new HashMap<>();
             session.read(source, inputStream -> {
                 int fragmentCount = 0;
                 try (final TarArchiveInputStream tarIn = new 
TarArchiveInputStream(new BufferedInputStream(inputStream))) {
@@ -387,26 +399,41 @@ public class UnpackContent extends AbstractProcessor {
 
                         FlowFile unpackedFile = session.create(source);
                         try {
-                            final String timeAsString = 
DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
+                            attributes.put(CoreAttributes.FILENAME.key(), 
file.getName());
+                            attributes.put(CoreAttributes.PATH.key(), 
filePathString);
+                            attributes.put(CoreAttributes.MIME_TYPE.key(), 
OCTET_STREAM);
+                            attributes.put(FILE_PERMISSIONS_ATTRIBUTE, 
FileInfo.permissionToString(tarEntry.getMode()));
+                            attributes.put(FILE_OWNER_ATTRIBUTE, 
String.valueOf(tarEntry.getUserName()));
+                            attributes.put(FILE_GROUP_ATTRIBUTE, 
String.valueOf(tarEntry.getGroupName()));
+                            attributes.put(FILE_SIZE_ATTRIBUTE, 
String.valueOf(tarEntry.getRealSize()));
+                            String lastModified = 
DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
+                            attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, 
lastModified);
+
+                            if (tarEntry.getCreationTime() != null) {
+                                final String creationTime = 
DATE_TIME_FORMATTER.format(tarEntry.getCreationTime().toInstant());
+                                attributes.put(FILE_CREATION_TIME_ATTRIBUTE, 
creationTime);
+                            } else {
+                                attributes.put(FILE_CREATION_TIME_ATTRIBUTE, 
lastModified);
+                            }
 
-                            unpackedFile = 
session.putAllAttributes(unpackedFile, Map.of(
-                                    CoreAttributes.FILENAME.key(), 
file.getName(),
-                                    CoreAttributes.PATH.key(), filePathString,
-                                    CoreAttributes.MIME_TYPE.key(), 
OCTET_STREAM,
+                            if (tarEntry.getStatusChangeTime() != null) {
+                                final String metadataChangeTime = 
DATE_TIME_FORMATTER.format(tarEntry.getStatusChangeTime().toInstant());
+                                
attributes.put(FILE_LAST_METADATA_CHANGE_ATTRIBUTE, metadataChangeTime);
+                            }
 
-                                    FILE_PERMISSIONS_ATTRIBUTE, 
FileInfo.permissionToString(tarEntry.getMode()),
-                                    FILE_OWNER_ATTRIBUTE, 
String.valueOf(tarEntry.getUserName()),
-                                    FILE_GROUP_ATTRIBUTE, 
String.valueOf(tarEntry.getGroupName()),
+                            if (tarEntry.getLastAccessTime() != null) {
+                                final String lastAccesTime = 
DATE_TIME_FORMATTER.format(tarEntry.getLastAccessTime().toInstant());
+                                
attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccesTime);
+                            }
 
-                                    FILE_LAST_MODIFIED_TIME_ATTRIBUTE, 
timeAsString,
-                                    FILE_CREATION_TIME_ATTRIBUTE, timeAsString,
+                            attributes.put(FRAGMENT_ID, fragmentId);
+                            attributes.put(FRAGMENT_INDEX, 
String.valueOf(++fragmentCount));
 
-                                    FRAGMENT_ID, fragmentId,
-                                    FRAGMENT_INDEX, 
String.valueOf(++fragmentCount)
-                            ));
+                            unpackedFile = 
session.putAllAttributes(unpackedFile, attributes);
 
                             final long fileSize = tarEntry.getSize();
                             unpackedFile = session.write(unpackedFile, 
outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize));
+                            attributes.clear();
                         } finally {
                             unpacked.add(unpackedFile);
                         }
@@ -470,28 +497,59 @@ public class UnpackContent extends AbstractProcessor {
                 return !directory && (fileFilter == null || 
fileFilter.matcher(fileName).find());
             }
 
-            protected void processEntry(final InputStream zipInputStream, 
final boolean directory, final String zipEntryName, final EncryptionMethod 
encryptionMethod) {
+            protected void processEntry(final InputStream zipInputStream, 
boolean directory, String zipEntryName, Map<String, String> attributes) {
                 if (isFileEntryMatched(directory, zipEntryName)) {
                     final File file = new File(zipEntryName);
                     final String parentDirectory = (file.getParent() == null) 
? PATH_SEPARATOR : file.getParent();
 
                     FlowFile unpackedFile = session.create(sourceFlowFile);
                     try {
-                        unpackedFile = session.putAllAttributes(unpackedFile, 
Map.of(
-                                CoreAttributes.FILENAME.key(), file.getName(),
-                                CoreAttributes.PATH.key(), parentDirectory,
-                                CoreAttributes.MIME_TYPE.key(), OCTET_STREAM,
-                                FILE_ENCRYPTION_METHOD_ATTRIBUTE, 
encryptionMethod.toString(),
-
-                                FRAGMENT_ID, fragmentId,
-                                FRAGMENT_INDEX, String.valueOf(++fragmentIndex)
-                        ));
+                        attributes.put(CoreAttributes.FILENAME.key(), 
file.getName());
+                        attributes.put(CoreAttributes.PATH.key(), 
parentDirectory);
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), 
OCTET_STREAM);
+                        attributes.put(FRAGMENT_ID, fragmentId);
+                        attributes.put(FRAGMENT_INDEX, 
String.valueOf(++fragmentIndex));
+                        unpackedFile = session.putAllAttributes(unpackedFile, 
attributes);
                         unpackedFile = session.write(unpackedFile, 
outputStream -> StreamUtils.copy(zipInputStream, outputStream));
                     } finally {
                         unpacked.add(unpackedFile);
                     }
                 }
             }
+
+            protected void addFileSizeAttribute(long fileSize, Map<String, 
String> attributes) {
+                attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(fileSize));
+            }
+
+            protected void addEncryptionMethodAttribute(EncryptionMethod 
encryptionMethod, Map<String, String> attributes) {
+                attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE, 
encryptionMethod.toString());
+            }
+
+            protected void addFilePermissionsAttribute(int mode, Map<String, 
String> attributes) {
+                if (mode > -1) {
+                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, 
FileInfo.permissionToString(mode));
+                }
+            }
+
+            protected void addZipEntryTimeAttributes(Instant lastModified, 
Instant creation, Instant lastAccess, Map<String, String> attributes) {
+                String lastModifiedDate = null;
+                if (lastModified != null) {
+                    lastModifiedDate = 
DATE_TIME_FORMATTER.format(lastModified);
+                    attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE, 
lastModifiedDate);
+                }
+
+                if (creation != null) {
+                    final String creationTime = 
DATE_TIME_FORMATTER.format(creation);
+                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, creationTime);
+                } else if (lastModifiedDate != null) {
+                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, 
lastModifiedDate);
+                }
+
+                if (lastAccess != null) {
+                    final String lastAccessDate = 
DATE_TIME_FORMATTER.format(lastAccess);
+                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, 
lastAccessDate);
+                }
+            }
         }
 
         private static class CompressedZipInputStreamCallback extends 
ZipInputStreamCallback {
@@ -518,8 +576,19 @@ public class UnpackContent extends AbstractProcessor {
                 try (final ZipArchiveInputStream zipInputStream = new 
ZipArchiveInputStream(new BufferedInputStream(inputStream),
                     filenameEncoding.toString(), true, 
allowStoredEntriesWithDataDescriptor)) {
                     ZipArchiveEntry zipEntry;
+                    final Map<String, String> attributes = new HashMap<>();
                     while ((zipEntry = zipInputStream.getNextEntry()) != null) 
{
-                        processEntry(zipInputStream, zipEntry.isDirectory(), 
zipEntry.getName(), EncryptionMethod.NONE);
+                        addEncryptionMethodAttribute(EncryptionMethod.NONE, 
attributes);
+                        addFileSizeAttribute(zipEntry.getSize(), attributes);
+                        addFilePermissionsAttribute(zipEntry.getUnixMode(), 
attributes);
+                        // NOTE: Per javadocs, ZipArchiveEntry can return -1 
for getTime() if its not specified
+                        // and getLastAccessTime() can return null if it is 
not specified.
+                        Instant lastModified = 
zipEntry.getLastModifiedDate().toInstant();
+                        Instant creation = zipEntry.getTime() > 0 ? new 
Date(zipEntry.getTime()).toInstant() : null;
+                        Instant lastAccess = zipEntry.getLastAccessTime() != 
null ? zipEntry.getLastAccessTime().toInstant() : null;
+                        addZipEntryTimeAttributes(lastModified, creation, 
lastAccess, attributes);
+                        processEntry(zipInputStream, zipEntry.isDirectory(), 
zipEntry.getName(), attributes);
+                        attributes.clear();
                     }
                 }
             }
@@ -547,8 +616,15 @@ public class UnpackContent extends AbstractProcessor {
             public void process(final InputStream inputStream) throws 
IOException {
                 try (final ZipInputStream zipInputStream = new 
ZipInputStream(new BufferedInputStream(inputStream), password, 
filenameEncoding)) {
                     LocalFileHeader zipEntry;
+                    final Map<String, String> attributes = new HashMap<>();
                     while ((zipEntry = zipInputStream.getNextEntry()) != null) 
{
-                        processEntry(zipInputStream, zipEntry.isDirectory(), 
zipEntry.getFileName(), zipEntry.getEncryptionMethod());
+                        //NOTE: LocalFileHeader has no methods to return 
creation time and the mode.
+                        
addEncryptionMethodAttribute(zipEntry.getEncryptionMethod(), attributes);
+                        addFileSizeAttribute(zipEntry.getUncompressedSize(), 
attributes);
+                        Instant lastModified = zipEntry.getLastModifiedTime() 
> 0 ? new Date(zipEntry.getLastModifiedTime()).toInstant() : null;
+                        addZipEntryTimeAttributes(lastModified, null, null, 
attributes);
+                        processEntry(zipInputStream, zipEntry.isDirectory(), 
zipEntry.getFileName(), attributes);
+                        attributes.clear();
                     }
                 }
             }
diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
index fc2b0226a7..b3207d4494 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
@@ -41,6 +41,7 @@ import java.util.UUID;
 
 import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
 import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -50,6 +51,8 @@ public class TestUnpackContent {
 
     private static final Path dataPath = 
Paths.get("src/test/resources/TestUnpackContent");
 
+    private static final DateTimeFormatter TIMESTAMP_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");
+
     @Test
     public void testTar() throws IOException {
         final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
@@ -82,19 +85,21 @@ public class TestUnpackContent {
         final List<MockFlowFile> unpacked = 
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
 
         for (final MockFlowFile flowFile : unpacked) {
+            
assertTrue(flowFile.getAttributes().keySet().containsAll(List.of(UnpackContent.FRAGMENT_ID,
 UnpackContent.FRAGMENT_INDEX,
+                    UnpackContent.FRAGMENT_COUNT, 
UnpackContent.SEGMENT_ORIGINAL_FILENAME, UnpackContent.FILE_SIZE_ATTRIBUTE)));
+
             final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
             final String folder = 
flowFile.getAttribute(CoreAttributes.PATH.key());
             final Path path = dataPath.resolve(folder).resolve(filename);
-            assertEquals("rw-r--r--", 
flowFile.getAttribute("file.permissions"));
-            assertEquals("jmcarey", flowFile.getAttribute("file.owner"));
-            assertEquals("mkpasswd", flowFile.getAttribute("file.group"));
-            String modifiedTimeAsString = 
flowFile.getAttribute("file.lastModifiedTime");
 
-            
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString);
+            assertEquals("rw-r--r--", 
flowFile.getAttribute(UnpackContent.FILE_PERMISSIONS_ATTRIBUTE));
+            assertEquals("jmcarey", 
flowFile.getAttribute(UnpackContent.FILE_OWNER_ATTRIBUTE));
+            assertEquals("mkpasswd", 
flowFile.getAttribute(UnpackContent.FILE_GROUP_ATTRIBUTE));
 
+            String modifiedTimeAsString = 
flowFile.getAttribute("file.lastModifiedTime");
+            assertDoesNotThrow(() -> 
TIMESTAMP_FORMATTER.parse(modifiedTimeAsString));
             String creationTimeAsString = 
flowFile.getAttribute("file.creationTime");
-
-            
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString);
+            assertDoesNotThrow(() -> 
TIMESTAMP_FORMATTER.parse(creationTimeAsString));
 
             assertTrue(Files.exists(path));
 
@@ -182,7 +187,24 @@ public class TestUnpackContent {
         autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
 
         final List<MockFlowFile> unpacked = 
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
+
+        final List<String> expectedAttributeNames = List.of(
+                CoreAttributes.FILENAME.key(),
+                CoreAttributes.PATH.key(),
+                UnpackContent.FRAGMENT_ID,
+                UnpackContent.FRAGMENT_INDEX,
+                UnpackContent.FRAGMENT_COUNT,
+                UnpackContent.SEGMENT_ORIGINAL_FILENAME,
+                UnpackContent.FILE_SIZE_ATTRIBUTE,
+                UnpackContent.FILE_CREATION_TIME_ATTRIBUTE,
+                UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE,
+                UnpackContent.FILE_PERMISSIONS_ATTRIBUTE
+        );
+
         for (final MockFlowFile flowFile : unpacked) {
+            for (final String expectedAttributeName : expectedAttributeNames) {
+                flowFile.assertAttributeExists(expectedAttributeName);
+            }
             final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
             final String folder = 
flowFile.getAttribute(CoreAttributes.PATH.key());
             final Path path = dataPath.resolve(folder).resolve(filename);
@@ -191,6 +213,7 @@ public class TestUnpackContent {
             flowFile.assertContentEquals(path.toFile());
         }
     }
+
     @Test
     public void testInvalidZip() throws IOException {
         final TestRunner unpackRunner = TestRunners.newTestRunner(new 
UnpackContent());
@@ -218,7 +241,6 @@ public class TestUnpackContent {
         final List<MockFlowFile> unpacked = 
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_FAILURE);
         for (final MockFlowFile flowFile : unpacked) {
             final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
-           // final String folder = 
flowFile.getAttribute(CoreAttributes.PATH.key());
             final Path path = dataPath.resolve(filename);
             assertTrue(Files.exists(path));
 

Reply via email to