This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5808ed2bf7 NIFI-14429 Added Initial Listing Strategy in ListSmb
5808ed2bf7 is described below

commit 5808ed2bf7dca56a5a6d4cd53b22d1529776940b
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Thu Apr 3 20:00:46 2025 +0200

    NIFI-14429 Added Initial Listing Strategy in ListSmb
    
    Also added dependsOn() to Entity Tracking* properties
    
    This closes #9845.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../org/apache/nifi/processors/smb/ListSmb.java    | 112 +++++++++++++++++++--
 .../smb/util/InitialListingStrategy.java           |  49 +++++++++
 .../org/apache/nifi/processors/smb/ListSmbIT.java  |  50 +++++++++
 .../apache/nifi/processors/smb/ListSmbTest.java    |  56 +++++++++--
 4 files changed, 251 insertions(+), 16 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
index a95c9da198..371c974ea9 100644
--- 
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
+++ 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
@@ -40,7 +40,10 @@ import static 
org.apache.nifi.services.smb.SmbListableEntity.SIZE;
 
 import java.io.IOException;
 import java.net.URI;
+import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.format.DateTimeParseException;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -59,6 +62,7 @@ import 
org.apache.nifi.annotation.configuration.DefaultSchedule;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.PropertyValue;
@@ -69,8 +73,10 @@ import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.list.AbstractListProcessor;
 import org.apache.nifi.processor.util.list.ListedEntityTracker;
