This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9d51c675cb NIFI-12709 Added New FlowFile Attributes for Zip and Tar in
UnpackContent
9d51c675cb is described below
commit 9d51c675cbfd20e6ecbf2660afd2250ed8752f69
Author: dan-s1 <[email protected]>
AuthorDate: Fri Jul 26 17:10:22 2024 +0000
NIFI-12709 Added New FlowFile Attributes for Zip and Tar in UnpackContent
This closes #9122
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/standard/UnpackContent.java | 138 ++++++++++++++++-----
.../processors/standard/TestUnpackContent.java | 38 ++++--
2 files changed, 137 insertions(+), 39 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
index 492c488114..b831751b6e 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UnpackContent.java
@@ -67,9 +67,12 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.file.Path;
+import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -99,12 +102,17 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = "fragment.count", description = "The number
of unpacked FlowFiles generated from the parent FlowFile"),
@WritesAttribute(attribute = "segment.original.filename ", description =
"The filename of the parent FlowFile. Extensions of .tar, .zip or .pkg are
removed because "
+ "the MergeContent processor automatically adds those extensions
if it is used to rebuild the original FlowFile"),
- @WritesAttribute(attribute = "file.lastModifiedTime", description = "The
date and time that the unpacked file was last modified (tar only)."),
- @WritesAttribute(attribute = "file.creationTime", description = "The date
and time that the file was created. This attribute holds always the same value
as file.lastModifiedTime (tar only)."),
- @WritesAttribute(attribute = "file.owner", description = "The owner of the
unpacked file (tar only)"),
- @WritesAttribute(attribute = "file.group", description = "The group owner
of the unpacked file (tar only)"),
- @WritesAttribute(attribute = "file.permissions", description = "The
read/write/execute permissions of the unpacked file (tar only)"),
- @WritesAttribute(attribute = "file.encryptionMethod", description = "The
encryption method for entries in Zip archives")})
+ @WritesAttribute(attribute =
UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE, description = "The date and
time that the unpacked file was last modified (tar and zip only)."),
+ @WritesAttribute(attribute = UnpackContent.FILE_CREATION_TIME_ATTRIBUTE,
description = "The date and time that the file was created. For encrypted zip
files this attribute" +
+ " always holds the same value as " +
UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + ". For tar and unencrypted
zip files if available it will be returned otherwise" +
+ " this will be the same value as" +
UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE + "."),
+ @WritesAttribute(attribute =
UnpackContent.FILE_LAST_METADATA_CHANGE_ATTRIBUTE, description = "The date and
time the file's metadata changed (tar only)."),
+ @WritesAttribute(attribute =
UnpackContent.FILE_LAST_ACCESS_TIME_ATTRIBUTE, description = "The date and time
the file was last accessed (tar and unencrypted zip files only)"),
+ @WritesAttribute(attribute = UnpackContent.FILE_OWNER_ATTRIBUTE,
description = "The owner of the unpacked file (tar only)"),
+ @WritesAttribute(attribute = UnpackContent.FILE_GROUP_ATTRIBUTE,
description = "The group owner of the unpacked file (tar only)"),
+ @WritesAttribute(attribute = UnpackContent.FILE_SIZE_ATTRIBUTE,
description = "The uncompressed size of the unpacked file (tar and zip only)"),
+ @WritesAttribute(attribute = UnpackContent.FILE_PERMISSIONS_ATTRIBUTE,
description = "The read/write/execute permissions of the unpacked file (tar and
unencrypted zip files only)"),
+ @WritesAttribute(attribute =
UnpackContent.FILE_ENCRYPTION_METHOD_ATTRIBUTE, description = "The encryption
method for entries in Zip archives")})
@SeeAlso(MergeContent.class)
@UseCase(
description = "Unpack Zip containing filenames with special characters,
created on Windows with filename charset 'Cp437' or 'IBM437'.",
@@ -131,8 +139,11 @@ public class UnpackContent extends AbstractProcessor {
public static final String FILE_LAST_MODIFIED_TIME_ATTRIBUTE =
"file.lastModifiedTime";
public static final String FILE_CREATION_TIME_ATTRIBUTE =
"file.creationTime";
+ public static final String FILE_LAST_METADATA_CHANGE_ATTRIBUTE =
"file.lastMetadataChange";
+ public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE =
"file.lastAccessTime";
public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
public static final String FILE_GROUP_ATTRIBUTE = "file.group";
+ public static final String FILE_SIZE_ATTRIBUTE = "file.size";
public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
public static final String FILE_ENCRYPTION_METHOD_ATTRIBUTE =
"file.encryptionMethod";
@@ -373,6 +384,7 @@ public class UnpackContent extends AbstractProcessor {
@Override
public void unpack(final ProcessSession session, final FlowFile
source, final List<FlowFile> unpacked) {
final String fragmentId = UUID.randomUUID().toString();
+ final Map<String, String> attributes = new HashMap<>();
session.read(source, inputStream -> {
int fragmentCount = 0;
try (final TarArchiveInputStream tarIn = new
TarArchiveInputStream(new BufferedInputStream(inputStream))) {
@@ -387,26 +399,41 @@ public class UnpackContent extends AbstractProcessor {
FlowFile unpackedFile = session.create(source);
try {
- final String timeAsString =
DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
+ attributes.put(CoreAttributes.FILENAME.key(),
file.getName());
+ attributes.put(CoreAttributes.PATH.key(),
filePathString);
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
OCTET_STREAM);
+ attributes.put(FILE_PERMISSIONS_ATTRIBUTE,
FileInfo.permissionToString(tarEntry.getMode()));
+ attributes.put(FILE_OWNER_ATTRIBUTE,
String.valueOf(tarEntry.getUserName()));
+ attributes.put(FILE_GROUP_ATTRIBUTE,
String.valueOf(tarEntry.getGroupName()));
+ attributes.put(FILE_SIZE_ATTRIBUTE,
String.valueOf(tarEntry.getRealSize()));
+ String lastModified =
DATE_TIME_FORMATTER.format(tarEntry.getModTime().toInstant());
+ attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE,
lastModified);
+
+ if (tarEntry.getCreationTime() != null) {
+ final String creationTime =
DATE_TIME_FORMATTER.format(tarEntry.getCreationTime().toInstant());
+ attributes.put(FILE_CREATION_TIME_ATTRIBUTE,
creationTime);
+ } else {
+ attributes.put(FILE_CREATION_TIME_ATTRIBUTE,
lastModified);
+ }
- unpackedFile =
session.putAllAttributes(unpackedFile, Map.of(
- CoreAttributes.FILENAME.key(),
file.getName(),
- CoreAttributes.PATH.key(), filePathString,
- CoreAttributes.MIME_TYPE.key(),
OCTET_STREAM,
+ if (tarEntry.getStatusChangeTime() != null) {
+ final String metadataChangeTime =
DATE_TIME_FORMATTER.format(tarEntry.getStatusChangeTime().toInstant());
+
attributes.put(FILE_LAST_METADATA_CHANGE_ATTRIBUTE, metadataChangeTime);
+ }
- FILE_PERMISSIONS_ATTRIBUTE,
FileInfo.permissionToString(tarEntry.getMode()),
- FILE_OWNER_ATTRIBUTE,
String.valueOf(tarEntry.getUserName()),
- FILE_GROUP_ATTRIBUTE,
String.valueOf(tarEntry.getGroupName()),
+ if (tarEntry.getLastAccessTime() != null) {
+ final String lastAccesTime =
DATE_TIME_FORMATTER.format(tarEntry.getLastAccessTime().toInstant());
+
attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, lastAccesTime);
+ }
- FILE_LAST_MODIFIED_TIME_ATTRIBUTE,
timeAsString,
- FILE_CREATION_TIME_ATTRIBUTE, timeAsString,
+ attributes.put(FRAGMENT_ID, fragmentId);
+ attributes.put(FRAGMENT_INDEX,
String.valueOf(++fragmentCount));
- FRAGMENT_ID, fragmentId,
- FRAGMENT_INDEX,
String.valueOf(++fragmentCount)
- ));
+ unpackedFile =
session.putAllAttributes(unpackedFile, attributes);
final long fileSize = tarEntry.getSize();
unpackedFile = session.write(unpackedFile,
outputStream -> StreamUtils.copy(tarIn, outputStream, fileSize));
+ attributes.clear();
} finally {
unpacked.add(unpackedFile);
}
@@ -470,28 +497,59 @@ public class UnpackContent extends AbstractProcessor {
return !directory && (fileFilter == null ||
fileFilter.matcher(fileName).find());
}
- protected void processEntry(final InputStream zipInputStream,
final boolean directory, final String zipEntryName, final EncryptionMethod
encryptionMethod) {
+ protected void processEntry(final InputStream zipInputStream,
boolean directory, String zipEntryName, Map<String, String> attributes) {
if (isFileEntryMatched(directory, zipEntryName)) {
final File file = new File(zipEntryName);
final String parentDirectory = (file.getParent() == null)
? PATH_SEPARATOR : file.getParent();
FlowFile unpackedFile = session.create(sourceFlowFile);
try {
- unpackedFile = session.putAllAttributes(unpackedFile,
Map.of(
- CoreAttributes.FILENAME.key(), file.getName(),
- CoreAttributes.PATH.key(), parentDirectory,
- CoreAttributes.MIME_TYPE.key(), OCTET_STREAM,
- FILE_ENCRYPTION_METHOD_ATTRIBUTE,
encryptionMethod.toString(),
-
- FRAGMENT_ID, fragmentId,
- FRAGMENT_INDEX, String.valueOf(++fragmentIndex)
- ));
+ attributes.put(CoreAttributes.FILENAME.key(),
file.getName());
+ attributes.put(CoreAttributes.PATH.key(),
parentDirectory);
+ attributes.put(CoreAttributes.MIME_TYPE.key(),
OCTET_STREAM);
+ attributes.put(FRAGMENT_ID, fragmentId);
+ attributes.put(FRAGMENT_INDEX,
String.valueOf(++fragmentIndex));
+ unpackedFile = session.putAllAttributes(unpackedFile,
attributes);
unpackedFile = session.write(unpackedFile,
outputStream -> StreamUtils.copy(zipInputStream, outputStream));
} finally {
unpacked.add(unpackedFile);
}
}
}
+
+ protected void addFileSizeAttribute(long fileSize, Map<String,
String> attributes) {
+ attributes.put(FILE_SIZE_ATTRIBUTE, String.valueOf(fileSize));
+ }
+
+ protected void addEncryptionMethodAttribute(EncryptionMethod
encryptionMethod, Map<String, String> attributes) {
+ attributes.put(FILE_ENCRYPTION_METHOD_ATTRIBUTE,
encryptionMethod.toString());
+ }
+
+ protected void addFilePermissionsAttribute(int mode, Map<String,
String> attributes) {
+ if (mode > -1) {
+ attributes.put(FILE_PERMISSIONS_ATTRIBUTE,
FileInfo.permissionToString(mode));
+ }
+ }
+
+ protected void addZipEntryTimeAttributes(Instant lastModified,
Instant creation, Instant lastAccess, Map<String, String> attributes) {
+ String lastModifiedDate = null;
+ if (lastModified != null) {
+ lastModifiedDate =
DATE_TIME_FORMATTER.format(lastModified);
+ attributes.put(FILE_LAST_MODIFIED_TIME_ATTRIBUTE,
lastModifiedDate);
+ }
+
+ if (creation != null) {
+ final String creationTime =
DATE_TIME_FORMATTER.format(creation);
+ attributes.put(FILE_CREATION_TIME_ATTRIBUTE, creationTime);
+ } else if (lastModifiedDate != null) {
+ attributes.put(FILE_CREATION_TIME_ATTRIBUTE,
lastModifiedDate);
+ }
+
+ if (lastAccess != null) {
+ final String lastAccessDate =
DATE_TIME_FORMATTER.format(lastAccess);
+ attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE,
lastAccessDate);
+ }
+ }
}
private static class CompressedZipInputStreamCallback extends
ZipInputStreamCallback {
@@ -518,8 +576,19 @@ public class UnpackContent extends AbstractProcessor {
try (final ZipArchiveInputStream zipInputStream = new
ZipArchiveInputStream(new BufferedInputStream(inputStream),
filenameEncoding.toString(), true,
allowStoredEntriesWithDataDescriptor)) {
ZipArchiveEntry zipEntry;
+ final Map<String, String> attributes = new HashMap<>();
while ((zipEntry = zipInputStream.getNextEntry()) != null)
{
- processEntry(zipInputStream, zipEntry.isDirectory(),
zipEntry.getName(), EncryptionMethod.NONE);
+ addEncryptionMethodAttribute(EncryptionMethod.NONE,
attributes);
+ addFileSizeAttribute(zipEntry.getSize(), attributes);
+ addFilePermissionsAttribute(zipEntry.getUnixMode(),
attributes);
+ // NOTE: Per javadocs, ZipArchiveEntry can return -1
for getTime() if its not specified
+ // and getLastAccessTime() can return null if it is
not specified.
+ Instant lastModified =
zipEntry.getLastModifiedDate().toInstant();
+ Instant creation = zipEntry.getTime() > 0 ? new
Date(zipEntry.getTime()).toInstant() : null;
+ Instant lastAccess = zipEntry.getLastAccessTime() !=
null ? zipEntry.getLastAccessTime().toInstant() : null;
+ addZipEntryTimeAttributes(lastModified, creation,
lastAccess, attributes);
+ processEntry(zipInputStream, zipEntry.isDirectory(),
zipEntry.getName(), attributes);
+ attributes.clear();
}
}
}
@@ -547,8 +616,15 @@ public class UnpackContent extends AbstractProcessor {
public void process(final InputStream inputStream) throws
IOException {
try (final ZipInputStream zipInputStream = new
ZipInputStream(new BufferedInputStream(inputStream), password,
filenameEncoding)) {
LocalFileHeader zipEntry;
+ final Map<String, String> attributes = new HashMap<>();
while ((zipEntry = zipInputStream.getNextEntry()) != null)
{
- processEntry(zipInputStream, zipEntry.isDirectory(),
zipEntry.getFileName(), zipEntry.getEncryptionMethod());
+ //NOTE: LocalFileHeader has no methods to return
creation time and the mode.
+
addEncryptionMethodAttribute(zipEntry.getEncryptionMethod(), attributes);
+ addFileSizeAttribute(zipEntry.getUncompressedSize(),
attributes);
+ Instant lastModified = zipEntry.getLastModifiedTime()
> 0 ? new Date(zipEntry.getLastModifiedTime()).toInstant() : null;
+ addZipEntryTimeAttributes(lastModified, null, null,
attributes);
+ processEntry(zipInputStream, zipEntry.isDirectory(),
zipEntry.getFileName(), attributes);
+ attributes.clear();
}
}
}
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
index fc2b0226a7..b3207d4494 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUnpackContent.java
@@ -41,6 +41,7 @@ import java.util.UUID;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_COUNT;
import static org.apache.nifi.processors.standard.SplitContent.FRAGMENT_ID;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -50,6 +51,8 @@ public class TestUnpackContent {
private static final Path dataPath =
Paths.get("src/test/resources/TestUnpackContent");
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");
+
@Test
public void testTar() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new
UnpackContent());
@@ -82,19 +85,21 @@ public class TestUnpackContent {
final List<MockFlowFile> unpacked =
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
for (final MockFlowFile flowFile : unpacked) {
+
assertTrue(flowFile.getAttributes().keySet().containsAll(List.of(UnpackContent.FRAGMENT_ID,
UnpackContent.FRAGMENT_INDEX,
+ UnpackContent.FRAGMENT_COUNT,
UnpackContent.SEGMENT_ORIGINAL_FILENAME, UnpackContent.FILE_SIZE_ATTRIBUTE)));
+
final String filename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder =
flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
- assertEquals("rw-r--r--",
flowFile.getAttribute("file.permissions"));
- assertEquals("jmcarey", flowFile.getAttribute("file.owner"));
- assertEquals("mkpasswd", flowFile.getAttribute("file.group"));
- String modifiedTimeAsString =
flowFile.getAttribute("file.lastModifiedTime");
-
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(modifiedTimeAsString);
+ assertEquals("rw-r--r--",
flowFile.getAttribute(UnpackContent.FILE_PERMISSIONS_ATTRIBUTE));
+ assertEquals("jmcarey",
flowFile.getAttribute(UnpackContent.FILE_OWNER_ATTRIBUTE));
+ assertEquals("mkpasswd",
flowFile.getAttribute(UnpackContent.FILE_GROUP_ATTRIBUTE));
+ String modifiedTimeAsString =
flowFile.getAttribute("file.lastModifiedTime");
+ assertDoesNotThrow(() ->
TIMESTAMP_FORMATTER.parse(modifiedTimeAsString));
String creationTimeAsString =
flowFile.getAttribute("file.creationTime");
-
-
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ").parse(creationTimeAsString);
+ assertDoesNotThrow(() ->
TIMESTAMP_FORMATTER.parse(creationTimeAsString));
assertTrue(Files.exists(path));
@@ -182,7 +187,24 @@ public class TestUnpackContent {
autoUnpackRunner.assertTransferCount(UnpackContent.REL_FAILURE, 0);
final List<MockFlowFile> unpacked =
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_SUCCESS);
+
+ final List<String> expectedAttributeNames = List.of(
+ CoreAttributes.FILENAME.key(),
+ CoreAttributes.PATH.key(),
+ UnpackContent.FRAGMENT_ID,
+ UnpackContent.FRAGMENT_INDEX,
+ UnpackContent.FRAGMENT_COUNT,
+ UnpackContent.SEGMENT_ORIGINAL_FILENAME,
+ UnpackContent.FILE_SIZE_ATTRIBUTE,
+ UnpackContent.FILE_CREATION_TIME_ATTRIBUTE,
+ UnpackContent.FILE_LAST_MODIFIED_TIME_ATTRIBUTE,
+ UnpackContent.FILE_PERMISSIONS_ATTRIBUTE
+ );
+
for (final MockFlowFile flowFile : unpacked) {
+ for (final String expectedAttributeName : expectedAttributeNames) {
+ flowFile.assertAttributeExists(expectedAttributeName);
+ }
final String filename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
final String folder =
flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(folder).resolve(filename);
@@ -191,6 +213,7 @@ public class TestUnpackContent {
flowFile.assertContentEquals(path.toFile());
}
}
+
@Test
public void testInvalidZip() throws IOException {
final TestRunner unpackRunner = TestRunners.newTestRunner(new
UnpackContent());
@@ -218,7 +241,6 @@ public class TestUnpackContent {
final List<MockFlowFile> unpacked =
unpackRunner.getFlowFilesForRelationship(UnpackContent.REL_FAILURE);
for (final MockFlowFile flowFile : unpacked) {
final String filename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
- // final String folder =
flowFile.getAttribute(CoreAttributes.PATH.key());
final Path path = dataPath.resolve(filename);
assertTrue(Files.exists(path));