This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 88f205edc6 NIFI-14443: Added FlowFile Expression Language support for
Hostname and Share properties in PutSmbFile
88f205edc6 is described below
commit 88f205edc627a8a736c4654cd87d62038fd37fed
Author: Mark Bathori <[email protected]>
AuthorDate: Tue Nov 18 11:48:46 2025 +0100
NIFI-14443: Added FlowFile Expression Language support for Hostname and
Share properties in PutSmbFile
This closes #10541.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../org/apache/nifi/processors/smb/PutSmbFile.java | 11 +--
.../smb/util/HostnameAndShareFlowFileFilter.java | 77 +++++++++++++++++++++
.../apache/nifi/processors/smb/PutSmbFileTest.java | 79 ++++++++++++++++++++++
3 files changed, 163 insertions(+), 4 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 88b0e89a2f..027f6dee26 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -52,6 +52,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.smb.util.HostnameAndShareFlowFileFilter;
import java.io.OutputStream;
import java.net.URI;
@@ -95,6 +96,7 @@ public class PutSmbFile extends AbstractProcessor {
.description("The network host to which files should be written.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor SHARE = new
PropertyDescriptor.Builder()
.name("Share")
@@ -102,6 +104,7 @@ public class PutSmbFile extends AbstractProcessor {
"after the hostname: \\\\hostname\\[share]\\dir1\\dir2")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor DIRECTORY = new
PropertyDescriptor.Builder()
.name("Directory")
@@ -285,16 +288,16 @@ public class PutSmbFile extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
- List<FlowFile> flowFiles = session.get(batchSize);
+ final HostnameAndShareFlowFileFilter flowFileFilter = new
HostnameAndShareFlowFileFilter(context, batchSize);
+ final List<FlowFile> flowFiles = session.get(flowFileFilter);
if ( flowFiles.isEmpty() ) {
return;
}
final ComponentLog logger = getLogger();
logger.debug("Processing next {} flowfiles", flowFiles.size());
- final String hostname = context.getProperty(HOSTNAME).getValue();
- final String shareName = context.getProperty(SHARE).getValue();
-
+ final String hostname = flowFileFilter.getHostName();
+ final String shareName = flowFileFilter.getShare();
final String domain = context.getProperty(DOMAIN).getValue();
final String username = context.getProperty(USERNAME).getValue();
String password = context.getProperty(PASSWORD).getValue();
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/HostnameAndShareFlowFileFilter.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/HostnameAndShareFlowFileFilter.java
new file mode 100644
index 0000000000..8473e8edab
--- /dev/null
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/HostnameAndShareFlowFileFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb.util;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.processor.ProcessContext;
+
+import static
org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult.REJECT_AND_TERMINATE;
+import static org.apache.nifi.processors.smb.PutSmbFile.HOSTNAME;
+import static org.apache.nifi.processors.smb.PutSmbFile.SHARE;
+
+/**
+ * {@link FlowFileFilter} that builds a batch of FlowFiles which all resolve to the same
+ * Hostname / Share pair.
+ *
+ * <p>The pair evaluated for the first FlowFile offered to the filter becomes the selected pair.
+ * Subsequent FlowFiles are accepted only when their evaluated pair equals the selected one, and
+ * the filter terminates once {@code batchSize} FlowFiles have been accepted.</p>
+ */
+public class HostnameAndShareFlowFileFilter implements FlowFileFilter {
+
+    private final ProcessContext context;
+    private final int batchSize;
+
+    // Pair evaluated for the first FlowFile offered to filter(); null until then.
+    private HostSharePair selectedHostSharePair;
+    // Number of FlowFiles accepted so far.
+    private int count = 0;
+
+    /**
+     * @param context   process context used to evaluate the HOSTNAME and SHARE properties per FlowFile
+     * @param batchSize maximum number of FlowFiles to accept
+     */
+    public HostnameAndShareFlowFileFilter(ProcessContext context, int batchSize) {
+        this.context = context;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Accepts the FlowFile when its Hostname / Share pair matches the selected pair and the batch
+     * is not yet full.
+     *
+     * @param flowFile the candidate FlowFile
+     * @return {@code REJECT_AND_TERMINATE} once {@code batchSize} FlowFiles have been accepted,
+     *         {@code ACCEPT_AND_CONTINUE} for a matching FlowFile, {@code REJECT_AND_CONTINUE} otherwise
+     */
+    @Override
+    public FlowFileFilterResult filter(FlowFile flowFile) {
+        final HostSharePair hostSharePair = getFlowFileHostSharePair(flowFile);
+
+        // The first FlowFile seen decides which host/share this batch targets.
+        if (selectedHostSharePair == null) {
+            selectedHostSharePair = hostSharePair;
+        }
+
+        if (count >= batchSize) {
+            return REJECT_AND_TERMINATE;
+        }
+
+        // Record equality compares both components and tolerates null component values,
+        // unlike calling equals() on each component directly.
+        if (selectedHostSharePair.equals(hostSharePair)) {
+            count += 1;
+            return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
+        }
+
+        return FlowFileFilterResult.REJECT_AND_CONTINUE;
+    }
+
+    // Evaluates the HOSTNAME and SHARE properties against the attributes of the given FlowFile.
+    private HostSharePair getFlowFileHostSharePair(final FlowFile flowFile) {
+        final String hostName = context.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String share = context.getProperty(SHARE).evaluateAttributeExpressions(flowFile).getValue();
+
+        return new HostSharePair(hostName, share);
+    }
+
+    /**
+     * Must only be called after {@link #filter(FlowFile)} has been invoked at least once;
+     * before that no pair has been selected and the call fails with a NullPointerException.
+     *
+     * @return the hostname of the selected pair
+     */
+    public String getHostName() {
+        return selectedHostSharePair.hostName();
+    }
+
+    /**
+     * Must only be called after {@link #filter(FlowFile)} has been invoked at least once;
+     * before that no pair has been selected and the call fails with a NullPointerException.
+     *
+     * @return the share of the selected pair
+     */
+    public String getShare() {
+        return selectedHostSharePair.share();
+    }
+
+    // Evaluated Hostname / Share values of a single FlowFile.
+    record HostSharePair(String hostName, String share) {
+    }
+}
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
index 572c9888f7..a5eb1b05f8 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/PutSmbFileTest.java
@@ -28,8 +28,16 @@ import com.hierynomus.smbj.share.DiskShare;
import com.hierynomus.smbj.share.File;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
@@ -71,6 +79,8 @@ public class PutSmbFileTest {
private final static String USERNAME = "myusername";
private final static String PASSWORD = "mypassword";
+ private static final AtomicInteger FLOWFILE_ID_COUNTER = new
AtomicInteger(0);
+
@Captor
private ArgumentCaptor<Set<SMB2ShareAccess>> shareAccessSet;
@Captor
@@ -142,6 +152,17 @@ public class PutSmbFileTest {
return shareAccessSet.getValue();
}
+    /**
+     * Creates mock FlowFiles that all carry the given attributes. Each FlowFile gets the next
+     * value of {@code FLOWFILE_ID_COUNTER} as its id.
+     *
+     * @param numberOfFlowFiles how many FlowFiles to create
+     * @param attributes        attributes put on every created FlowFile
+     * @return the created FlowFiles, in creation order
+     */
+    private List<MockFlowFile> generateFlowFile(int numberOfFlowFiles, Map<String, String> attributes) {
+        final List<MockFlowFile> flowFiles = new ArrayList<>();
+        int remaining = numberOfFlowFiles;
+        while (remaining-- > 0) {
+            final MockFlowFile mockFlowFile = new MockFlowFile(FLOWFILE_ID_COUNTER.incrementAndGet());
+            mockFlowFile.putAttributes(attributes);
+            flowFiles.add(mockFlowFile);
+        }
+
+        return flowFiles;
+    }
+
private AutoCloseable mockCloseable;
@BeforeEach
@@ -163,6 +184,64 @@ public class PutSmbFileTest {
}
}
+    /**
+     * With a batch size larger than the first group, a single trigger should pull only the
+     * FlowFiles whose hostname/share match the first FlowFile in the queue.
+     */
+    @Test
+    public void testHostnameAndShareEL() {
+        testRunner.setProperty(PutSmbFile.HOSTNAME, "${smb.hostname}");
+        testRunner.setProperty(PutSmbFile.SHARE, "${smb.share}");
+        testRunner.setProperty(PutSmbFile.BATCH_SIZE, "20");
+
+        // First group: 10 FlowFiles targeting test-host-1 / test-share-1
+        final Map<String, String> firstTarget = new HashMap<>(Map.of(
+                "smb.hostname", "test-host-1",
+                "smb.share", "test-share-1"
+        ));
+        final FlowFile[] firstGroup = generateFlowFile(10, firstTarget).toArray(new FlowFile[0]);
+        testRunner.enqueue(firstGroup);
+
+        // Second group: 20 FlowFiles targeting a different host and share
+        final Map<String, String> secondTarget = new HashMap<>(Map.of(
+                "smb.hostname", "test-host-2",
+                "smb.share", "test-share-2"
+        ));
+        final FlowFile[] secondGroup = generateFlowFile(20, secondTarget).toArray(new FlowFile[0]);
+        testRunner.enqueue(secondGroup);
+
+        // A single trigger of the processor
+        testRunner.run(1);
+
+        // Only the 10 FlowFiles of the first group match the first FlowFile's hostname and share,
+        // so the 20 FlowFiles of the second group must still be queued.
+        assertEquals(20, testRunner.getQueueSize().getObjectCount());
+    }
+
+    /**
+     * When more FlowFiles match the first FlowFile's hostname/share than the batch size allows,
+     * a single trigger should pull exactly one batch and leave the rest queued.
+     */
+    @Test
+    public void testHostnameAndShareELWhenBatchsizeIsLowerThanAcceptableFlowFiles() {
+        testRunner.setProperty(PutSmbFile.HOSTNAME, "${smb.hostname}");
+        testRunner.setProperty(PutSmbFile.SHARE, "${smb.share}");
+        testRunner.setProperty(PutSmbFile.BATCH_SIZE, "10");
+
+        // First group: 20 FlowFiles targeting test-host-1 / test-share-1
+        final Map<String, String> firstTarget = new HashMap<>(Map.of(
+                "smb.hostname", "test-host-1",
+                "smb.share", "test-share-1"
+        ));
+        final FlowFile[] firstGroup = generateFlowFile(20, firstTarget).toArray(new FlowFile[0]);
+        testRunner.enqueue(firstGroup);
+
+        // Second group: 20 FlowFiles targeting a different host and share
+        final Map<String, String> secondTarget = new HashMap<>(Map.of(
+                "smb.hostname", "test-host-2",
+                "smb.share", "test-share-2"
+        ));
+        final FlowFile[] secondGroup = generateFlowFile(20, secondTarget).toArray(new FlowFile[0]);
+        testRunner.enqueue(secondGroup);
+
+        // A single trigger of the processor
+        testRunner.run(1);
+
+        // 20 FlowFiles match the first FlowFile's hostname and share, but the batch size caps the
+        // pull at 10, so 10 matching plus 20 non-matching FlowFiles (30 in total) must remain queued.
+        assertEquals(30, testRunner.getQueueSize().getObjectCount());
+    }
+
@Test
public void testNormalAuth() throws IOException {
testRunner.enqueue("data");