+import org.apache.nifi.processors.smb.util.InitialListingStrategy;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.services.smb.SmbClientProviderService;
@@ -82,8 +88,8 @@ import org.apache.nifi.services.smb.SmbListableEntity;
 @Tags({"samba, smb, cifs, files", "list"})
 @SeeAlso({PutSmbFile.class, GetSmbFile.class, FetchSmb.class})
 @CapabilityDescription("Lists concrete files shared via SMB protocol. " +
-        "Each listed file may result in one flowfile, the metadata being 
written as flowfile attributes. " +
-        "Or - in case the 'Record Writer' property is set - the entire result 
is written as records to a single flowfile. "
+        "Each listed file may result in one FlowFile, the metadata being 
written as FlowFile attributes. " +
+        "Or - in case the 'Record Writer' property is set - the entire result 
is written as records to a single FlowFile. "
         +
         "This Processor is designed to run on Primary Node only in a cluster. 
If the primary node changes, the new Primary Node will pick up where the "
         +
@@ -169,6 +175,26 @@ public class ListSmb extends 
AbstractListProcessor<SmbListableEntity> {
             .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS)
             .build();
 
+    public static final PropertyDescriptor INITIAL_LISTING_STRATEGY = new 
Builder()
+            .name("initial-listing-strategy")
+            .displayName("Initial Listing Strategy")
+            .description("Specifies how to handle existing files on the SMB 
share when the processor is started for the first time (or its state has been 
cleared).")
+            .required(true)
+            .allowableValues(InitialListingStrategy.class)
+            .defaultValue(InitialListingStrategy.ALL_FILES.getValue())
+            .dependsOn(SMB_LISTING_STRATEGY, BY_TIMESTAMPS)
+            .build();
+
+    public static final PropertyDescriptor INITIAL_LISTING_TIMESTAMP = new 
Builder()
+            .name("initial-listing-timestamp")
+            .displayName("Initial Listing Timestamp")
+            .description("The timestamp from which the files will be listed 
when the processor is started for the first time (or its state has been 
cleared). " +
+                    "The value can be specified as an epoch timestamp in 
milliseconds or as a UTC datetime in a format such as 2025-02-01T00:00:00Z")
+            .required(true)
+            .dependsOn(INITIAL_LISTING_STRATEGY, 
InitialListingStrategy.FROM_TIMESTAMP)
+            .addValidator(NON_BLANK_VALIDATOR)
+            .build();
+
     public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new 
Builder()
             .name("smb-client-provider-service")
             .displayName("SMB Client Provider Service")
@@ -189,9 +215,26 @@ public class ListSmb extends 
AbstractListProcessor<SmbListableEntity> {
             .addValidator(new MustNotContainDirectorySeparatorsValidator())
             .build();
 
+    public static final PropertyDescriptor TRACKING_STATE_CACHE = new Builder()
+            .fromPropertyDescriptor(ListedEntityTracker.TRACKING_STATE_CACHE)
+            .dependsOn(SMB_LISTING_STRATEGY, BY_ENTITIES)
+            .build();
+
+    public static final PropertyDescriptor TRACKING_TIME_WINDOW = new Builder()
+            .fromPropertyDescriptor(ListedEntityTracker.TRACKING_TIME_WINDOW)
+            .dependsOn(SMB_LISTING_STRATEGY, BY_ENTITIES)
+            .build();
+
+    public static final PropertyDescriptor INITIAL_LISTING_TARGET = new 
Builder()
+            .fromPropertyDescriptor(ListedEntityTracker.INITIAL_LISTING_TARGET)
+            .dependsOn(SMB_LISTING_STRATEGY, BY_ENTITIES)
+            .build();
+
     private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = 
List.of(
             SMB_CLIENT_PROVIDER_SERVICE,
             SMB_LISTING_STRATEGY,
+            INITIAL_LISTING_STRATEGY,
+            INITIAL_LISTING_TIMESTAMP,
             DIRECTORY,
             FILE_NAME_SUFFIX_FILTER,
             AbstractListProcessor.RECORD_WRITER,
@@ -200,16 +243,37 @@ public class ListSmb extends 
AbstractListProcessor<SmbListableEntity> {
             MINIMUM_SIZE,
             MAXIMUM_SIZE,
             AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION,
-            ListedEntityTracker.TRACKING_STATE_CACHE,
-            ListedEntityTracker.TRACKING_TIME_WINDOW,
-            ListedEntityTracker.INITIAL_LISTING_TARGET
+            TRACKING_STATE_CACHE,
+            TRACKING_TIME_WINDOW,
+            INITIAL_LISTING_TARGET
     );
 
+    private volatile Long initialListingTimestamp;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         return PROPERTY_DESCRIPTORS;
     }
 
+    @Override
+    protected void customValidate(ValidationContext validationContext, 
Collection<ValidationResult> validationResults) {
+        try {
+            getInitialListingTimestamp(validationContext);
+        } catch (InvalidTimestampException ite) {
+            validationResults.add(new ValidationResult.Builder()
+                    .subject(INITIAL_LISTING_TIMESTAMP.getDisplayName())
+                    .explanation(ite.getMessage())
+                    .valid(false)
+                    .build());
+        }
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        boolean isStateEmpty = 
context.getStateManager().getState(getStateScope(context)).toMap().isEmpty();
+        initialListingTimestamp = isStateEmpty ? 
getInitialListingTimestamp(context) : null;
+    }
+
     @Override
     protected Map<String, String> createAttributes(SmbListableEntity entity, 
ProcessContext context) {
         final Map<String, String> attributes = new TreeMap<>();
@@ -324,6 +388,10 @@ public class ListSmb extends 
AbstractListProcessor<SmbListableEntity> {
             filter = filter.and(entity -> entity.getLastModifiedTime() >= 
minTimestampOrNull);
         }
 
+        if (initialListingTimestamp != null) {
+            filter = filter.and(entity -> entity.getLastModifiedTime() >= 
initialListingTimestamp);
+        }
+
         if (minimumSizeOrNull != null) {
             filter = filter.and(entity -> entity.getSize() >= 
minimumSizeOrNull);
         }
@@ -348,7 +416,7 @@ public class ListSmb extends 
AbstractListProcessor<SmbListableEntity> {
             try {
                 clientService.close();
             } catch (Exception e) {
-                throw new RuntimeException("Could not close SMB client", e);
+                throw new ProcessException("Could not close SMB client", e);
             }
         });
     }
@@ -359,6 +427,32 @@ public class ListSmb extends 
AbstractListProcessor<SmbListableEntity> {
         return "/".equals(directory) ? "" : directory;
     }
 
+    private Long getInitialListingTimestamp(PropertyContext context) {
+        final String listingStrategy = 
context.getProperty(SMB_LISTING_STRATEGY).getValue();
+
+        if (BY_TIMESTAMPS.getValue().equals(listingStrategy)) {
+            final InitialListingStrategy initialListingStrategy = 
context.getProperty(INITIAL_LISTING_STRATEGY).asAllowableValue(InitialListingStrategy.class);
+
+            if (InitialListingStrategy.NEW_FILES == initialListingStrategy) {
+                return Instant.now().toEpochMilli();
+            } else if (InitialListingStrategy.FROM_TIMESTAMP == 
initialListingStrategy) {
+                final String initialListingTimestamp = 
context.getProperty(INITIAL_LISTING_TIMESTAMP).getValue();
+
+                try {
+                    return 
Instant.parse(initialListingTimestamp).toEpochMilli();
+                } catch (DateTimeParseException dtpe) {
+                    try {
+                        return Long.parseLong(initialListingTimestamp);
+                    } catch (NumberFormatException nfe) {
+                        throw new 
InvalidTimestampException(initialListingTimestamp);
+                    }
+                }
+            }
+        }
+
+        return null;
+    }
+
     private static class MustNotContainDirectorySeparatorsValidator implements 
Validator {
 
         @Override
@@ -373,4 +467,10 @@ public class ListSmb extends 
AbstractListProcessor<SmbListableEntity> {
 
     }
 
+    private static class InvalidTimestampException extends RuntimeException {
+        InvalidTimestampException(String timestamp) {
+            super(String.format("'%s' is neither an epoch timestamp nor a UTC 
datetime.", timestamp));
+        }
+    }
+
 }
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/InitialListingStrategy.java
 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/InitialListingStrategy.java
new file mode 100644
index 0000000000..a2d7dbb49b
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/InitialListingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum InitialListingStrategy implements DescribedValue {
+
+    ALL_FILES("All Files", "Lists all existing files"),
+    NEW_FILES("New Files", "Lists only newly created files"),
+    FROM_TIMESTAMP("From Timestamp", "Lists only files created after the 
specified timestamp (inclusively)");
+
+    private final String displayName;
+    private final String description;
+
+    InitialListingStrategy(String displayName, String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
index f059502353..9a132aed0c 100644
--- 
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
+++ 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
@@ -18,11 +18,14 @@ package org.apache.nifi.processors.smb;
 
 import static java.util.Arrays.asList;
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.BY_TIMESTAMPS;
 import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
 import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER;
 import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
 import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
 import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_STRATEGY;
+import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_TIMESTAMP;
 import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
 import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE;
 import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
@@ -31,10 +34,13 @@ import static 
org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
 import static org.apache.nifi.util.TestRunners.newTestRunner;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.nifi.processors.smb.util.InitialListingStrategy;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.services.smb.SmbClientProviderService;
 import org.apache.nifi.services.smb.SmbjClientProviderService;
@@ -207,4 +213,48 @@ public class ListSmbIT extends SambaTestContainers {
         testRunner.disableControllerService(smbjClientProviderService);
     }
 
+    @Test
+    void testInitialListingStrategyAllFiles() throws Exception {
+        testInitialListingStrategy(InitialListingStrategy.ALL_FILES, 2);
+    }
+
+    @Test
+    void testInitialListingStrategyNewFiles() throws Exception {
+        testInitialListingStrategy(InitialListingStrategy.NEW_FILES, 0);
+    }
+
+    @Test
+    void testInitialListingStrategyFromTimestamp() throws Exception {
+        testInitialListingStrategy(InitialListingStrategy.FROM_TIMESTAMP, 1);
+    }
+
+    private void testInitialListingStrategy(InitialListingStrategy 
initialListingStrategy, int expectedCount) throws Exception {
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        final SmbjClientProviderService smbjClientProviderService = 
configureSmbClient(testRunner, true);
+        testRunner.setProperty(MINIMUM_AGE, "0 ms");
+        testRunner.setProperty(LISTING_STRATEGY, BY_TIMESTAMPS);
+        testRunner.setProperty(INITIAL_LISTING_STRATEGY, 
initialListingStrategy);
+
+        writeFile("1.txt", generateContentWithSize(1));
+        Thread.sleep(100);
+
+        testRunner.setProperty(INITIAL_LISTING_TIMESTAMP, 
Instant.now().toString()); // ignored if initialListingStrategy is not 
FROM_TIMESTAMP
+
+        writeFile("2.txt", generateContentWithSize(1));
+        Thread.sleep(100);
+
+        testRunner.run(1, false, true);
+        testRunner.assertTransferCount(REL_SUCCESS, expectedCount);
+        testRunner.clearTransferState();
+
+        writeFile("3.txt", generateContentWithSize(1));
+        Thread.sleep(100);
+
+        testRunner.run(1, true, false);
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.clearTransferState();
+
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
 }
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
index 621ec74209..a308856cef 100644
--- 
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
+++ 
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
@@ -19,15 +19,19 @@ package org.apache.nifi.processors.smb;
 import static java.util.Arrays.stream;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.BY_TIMESTAMPS;
 import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
 import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
 import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
 import static 
org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
 import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
 import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_STRATEGY;
+import static org.apache.nifi.processors.smb.ListSmb.INITIAL_LISTING_TIMESTAMP;
 import static org.apache.nifi.processors.smb.ListSmb.MAXIMUM_AGE;
 import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
 import static 
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.processors.smb.ListSmb.SMB_LISTING_STRATEGY;
 import static org.apache.nifi.util.TestRunners.newTestRunner;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
@@ -45,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.processor.util.list.ListedEntity;
+import org.apache.nifi.processors.smb.util.InitialListingStrategy;
 import org.apache.nifi.services.smb.SmbClientProviderService;
 import org.apache.nifi.services.smb.SmbClientService;
 import org.apache.nifi.services.smb.SmbListableEntity;
@@ -88,7 +93,7 @@ class ListSmbTest {
         testRunner.addControllerService("cacheService", cacheService);
         testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService");
         testRunner.enableControllerService(cacheService);
-        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSambaClient(testRunner);
+        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSmbClientService(testRunner);
         long now = System.currentTimeMillis();
         mockSmbFolders(mockNifiSmbClientService,
                 listableEntity("should_list_this_again_after_property_change", 
now - 100));
@@ -125,7 +130,7 @@ class ListSmbTest {
         testRunner.setProperty(LISTING_STRATEGY, "timestamps");
         testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
         testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
-        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSambaClient(testRunner);
+        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSmbClientService(testRunner);
         setTime(1000L);
         mockSmbFolders(mockNifiSmbClientService);
         testRunner.run();
@@ -168,7 +173,7 @@ class ListSmbTest {
         testRunner.addControllerService("cacheService", cacheService);
         testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService");
         testRunner.enableControllerService(cacheService);
-        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSambaClient(testRunner);
+        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSmbClientService(testRunner);
         setTime(1000L);
         mockSmbFolders(mockNifiSmbClientService,
                 listableEntity("first", 1000)
@@ -194,7 +199,7 @@ class ListSmbTest {
         testRunner.setProperty(LISTING_STRATEGY, "none");
         testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
         testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
-        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSambaClient(testRunner);
+        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSmbClientService(testRunner);
         setTime(1000L);
         mockSmbFolders(mockNifiSmbClientService,
                 listableEntity("first", 1000)
@@ -214,7 +219,7 @@ class ListSmbTest {
         testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
         testRunner.setProperty(MINIMUM_AGE, "10 ms");
         testRunner.setProperty(MAXIMUM_AGE, "30 ms");
-        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSambaClient(testRunner);
+        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSmbClientService(testRunner);
         setTime(1000L);
         mockSmbFolders(mockNifiSmbClientService,
                 listableEntity("too_young", 1000),
@@ -231,7 +236,7 @@ class ListSmbTest {
         final TestRunner testRunner = newTestRunner(ListSmb.class);
         testRunner.setProperty(LISTING_STRATEGY, "timestamps");
         testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
-        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSambaClient(testRunner);
+        final SmbClientService mockNifiSmbClientService = 
configureTestRunnerWithMockedSmbClientService(testRunner);
         when(mockNifiSmbClientService.listFiles(anyString())).thenThrow(new 
RuntimeException("test exception"));
         testRunner.run();
         assertEquals(1, testRunner.getLogger().getErrorMessages().size());
@@ -241,7 +246,7 @@ class ListSmbTest {
     @Test
     public void shouldFormatRemotePathProperly() throws Exception {
         final TestRunner testRunner = newTestRunner(ListSmb.class);
-        final SmbClientProviderService clientProviderService = 
mockSmbConnectionPoolService();
+        final SmbClientProviderService clientProviderService = 
mockSmbClientProviderService();
         testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, 
CLIENT_SERVICE_PROVIDER_ID);
         testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, 
clientProviderService);
         
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://hostname:445/share"));
@@ -260,19 +265,50 @@ class ListSmbTest {
 
     }
 
-    private SmbClientProviderService mockSmbConnectionPoolService() {
+    @Test
+    void testInitialListingTimestampValidEpoch() throws Exception {
+        testInitialListingTimestamp("123456789", true);
+    }
+
+    @Test
+    void testInitialListingTimestampValidUTCTime() throws Exception {
+        testInitialListingTimestamp("2025-04-03T16:16:54Z", true);
+    }
+
+    @Test
+    void testInitialListingTimestampInvalidTime() throws Exception {
+        testInitialListingTimestamp("2025-04-03 16:16:54", false);
+    }
+
+    private void testInitialListingTimestamp(String propertyValue, boolean 
shouldBeValid) throws Exception {
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+
+        configureTestRunnerWithMockedSmbClientService(testRunner);
+
+        testRunner.setProperty(SMB_LISTING_STRATEGY, BY_TIMESTAMPS);
+        testRunner.setProperty(INITIAL_LISTING_STRATEGY, 
InitialListingStrategy.FROM_TIMESTAMP);
+        testRunner.setProperty(INITIAL_LISTING_TIMESTAMP, propertyValue);
+
+        if (shouldBeValid) {
+            testRunner.assertValid();
+        } else {
+            testRunner.assertNotValid();
+        }
+    }
+
+    private SmbClientProviderService mockSmbClientProviderService() {
         final SmbClientProviderService clientProviderService = 
mock(SmbClientProviderService.class);
         
when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID);
         
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
         return clientProviderService;
     }
 
-    private SmbClientService 
configureTestRunnerWithMockedSambaClient(TestRunner testRunner)
+    private SmbClientService 
configureTestRunnerWithMockedSmbClientService(TestRunner testRunner)
             throws Exception {
         final SmbClientService mockNifiSmbClientService = 
mock(SmbClientService.class);
         testRunner.setProperty(DIRECTORY, "testDirectory");
 
-        final SmbClientProviderService clientProviderService = 
mockSmbConnectionPoolService();
+        final SmbClientProviderService clientProviderService = 
mockSmbClientProviderService();
         
when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
         testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, 
CLIENT_SERVICE_PROVIDER_ID);
         testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, 
clientProviderService);

Reply via email to