This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 94457876b6 NIFI-14444 Added File Filter and Path Filter in ListSmb
94457876b6 is described below
commit 94457876b6831575e6af8568bba50a73ee5f402a
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Apr 7 21:56:29 2025 +0200
NIFI-14444 Added File Filter and Path Filter in ListSmb
Also renamed the display name of the File Name Suffix Filter property to Ignore Files with Suffix (the underlying property name "file-name-suffix-filter" is unchanged)
This closes #9854.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../org/apache/nifi/processors/smb/ListSmb.java | 46 +++++++++++++++++----
.../org/apache/nifi/processors/smb/ListSmbIT.java | 47 +++++++++++++++++++++-
.../apache/nifi/processors/smb/ListSmbTest.java | 4 +-
3 files changed, 86 insertions(+), 11 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
index 371c974ea9..b2f45ec4a5 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
+import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -74,6 +75,7 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
import org.apache.nifi.processors.smb.util.InitialListingStrategy;
@@ -203,9 +205,27 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
.identifiesControllerService(SmbClientProviderService.class)
.build();
- public static final PropertyDescriptor FILE_NAME_SUFFIX_FILTER = new
Builder()
+ public static final PropertyDescriptor FILE_FILTER = new Builder()
+ .name("file-filter")
+ .displayName("File Filter")
+ .description("Only files whose names match the given regular
expression will be listed.")
+ .required(false)
+ .addValidator(NON_BLANK_VALIDATOR)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PATH_FILTER = new Builder()
+ .name("path-filter")
+ .displayName("Path Filter")
+ .description("Only files whose paths (up to the file's parent
directory) match the given regular expression will be listed.")
+ .required(false)
+ .addValidator(NON_BLANK_VALIDATOR)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor IGNORE_FILES_WITH_SUFFIX = new
Builder()
.name("file-name-suffix-filter")
- .displayName("File Name Suffix Filter")
+ .displayName("Ignore Files with Suffix")
.description("Files ending with the given suffix will be omitted.
Can be used to make sure that files "
+ "that are still uploading are not listed multiple times,
by having those files have a suffix "
+ "and remove the suffix once the upload finishes. This is
highly recommended when using "
@@ -236,7 +256,9 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
INITIAL_LISTING_STRATEGY,
INITIAL_LISTING_TIMESTAMP,
DIRECTORY,
- FILE_NAME_SUFFIX_FILTER,
+ FILE_FILTER,
+ PATH_FILTER,
+ IGNORE_FILES_WITH_SUFFIX,
AbstractListProcessor.RECORD_WRITER,
MINIMUM_AGE,
MAXIMUM_AGE,
@@ -328,7 +350,7 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
@Override
protected boolean isListingResetNecessary(PropertyDescriptor property) {
- return asList(SMB_CLIENT_PROVIDER_SERVICE, DIRECTORY,
FILE_NAME_SUFFIX_FILTER).contains(property);
+ return asList(SMB_CLIENT_PROVIDER_SERVICE, DIRECTORY,
IGNORE_FILES_WITH_SUFFIX).contains(property);
}
@Override
@@ -375,7 +397,9 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
final Double maximumSizeOrNull =
context.getProperty(MAXIMUM_SIZE).isSet() ?
context.getProperty(MAXIMUM_SIZE).asDataSize(DataUnit.B)
: null;
- final String suffixOrNull =
context.getProperty(FILE_NAME_SUFFIX_FILTER).getValue();
+ final Pattern filePatternOrNull =
context.getProperty(FILE_FILTER).isSet() ?
Pattern.compile(context.getProperty(FILE_FILTER).getValue()) : null;
+ final Pattern pathPatternOrNull =
context.getProperty(PATH_FILTER).isSet() ?
Pattern.compile(context.getProperty(PATH_FILTER).getValue()) : null;
+ final String ignoreSuffixOrNull =
context.getProperty(IGNORE_FILES_WITH_SUFFIX).getValue();
final long now = getCurrentTime();
Predicate<SmbListableEntity> filter = entity -> now -
entity.getLastModifiedTime() >= minimumAge;
@@ -400,8 +424,16 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
filter = filter.and(entity -> entity.getSize() <=
maximumSizeOrNull);
}
- if (suffixOrNull != null) {
- filter = filter.and(entity ->
!entity.getName().endsWith(suffixOrNull));
+ if (filePatternOrNull != null) {
+ filter = filter.and(entity ->
filePatternOrNull.matcher(entity.getName()).matches());
+ }
+
+ if (pathPatternOrNull != null) {
+ filter = filter.and(entity ->
pathPatternOrNull.matcher(entity.getPath()).matches());
+ }
+
+ if (ignoreSuffixOrNull != null) {
+ filter = filter.and(entity ->
!entity.getName().endsWith(ignoreSuffixOrNull));
}
return filter;
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
index 9a132aed0c..41c545961f 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
@@ -23,11 +23,13 @@ import static
org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
-import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.FILE_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.IGNORE_FILES_WITH_SUFFIX;
import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_STRATEGY;
import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_TIMESTAMP;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE;
+import static org.apache.nifi.processors.smb.ListSmb.PATH_FILTER;
import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
@@ -40,6 +42,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.smb.util.InitialListingStrategy;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.services.smb.SmbClientProviderService;
@@ -198,12 +201,52 @@ public class ListSmbIT extends SambaTestContainers {
}
+ @Test
+ public void shouldFilterByFileFilter() throws Exception {
+ final TestRunner testRunner = newTestRunner(ListSmb.class);
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
+
+ testRunner.setProperty(MINIMUM_AGE, "0 ms");
+ testRunner.setProperty(FILE_FILTER, "^(?!.*skip).*");
+ testRunner.setProperty(LISTING_STRATEGY, "none");
+
+ writeFile("should_list_this", generateContentWithSize(1));
+ writeFile("should_skip_this", generateContentWithSize(1));
+
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
+ flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(),
"should_list_this");
+
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
+
+ @Test
+ public void shouldFilterByPathFilter() throws Exception {
+ final TestRunner testRunner = newTestRunner(ListSmb.class);
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
+
+ testRunner.setProperty(MINIMUM_AGE, "0 ms");
+ testRunner.setProperty(PATH_FILTER, "dir1/.*");
+ testRunner.setProperty(LISTING_STRATEGY, "none");
+
+ writeFile("dir1/dir11/should_list_this", generateContentWithSize(1));
+ writeFile("dir2/dir21/should_skip_this", generateContentWithSize(1));
+
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ final MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(REL_SUCCESS).getFirst();
+ flowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(),
"should_list_this");
+
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
+
@Test
public void shouldFilterByGivenSuffix() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
testRunner.setProperty(MINIMUM_AGE, "0 ms");
- testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, ".suffix");
+ testRunner.setProperty(IGNORE_FILES_WITH_SUFFIX, ".suffix");
testRunner.setProperty(LISTING_STRATEGY, "none");
writeFile("should_list_this", generateContentWithSize(1));
writeFile("should_skip_this.suffix", generateContentWithSize(1));
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
index a308856cef..cacf587634 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
@@ -25,7 +25,7 @@ import static
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCC
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
import static
org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
-import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.IGNORE_FILES_WITH_SUFFIX;
import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_STRATEGY;
import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_TIMESTAMP;
import static org.apache.nifi.processors.smb.ListSmb.MAXIMUM_AGE;
@@ -105,7 +105,7 @@ class ListSmbTest {
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.clearTransferState();
- testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, "suffix_changed");
+ testRunner.setProperty(IGNORE_FILES_WITH_SUFFIX, "suffix_changed");
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.clearTransferState();