This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5808ed2bf7 NIFI-14429 Added Initial Listing Strategy in ListSmb
5808ed2bf7 is described below
commit 5808ed2bf7dca56a5a6d4cd53b22d1529776940b
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu Apr 3 20:00:46 2025 +0200
NIFI-14429 Added Initial Listing Strategy in ListSmb
Also added dependsOn() to Entity Tracking* properties
This closes #9845.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../org/apache/nifi/processors/smb/ListSmb.java | 112 +++++++++++++++++++--
.../smb/util/InitialListingStrategy.java | 49 +++++++++
.../org/apache/nifi/processors/smb/ListSmbIT.java | 50 +++++++++
.../apache/nifi/processors/smb/ListSmbTest.java | 56 +++++++++--
4 files changed, 251 insertions(+), 16 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
index a95c9da198..371c974ea9 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
@@ -40,7 +40,10 @@ import static
org.apache.nifi.services.smb.SmbListableEntity.SIZE;
import java.io.IOException;
import java.net.URI;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.format.DateTimeParseException;
+import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -59,6 +62,7 @@ import
org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
@@ -69,8 +73,10 @@ import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.list.AbstractListProcessor;
import org.apache.nifi.processor.util.list.ListedEntityTracker;
+import org.apache.nifi.processors.smb.util.InitialListingStrategy;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.services.smb.SmbClientProviderService;
@@ -82,8 +88,8 @@ import org.apache.nifi.services.smb.SmbListableEntity;
@Tags({"samba, smb, cifs, files", "list"})
@SeeAlso({PutSmbFile.class, GetSmbFile.class, FetchSmb.class})
@CapabilityDescription("Lists concrete files shared via SMB protocol. " +
- "Each listed file may result in one flowfile, the metadata being
written as flowfile attributes. " +
- "Or - in case the 'Record Writer' property is set - the entire result
is written as records to a single flowfile. "
+ "Each listed file may result in one FlowFile, the metadata being
written as FlowFile attributes. " +
+ "Or - in case the 'Record Writer' property is set - the entire result
is written as records to a single FlowFile. "
+
"This Processor is designed to run on Primary Node only in a cluster.
If the primary node changes, the new Primary Node will pick up where the "
+
@@ -169,6 +175,26 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
.allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS)
.build();
+ public static final PropertyDescriptor INITIAL_LISTING_STRATEGY = new
Builder()
+ .name("initial-listing-strategy")
+ .displayName("Initial Listing Strategy")
+ .description("Specifies how to handle existing files on the SMB
share when the processor is started for the first time (or its state has been
cleared).")
+ .required(true)
+ .allowableValues(InitialListingStrategy.class)
+ .defaultValue(InitialListingStrategy.ALL_FILES.getValue())
+ .dependsOn(SMB_LISTING_STRATEGY, BY_TIMESTAMPS)
+ .build();
+
+ public static final PropertyDescriptor INITIAL_LISTING_TIMESTAMP = new
Builder()
+ .name("initial-listing-timestamp")
+ .displayName("Initial Listing Timestamp")
+ .description("The timestamp from which the files will be listed
when the processor is started for the first time (or its state has been
cleared). " +
+ "The value can be specified as an epoch timestamp in
milliseconds or as a UTC datetime in a format such as 2025-02-01T00:00:00Z")
+ .required(true)
+ .dependsOn(INITIAL_LISTING_STRATEGY,
InitialListingStrategy.FROM_TIMESTAMP)
+ .addValidator(NON_BLANK_VALIDATOR)
+ .build();
+
public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new
Builder()
.name("smb-client-provider-service")
.displayName("SMB Client Provider Service")
@@ -189,9 +215,26 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
.addValidator(new MustNotContainDirectorySeparatorsValidator())
.build();
+ public static final PropertyDescriptor TRACKING_STATE_CACHE = new Builder()
+ .fromPropertyDescriptor(ListedEntityTracker.TRACKING_STATE_CACHE)
+ .dependsOn(SMB_LISTING_STRATEGY, BY_ENTITIES)
+ .build();
+
+ public static final PropertyDescriptor TRACKING_TIME_WINDOW = new Builder()
+ .fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
+ .dependsOn(SMB_LISTING_STRATEGY, BY_ENTITIES)
+ .build();
+
+ public static final PropertyDescriptor INITIAL_LISTING_TARGET = new
Builder()
+ .fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
+ .dependsOn(SMB_LISTING_STRATEGY, BY_ENTITIES)
+ .build();
+
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
SMB_CLIENT_PROVIDER_SERVICE,
SMB_LISTING_STRATEGY,
+ INITIAL_LISTING_STRATEGY,
+ INITIAL_LISTING_TIMESTAMP,
DIRECTORY,
FILE_NAME_SUFFIX_FILTER,
AbstractListProcessor.RECORD_WRITER,
@@ -200,16 +243,37 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
MINIMUM_SIZE,
MAXIMUM_SIZE,
AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION,
- ListedEntityTracker.TRACKING_STATE_CACHE,
- ListedEntityTracker.TRACKING_TIME_WINDOW,
- ListedEntityTracker.INITIAL_LISTING_TARGET
+ TRACKING_STATE_CACHE,
+ TRACKING_TIME_WINDOW,
+ INITIAL_LISTING_TARGET
);
+ private volatile Long initialListingTimestamp;
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}
+ @Override
+ protected void customValidate(ValidationContext validationContext,
Collection<ValidationResult> validationResults) {
+ try {
+ getInitialListingTimestamp(validationContext);
+ } catch (InvalidTimestampException ite) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(INITIAL_LISTING_TIMESTAMP.getDisplayName())
+ .explanation(ite.getMessage())
+ .valid(false)
+ .build());
+ }
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws IOException {
+ boolean isStateEmpty =
context.getStateManager().getState(getStateScope(context)).toMap().isEmpty();
+ initialListingTimestamp = isStateEmpty ?
getInitialListingTimestamp(context) : null;
+ }
+
@Override
protected Map<String, String> createAttributes(SmbListableEntity entity,
ProcessContext context) {
final Map<String, String> attributes = new TreeMap<>();
@@ -324,6 +388,10 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
filter = filter.and(entity -> entity.getLastModifiedTime() >=
minTimestampOrNull);
}
+ if (initialListingTimestamp != null) {
+ filter = filter.and(entity -> entity.getLastModifiedTime() >=
initialListingTimestamp);
+ }
+
if (minimumSizeOrNull != null) {
filter = filter.and(entity -> entity.getSize() >=
minimumSizeOrNull);
}
@@ -348,7 +416,7 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
try {
clientService.close();
} catch (Exception e) {
- throw new RuntimeException("Could not close SMB client", e);
+ throw new ProcessException("Could not close SMB client", e);
}
});
}
@@ -359,6 +427,32 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
return "/".equals(directory) ? "" : directory;
}
+ private Long getInitialListingTimestamp(PropertyContext context) {
+ final String listingStrategy =
context.getProperty(SMB_LISTING_STRATEGY).getValue();
+
+ if (BY_TIMESTAMPS.getValue().equals(listingStrategy)) {
+ final InitialListingStrategy initialListingStrategy =
context.getProperty(INITIAL_LISTING_STRATEGY).asAllowableValue(InitialListingStrategy.class);
+
+ if (InitialListingStrategy.NEW_FILES == initialListingStrategy) {
+ return Instant.now().toEpochMilli();
+ } else if (InitialListingStrategy.FROM_TIMESTAMP ==
initialListingStrategy) {
+ final String initialListingTimestamp =
context.getProperty(INITIAL_LISTING_TIMESTAMP).getValue();
+
+ try {
+ return
Instant.parse(initialListingTimestamp).toEpochMilli();
+ } catch (DateTimeParseException dtpe) {
+ try {
+ return Long.parseLong(initialListingTimestamp);
+ } catch (NumberFormatException nfe) {
+ throw new
InvalidTimestampException(initialListingTimestamp);
+ }
+ }
+ }
+ }
+
+ return null;
+ }
+
private static class MustNotContainDirectorySeparatorsValidator implements
Validator {
@Override
@@ -373,4 +467,10 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
}
+ private static class InvalidTimestampException extends RuntimeException {
+ InvalidTimestampException(String timestamp) {
+ super(String.format("'%s' is neither an epoch timestamp nor a UTC
datetime.", timestamp));
+ }
+ }
+
}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/InitialListingStrategy.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/InitialListingStrategy.java
new file mode 100644
index 0000000000..a2d7dbb49b
--- /dev/null
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/InitialListingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum InitialListingStrategy implements DescribedValue {
+
+ ALL_FILES("All Files", "Lists all existing files"),
+ NEW_FILES("New Files", "Lists only newly created files"),
+ FROM_TIMESTAMP("From Timestamp", "Lists only files created after the
specified timestamp (inclusively)");
+
+ private final String displayName;
+ private final String description;
+
+ InitialListingStrategy(String displayName, String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
index f059502353..9a132aed0c 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
@@ -18,11 +18,14 @@ package org.apache.nifi.processors.smb;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.BY_TIMESTAMPS;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_STRATEGY;
+import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_TIMESTAMP;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE;
import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
@@ -31,10 +34,13 @@ import static
org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.time.Instant;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+
+import org.apache.nifi.processors.smb.util.InitialListingStrategy;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbjClientProviderService;
@@ -207,4 +213,48 @@ public class ListSmbIT extends SambaTestContainers {
testRunner.disableControllerService(smbjClientProviderService);
}
+ @Test
+ void testInitialListingStrategyAllFiles() throws Exception {
+ testInitialListingStrategy(InitialListingStrategy.ALL_FILES, 2);
+ }
+
+ @Test
+ void testInitialListingStrategyNewFiles() throws Exception {
+ testInitialListingStrategy(InitialListingStrategy.NEW_FILES, 0);
+ }
+
+ @Test
+ void testInitialListingStrategyFromTimestamp() throws Exception {
+ testInitialListingStrategy(InitialListingStrategy.FROM_TIMESTAMP, 1);
+ }
+
+ private void testInitialListingStrategy(InitialListingStrategy
initialListingStrategy, int expectedCount) throws Exception {
+ final TestRunner testRunner = newTestRunner(ListSmb.class);
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
+ testRunner.setProperty(MINIMUM_AGE, "0 ms");
+ testRunner.setProperty(LISTING_STRATEGY, BY_TIMESTAMPS);
+ testRunner.setProperty(INITIAL_LISTING_STRATEGY,
initialListingStrategy);
+
+ writeFile("1.txt", generateContentWithSize(1));
+ Thread.sleep(100);
+
+ testRunner.setProperty(INITIAL_LISTING_TIMESTAMP,
Instant.now().toString()); // ignored if initialListingStrategy is not
FROM_TIMESTAMP
+
+ writeFile("2.txt", generateContentWithSize(1));
+ Thread.sleep(100);
+
+ testRunner.run(1, false, true);
+ testRunner.assertTransferCount(REL_SUCCESS, expectedCount);
+ testRunner.clearTransferState();
+
+ writeFile("3.txt", generateContentWithSize(1));
+ Thread.sleep(100);
+
+ testRunner.run(1, true, false);
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.clearTransferState();
+
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
+
}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
index 621ec74209..a308856cef 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
@@ -19,15 +19,19 @@ package org.apache.nifi.processors.smb;
import static java.util.Arrays.stream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.BY_TIMESTAMPS;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
import static
org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_STRATEGY;
+import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_TIMESTAMP;
import static org.apache.nifi.processors.smb.ListSmb.MAXIMUM_AGE;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
import static
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.smb.ListSmb.SMB_LISTING_STRATEGY;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
@@ -45,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.processor.util.list.ListedEntity;
+import org.apache.nifi.processors.smb.util.InitialListingStrategy;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbClientService;
import org.apache.nifi.services.smb.SmbListableEntity;
@@ -88,7 +93,7 @@ class ListSmbTest {
testRunner.addControllerService("cacheService", cacheService);
testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService");
testRunner.enableControllerService(cacheService);
- final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSmbClientService(testRunner);
long now = System.currentTimeMillis();
mockSmbFolders(mockNifiSmbClientService,
listableEntity("should_list_this_again_after_property_change",
now - 100));
@@ -125,7 +130,7 @@ class ListSmbTest {
testRunner.setProperty(LISTING_STRATEGY, "timestamps");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
- final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSmbClientService(testRunner);
setTime(1000L);
mockSmbFolders(mockNifiSmbClientService);
testRunner.run();
@@ -168,7 +173,7 @@ class ListSmbTest {
testRunner.addControllerService("cacheService", cacheService);
testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService");
testRunner.enableControllerService(cacheService);
- final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSmbClientService(testRunner);
setTime(1000L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("first", 1000)
@@ -194,7 +199,7 @@ class ListSmbTest {
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
- final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSmbClientService(testRunner);
setTime(1000L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("first", 1000)
@@ -214,7 +219,7 @@ class ListSmbTest {
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
testRunner.setProperty(MINIMUM_AGE, "10 ms");
testRunner.setProperty(MAXIMUM_AGE, "30 ms");
- final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSmbClientService(testRunner);
setTime(1000L);
mockSmbFolders(mockNifiSmbClientService,
listableEntity("too_young", 1000),
@@ -231,7 +236,7 @@ class ListSmbTest {
final TestRunner testRunner = newTestRunner(ListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "timestamps");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
- final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSmbClientService(testRunner);
when(mockNifiSmbClientService.listFiles(anyString())).thenThrow(new
RuntimeException("test exception"));
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
@@ -241,7 +246,7 @@ class ListSmbTest {
@Test
public void shouldFormatRemotePathProperly() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
- final SmbClientProviderService clientProviderService =
mockSmbConnectionPoolService();
+ final SmbClientProviderService clientProviderService =
mockSmbClientProviderService();
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE,
CLIENT_SERVICE_PROVIDER_ID);
testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID,
clientProviderService);
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://hostname:445/share"));
@@ -260,19 +265,50 @@ class ListSmbTest {
}
- private SmbClientProviderService mockSmbConnectionPoolService() {
+ @Test
+ void testInitialListingTimestampValidEpoch() throws Exception {
+ testInitialListingTimestamp("123456789", true);
+ }
+
+ @Test
+ void testInitialListingTimestampValidUTCTime() throws Exception {
+ testInitialListingTimestamp("2025-04-03T16:16:54Z", true);
+ }
+
+ @Test
+ void testInitialListingTimestampInvalidTime() throws Exception {
+ testInitialListingTimestamp("2025-04-03 16:16:54", false);
+ }
+
+ private void testInitialListingTimestamp(String propertyValue, boolean
shouldBeValid) throws Exception {
+ final TestRunner testRunner = newTestRunner(ListSmb.class);
+
+ configureTestRunnerWithMockedSmbClientService(testRunner);
+
+ testRunner.setProperty(SMB_LISTING_STRATEGY, BY_TIMESTAMPS);
+ testRunner.setProperty(INITIAL_LISTING_STRATEGY,
InitialListingStrategy.FROM_TIMESTAMP);
+ testRunner.setProperty(INITIAL_LISTING_TIMESTAMP, propertyValue);
+
+ if (shouldBeValid) {
+ testRunner.assertValid();
+ } else {
+ testRunner.assertNotValid();
+ }
+ }
+
+ private SmbClientProviderService mockSmbClientProviderService() {
final SmbClientProviderService clientProviderService =
mock(SmbClientProviderService.class);
when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID);
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
return clientProviderService;
}
- private SmbClientService
configureTestRunnerWithMockedSambaClient(TestRunner testRunner)
+ private SmbClientService
configureTestRunnerWithMockedSmbClientService(TestRunner testRunner)
throws Exception {
final SmbClientService mockNifiSmbClientService =
mock(SmbClientService.class);
testRunner.setProperty(DIRECTORY, "testDirectory");
- final SmbClientProviderService clientProviderService =
mockSmbConnectionPoolService();
+ final SmbClientProviderService clientProviderService =
mockSmbClientProviderService();
when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE,
CLIENT_SERVICE_PROVIDER_ID);
testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID,
clientProviderService);