This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 88f205edc6 NIFI-14443: Added FlowFile Expression Language support for Hostname and Share properties in PutSmbFile
88f205edc6 is described below

commit 88f205edc627a8a736c4654cd87d62038fd37fed
Author: Mark Bathori <[email protected]>
AuthorDate: Tue Nov 18 11:48:46 2025 +0100

    NIFI-14443: Added FlowFile Expression Language support for Hostname and Share properties in PutSmbFile
    
    This closes #10541.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../org/apache/nifi/processors/smb/PutSmbFile.java | 11 +--
 .../smb/util/HostnameAndShareFlowFileFilter.java   | 77 +++++++++++++++++++++
 .../apache/nifi/processors/smb/PutSmbFileTest.java | 79 ++++++++++++++++++++++
 3 files changed, 163 insertions(+), 4 deletions(-)

diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 88b0e89a2f..027f6dee26 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -52,6 +52,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.smb.util.HostnameAndShareFlowFileFilter;
 
 import java.io.OutputStream;
 import java.net.URI;
@@ -95,6 +96,7 @@ public class PutSmbFile extends AbstractProcessor {
             .description("The network host to which files should be written.")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
     public static final PropertyDescriptor SHARE = new PropertyDescriptor.Builder()
             .name("Share")
@@ -102,6 +104,7 @@ public class PutSmbFile extends AbstractProcessor {
             "after the hostname: \\\\hostname\\[share]\\dir1\\dir2")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
     public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
             .name("Directory")
@@ -285,16 +288,16 @@ public class PutSmbFile extends AbstractProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
-        List<FlowFile> flowFiles = session.get(batchSize);
+        final HostnameAndShareFlowFileFilter flowFileFilter = new HostnameAndShareFlowFileFilter(context, batchSize);
+        final List<FlowFile> flowFiles = session.get(flowFileFilter);
         if ( flowFiles.isEmpty() ) {
             return;
         }
         final ComponentLog logger = getLogger();
         logger.debug("Processing next {} flowfiles", flowFiles.size());
 
-        final String hostname = context.getProperty(HOSTNAME).getValue();
-        final String shareName = context.getProperty(SHARE).getValue();
-
+        final String hostname = flowFileFilter.getHostName();
+        final String shareName = flowFileFilter.getShare();
         final String domain = context.getProperty(DOMAIN).getValue();
         final String username = context.getProperty(USERNAME).getValue();
         String password = context.getProperty(PASSWORD).getValue();
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/HostnameAndShareFlowFileFilter.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/HostnameAndShareFlowFileFilter.java
new file mode 100644
index 0000000000..8473e8edab
--- /dev/null
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/HostnameAndShareFlowFileFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessContext;
+
+import static org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE;
+import static org.apache.nifi.processors.smb.PutSmbFile.HOSTNAME;
+import static org.apache.nifi.processors.smb.PutSmbFile.SHARE;
+
+public class HostnameAndShareFlowFileFilter implements FlowFileFilter {
+
+    private final ProcessContext context;
+    private final int batchSize;
+
+    private HostSharePair selectedHostSharePair;
+    private int count = 0;
+
+    public HostnameAndShareFlowFileFilter(ProcessContext context, int batchSize) {
+        this.context = context;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public FlowFileFilterResult filter(FlowFile flowFile) {
+        final HostSharePair hostSharePair = getFlowFileHostSharePair(flowFile);
+
+        if (selectedHostSharePair == null) {
+            selectedHostSharePair = hostSharePair;
+        }
+
+        if (count >= batchSize) {
+            return REJECT_AND_TERMINATE;
+        }
+
+        if (selectedHostSharePair.hostName().equals(hostSharePair.hostName()) && selectedHostSharePair.share().equals(hostSharePair.share())) {
+            count += 1;
+            return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+        } else {
+            return FlowFileFilterResult.REJECT_AND_CONTINUE;
+        }
+    }
+
+    private HostSharePair getFlowFileHostSharePair(final FlowFile flowFile) {
+        final String hostName = context.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String share = context.getProperty(SHARE).evaluateAttributeExpressions(flowFile).getValue();
+
+        return new HostSharePair(hostName, share);
+    }
+
+    public String getHostName() {
+        return selectedHostSharePair.hostName();
+    }
+
+    public String getShare() {
+        return selectedHostSharePair.share();
+    }
+
+    record HostSharePair(String hostName, String share) {
+    }
+}
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
index 572c9888f7..a5eb1b05f8 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
@@ -28,8 +28,16 @@ import com.hierynomus.smbj.share.DiskShare;
 import com.hierynomus.smbj.share.File;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.AfterEach;
@@ -71,6 +79,8 @@ public class PutSmbFileTest {
     private final static String USERNAME = "myusername";
     private final static String PASSWORD = "mypassword";
 
+    private static final AtomicInteger FLOWFILE_ID_COUNTER = new AtomicInteger(0);
+
     @Captor
     private ArgumentCaptor<Set<SMB2ShareAccess>> shareAccessSet;
     @Captor
@@ -142,6 +152,17 @@ public class PutSmbFileTest {
         return shareAccessSet.getValue();
     }
 
+    private List<MockFlowFile> generateFlowFile(int numberOfFlowFiles, Map<String, String> attributes) {
+        final List<MockFlowFile> result = new ArrayList<>();
+        for (int i = 0; i < numberOfFlowFiles; i++) {
+            final MockFlowFile flowFile = new MockFlowFile(FLOWFILE_ID_COUNTER.incrementAndGet());
+            flowFile.putAttributes(attributes);
+            result.add(flowFile);
+        }
+
+        return result;
+    }
+
     private AutoCloseable mockCloseable;
 
     @BeforeEach
@@ -163,6 +184,64 @@ public class PutSmbFileTest {
         }
     }
 
+    @Test
+    public void testHostnameAndShareEL() {
+        testRunner.setProperty(PutSmbFile.HOSTNAME, "${smb.hostname}");
+        testRunner.setProperty(PutSmbFile.SHARE, "${smb.share}");
+        testRunner.setProperty(PutSmbFile.BATCH_SIZE, "20");
+
+        // Add 10 FlowFiles with the same hostname and share property values
+        final Map<String, String> attributes1 = new HashMap<>(Map.of(
+                "smb.hostname", "test-host-1",
+                "smb.share", "test-share-1"
+        ));
+        final List<MockFlowFile> flowFiles1 = generateFlowFile(10, attributes1);
+        testRunner.enqueue(flowFiles1.toArray(new FlowFile[0]));
+
+        // Add 20 FlowFiles with a different hostname and share property value than the first 10
+        final Map<String, String> attributes2 = new HashMap<>(Map.of(
+                "smb.hostname", "test-host-2",
+                "smb.share", "test-share-2"
+        ));
+        final List<MockFlowFile> flowFiles2 = generateFlowFile(20, attributes2);
+        testRunner.enqueue(flowFiles2.toArray(new FlowFile[0]));
+
+        //trigger the processor only once
+        testRunner.run(1);
+
+        // Since 10 FlowFiles share the same hostname and share as the first processed FlowFile, 20 FlowFiles should remain in the queue.
+        assertEquals(20, testRunner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testHostnameAndShareELWhenBatchsizeIsLowerThanAcceptableFlowFiles() {
+        testRunner.setProperty(PutSmbFile.HOSTNAME, "${smb.hostname}");
+        testRunner.setProperty(PutSmbFile.SHARE, "${smb.share}");
+        testRunner.setProperty(PutSmbFile.BATCH_SIZE, "10");
+
+        // Add 20 FlowFiles with the same hostname and share property values
+        final Map<String, String> attributes1 = new HashMap<>(Map.of(
+                "smb.hostname", "test-host-1",
+                "smb.share", "test-share-1"
+        ));
+        final List<MockFlowFile> flowFiles1 = generateFlowFile(20, attributes1);
+        testRunner.enqueue(flowFiles1.toArray(new FlowFile[0]));
+
+        // Add 20 FlowFiles with a different hostname and share property value than the first 20
+        final Map<String, String> attributes2 = new HashMap<>(Map.of(
+                "smb.hostname", "test-host-2",
+                "smb.share", "test-share-2"
+        ));
+        final List<MockFlowFile> flowFiles2 = generateFlowFile(20, attributes2);
+        testRunner.enqueue(flowFiles2.toArray(new FlowFile[0]));
+
+        //trigger the processor only once
+        testRunner.run(1);
+
+        // 20 FlowFiles share the same hostname and share as the first processed FlowFile, but since the batch size is 10, 30 FlowFiles should remain in the queue
+        assertEquals(30, testRunner.getQueueSize().getObjectCount());
+    }
+
     @Test
     public void testNormalAuth() throws IOException {
         testRunner.enqueue("data");

Reply via email to