This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 768f58da21 NIFI-10212 added ListSmb processor and
SmbConnectionPoolService
768f58da21 is described below
commit 768f58da218c5a41c71abcda53c8bf8e5ba9a051
Author: Gabor Kulik <[email protected]>
AuthorDate: Mon Jun 20 15:43:33 2022 +0200
NIFI-10212 added ListSmb processor and SmbConnectionPoolService
This closes #6192.
Signed-off-by: Tamas Palfy <[email protected]>
---
nifi-assembly/pom.xml | 12 +
.../nifi/util/StandardProcessorTestRunner.java | 2 +-
.../processor/util/list/AbstractListProcessor.java | 11 +-
.../pom.xml | 14 +-
.../nifi-smb-bundle/nifi-smb-client-api/pom.xml | 40 +++
.../services/smb/SmbClientProviderService.java | 39 +++
.../apache/nifi/services/smb/SmbClientService.java | 30 ++
.../nifi/services/smb/SmbListableEntity.java | 236 +++++++++++++
.../nifi-smb-bundle/nifi-smb-nar/pom.xml | 6 +
.../nifi-smb-bundle/nifi-smb-processors/pom.xml | 40 ++-
.../org/apache/nifi/processors/smb/ListSmb.java | 369 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 3 +-
.../org/apache/nifi/processors/smb/ListSmbIT.java | 297 +++++++++++++++++
.../apache/nifi/processors/smb/ListSmbTest.java | 322 ++++++++++++++++++
.../pom.xml | 10 +-
.../pom.xml | 23 +-
.../services/smb/SmbjClientProviderService.java | 174 ++++++++++
.../nifi/services/smb/SmbjClientService.java | 162 +++++++++
.../org.apache.nifi.controller.ControllerService} | 3 +-
.../apache/nifi/services/smb/NiFiSmbjClientIT.java | 175 ++++++++++
.../nifi/services/smb/NiFiSmbjClientTest.java | 80 +++++
nifi-nar-bundles/nifi-smb-bundle/pom.xml | 34 ++
nifi-registry/pom.xml | 1 -
pom.xml | 1 +
24 files changed, 2067 insertions(+), 17 deletions(-)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 338e90c1c4..caf87d0a18 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -664,6 +664,18 @@ language governing permissions and limitations under the
License. -->
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-client-api-nar</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-smbj-client-nar</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-windows-event-log-nar</artifactId>
diff --git
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 922907c8dc..a92cb34575 100644
---
a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++
b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -182,7 +182,7 @@ public class StandardProcessorTestRunner implements
TestRunner {
@Override
public void run(final int iterations, final boolean stopOnFinish, final
boolean initialize) {
- run(iterations, stopOnFinish, initialize, 5000);
+ run(iterations, stopOnFinish, initialize, 5000 + iterations *
runSchedule);
}
@Override
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
index 682450211e..30a0ca2baf 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
@@ -543,6 +543,10 @@ public abstract class AbstractListProcessor<T extends
ListableEntity> extends Ab
return System.currentTimeMillis();
}
+ protected long getCurrentNanoTime() {
+ return System.nanoTime();
+ }
+
public void listByNoTracking(final ProcessContext context, final
ProcessSession session) {
final List<T> entityList;
@@ -655,6 +659,7 @@ public abstract class AbstractListProcessor<T extends
ListableEntity> extends Ab
.computeIfAbsent(entity.getTimestamp(), __ -> new
ArrayList<>())
.add(entity)
);
+
if (getLogger().isTraceEnabled()) {
getLogger().trace("orderedEntries: " +
orderedEntries.values().stream()
@@ -744,8 +749,8 @@ public abstract class AbstractListProcessor<T extends
ListableEntity> extends Ab
}
final List<T> entityList;
- final long currentRunTimeNanos = System.nanoTime();
- final long currentRunTimeMillis = System.currentTimeMillis();
+ final long currentRunTimeNanos = getCurrentNanoTime();
+ final long currentRunTimeMillis = getCurrentTime();
try {
// track of when this last executed for consideration of the lag
nanos
entityList = performListing(context, minTimestampToListMillis,
ListingMode.EXECUTION);
@@ -1100,7 +1105,7 @@ public abstract class AbstractListProcessor<T extends
ListableEntity> extends Ab
}
protected ListedEntityTracker<T> createListedEntityTracker() {
- return new ListedEntityTracker<>(getIdentifier(), getLogger(),
getRecordSchema());
+ return new ListedEntityTracker<>(getIdentifier(), getLogger(),
this::getCurrentTime, getRecordSchema());
}
private void listByTrackingEntities(ProcessContext context, ProcessSession
session) throws ProcessException {
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api-nar/pom.xml
similarity index 82%
copy from nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
copy to nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api-nar/pom.xml
index 25255fd5af..925d9b93ee 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api-nar/pom.xml
@@ -14,16 +14,16 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-smb-bundle</artifactId>
<version>1.18.0-SNAPSHOT</version>
</parent>
+ <modelVersion>4.0.0</modelVersion>
- <artifactId>nifi-smb-nar</artifactId>
+ <artifactId>nifi-smb-client-api-nar</artifactId>
<packaging>nar</packaging>
+
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
@@ -32,7 +32,13 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-smb-processors</artifactId>
+ <artifactId>nifi-standard-services-api-nar</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-client-api</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
</dependencies>
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/pom.xml
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/pom.xml
new file mode 100644
index 0000000000..1c3a3b04b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/pom.xml
@@ -0,0 +1,40 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-bundle</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>nifi-smb-client-api</artifactId>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-listed-entity</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientProviderService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientProviderService.java
new file mode 100644
index 0000000000..3dc23f1aa0
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientProviderService.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.smb;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.nifi.controller.ControllerService;
+
+public interface SmbClientProviderService extends ControllerService {
+
+ /**
+ * Returns the identifier of the service location.
+ *
+ * @return the remote location, expressed as a {@link URI}
+ */
+ URI getServiceLocation();
+
+ /**
+ * Returns the smb client to use.
+ *
+ * @return the client.
+ * @throws IOException declared by the signature; NOTE(review): presumably raised when a client
+ * cannot be obtained for the remote location - confirm against the implementations
+ */
+ SmbClientService getClient() throws IOException;
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
new file mode 100644
index 0000000000..4ecaf9a507
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.smb;
+
+import java.util.stream.Stream;
+
+/**
+ * Service abstraction for Server Message Block protocol operations.
+ * <p>
+ * Two operations are declared: {@link #listRemoteFiles(String)}, which takes a path and returns the
+ * listed entries as a {@link Stream} of {@link SmbListableEntity}, and {@link #createDirectory(String)},
+ * which takes the path of the directory to create.
+ * <p>
+ * The interface extends {@link AutoCloseable}. NOTE(review): presumably a client holds a remote
+ * connection and should be closed by the caller (e.g. with try-with-resources) - confirm against the
+ * implementations. Whether the given paths are relative to the share is not defined here - TODO confirm.
+ */
+public interface SmbClientService extends AutoCloseable {
+
+ Stream<SmbListableEntity> listRemoteFiles(String path);
+
+ void createDirectory(String path);
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java
new file mode 100644
index 0000000000..da33602540
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.smb;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.nifi.processor.util.list.ListableEntity;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+/**
+ * Immutable description of one entry of an SMB listing: its name, parent path, timestamps and sizes.
+ * <p>
+ * Instances are created through {@link #builder()}. Equality, the hash code and {@link #getIdentifier()}
+ * are all derived from {@link #getPathWithName()} only; timestamps and sizes do not take part in identity.
+ * <p>
+ * NOTE(review): the unit of the {@code long} time values is not established in this class (presumably
+ * epoch milliseconds) - confirm against the code that populates the builder.
+ */
+public class SmbListableEntity implements ListableEntity {
+
+    private final String name;
+    private final String shortName;
+    private final String path;
+    private final long timestamp;
+    private final long creationTime;
+    private final long lastAccessTime;
+    private final long changeTime;
+    private final boolean directory;
+    private final long size;
+    private final long allocationSize;
+
+    // Only reachable through SmbListableEntityBuilder#build().
+    private SmbListableEntity(String name, String shortName, String path, long timestamp, long creationTime,
+            long lastAccessTime, long changeTime, boolean directory,
+            long size, long allocationSize) {
+        this.name = name;
+        this.shortName = shortName;
+        this.path = path;
+        this.timestamp = timestamp;
+        this.creationTime = creationTime;
+        this.lastAccessTime = lastAccessTime;
+        this.changeTime = changeTime;
+        this.directory = directory;
+        this.size = size;
+        this.allocationSize = allocationSize;
+    }
+
+    /**
+     * Builds the record schema that describes the records produced by {@link #toRecord()}.
+     * <p>
+     * Every field is declared as not nullable, so {@link #toRecord()} must supply a value for each of them.
+     *
+     * @return a new schema instance on every call
+     */
+    public static SimpleRecordSchema getRecordSchema() {
+        List<RecordField> fields = Arrays.asList(
+                new RecordField("filename", RecordFieldType.STRING.getDataType(), false),
+                new RecordField("shortName", RecordFieldType.STRING.getDataType(), false),
+                new RecordField("path", RecordFieldType.STRING.getDataType(), false),
+                new RecordField("identifier", RecordFieldType.STRING.getDataType(), false),
+                new RecordField("timestamp", RecordFieldType.LONG.getDataType(), false),
+                new RecordField("creationTime", RecordFieldType.LONG.getDataType(), false),
+                new RecordField("lastAccessTime", RecordFieldType.LONG.getDataType(), false),
+                new RecordField("changeTime", RecordFieldType.LONG.getDataType(), false),
+                new RecordField("size", RecordFieldType.LONG.getDataType(), false),
+                new RecordField("allocationSize", RecordFieldType.LONG.getDataType(), false)
+        );
+        return new SimpleRecordSchema(fields);
+    }
+
+    /**
+     * @return a new builder whose path defaults to the empty string, whose sizes default to 0 and whose
+     *         directory flag defaults to {@code false}
+     */
+    public static SmbListableEntityBuilder builder() {
+        return new SmbListableEntityBuilder();
+    }
+
+    /** @return the short name given to the builder, or {@code null} when none was set */
+    public String getShortName() {
+        return shortName;
+    }
+
+    /** @return the creation time given to the builder */
+    public long getCreationTime() {
+        return creationTime;
+    }
+
+    /** @return the last access time given to the builder */
+    public long getLastAccessTime() {
+        return lastAccessTime;
+    }
+
+    /** @return the change time given to the builder */
+    public long getChangeTime() {
+        return changeTime;
+    }
+
+    /** @return the allocation size given to the builder */
+    public long getAllocationSize() {
+        return allocationSize;
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /** @return the path part without the name; the empty string unless the builder was given another value */
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * Joins path and name with a forward slash.
+     *
+     * @return the name alone when the path is empty, {@code path + "/" + name} otherwise
+     */
+    public String getPathWithName() {
+        return path.isEmpty() ? name : path + "/" + name;
+    }
+
+    /** @return the same value as {@link #getPathWithName()} */
+    @Override
+    public String getIdentifier() {
+        return getPathWithName();
+    }
+
+    /** @return the timestamp given to the builder; {@link #toString()} labels it as the last write */
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public long getSize() {
+        return size;
+    }
+
+    /** @return the directory flag given to the builder */
+    public boolean isDirectory() {
+        return directory;
+    }
+
+    /** Two entities are equal when they are of the same class and their {@link #getPathWithName()} match. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SmbListableEntity that = (SmbListableEntity) o;
+        return getPathWithName().equals(that.getPathWithName());
+    }
+
+    /** Consistent with {@link #equals(Object)}: hashes {@link #getPathWithName()} only. */
+    @Override
+    public int hashCode() {
+        return getPathWithName().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return getPathWithName() + " (last write: " + timestamp + " size: " + size + ")";
+    }
+
+    /**
+     * Converts this entity into a record that follows {@link #getRecordSchema()}.
+     *
+     * @return a record carrying a value for every field declared in the schema
+     */
+    @Override
+    public Record toRecord() {
+        final Map<String, Object> record = new TreeMap<>();
+        record.put("filename", getName());
+        record.put("shortName", getShortName());
+        record.put("path", path);
+        record.put("identifier", getPathWithName());
+        record.put("timestamp", getTimestamp());
+        record.put("creationTime", getCreationTime());
+        record.put("lastAccessTime", getLastAccessTime());
+        // The schema declares changeTime as a non-nullable field, so it has to be populated as well.
+        record.put("changeTime", getChangeTime());
+        record.put("size", getSize());
+        record.put("allocationSize", getAllocationSize());
+        return new MapRecord(getRecordSchema(), record);
+    }
+
+    /**
+     * Fluent builder for {@link SmbListableEntity}. Each setter stores its argument and returns the builder.
+     */
+    public static class SmbListableEntityBuilder {
+
+        private String name;
+        private String shortName;
+        private String path = "";
+        private long timestamp;
+        private long creationTime;
+        private long lastAccessTime;
+        private long changeTime;
+        private boolean directory = false;
+        private long size = 0;
+        private long allocationSize = 0;
+
+        public SmbListableEntityBuilder setName(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setShortName(String shortName) {
+            this.shortName = shortName;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setPath(String path) {
+            this.path = path;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setTimestamp(long timestamp) {
+            this.timestamp = timestamp;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setCreationTime(long creationTime) {
+            this.creationTime = creationTime;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setLastAccessTime(long lastAccessTime) {
+            this.lastAccessTime = lastAccessTime;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setChangeTime(long changeTime) {
+            this.changeTime = changeTime;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setDirectory(boolean directory) {
+            this.directory = directory;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setSize(long size) {
+            this.size = size;
+            return this;
+        }
+
+        public SmbListableEntityBuilder setAllocationSize(long allocationSize) {
+            this.allocationSize = allocationSize;
+            return this;
+        }
+
+        /** @return a new entity carrying the values collected so far */
+        public SmbListableEntity build() {
+            return new SmbListableEntity(name, shortName, path, timestamp, creationTime, lastAccessTime, changeTime,
+                    directory, size, allocationSize);
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
index 25255fd5af..61c19970a9 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
@@ -35,5 +35,11 @@
<artifactId>nifi-smb-processors</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-client-api-nar</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
</dependencies>
</project>
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml
index 58b04c5898..de266368b2 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml
@@ -26,10 +26,24 @@
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-client-api</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
@@ -38,7 +52,10 @@
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>smbj</artifactId>
- <version>0.10.0</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -46,5 +63,26 @@
<version>1.18.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-smbj-client</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
new file mode 100644
index 0000000000..e0029144f6
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.time.format.DateTimeFormatter.ISO_DATE_TIME;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Collections.unmodifiableMap;
+import static org.apache.nifi.components.state.Scope.CLUSTER;
+import static
org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
+import static
org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
+import static
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+import static
org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.list.AbstractListProcessor;
+import org.apache.nifi.processor.util.list.ListedEntityTracker;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbListableEntity;
+
+@PrimaryNodeOnly
+@TriggerSerially
+// Each tag is its own array element; a single comma-separated string would be registered as one tag.
+@Tags({"samba", "smb", "cifs", "files", "list"})
+@SeeAlso({PutSmbFile.class, GetSmbFile.class})
+@CapabilityDescription("Lists concrete files shared via SMB protocol. " +
+        "Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " +
+        "Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " +
+        "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " +
+        "previous node left off without duplicating all of the data.")
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+// The attribute names documented here have to match the keys written by createAttributes.
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."),
+        @WritesAttribute(attribute = "path", description =
+                "The path is set to the relative path of the file's directory "
+                        + "on filesystem compared to the Share and Input Directory properties and the configured host "
+                        + "and port inherited from the configured connection pool controller service. For example, for "
+                        + "a given remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
+                        + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file then the path attribute will be set to \"sub/folder/file\"."),
+        @WritesAttribute(attribute = "absolute.path", description =
+                "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, "
+                        + "given a remote location smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
+                        + "SHARE/DIRECTORY/sub/folder/file then the absolute.path attribute will be set to "
+                        + "SHARE/DIRECTORY/sub/folder/file."),
+        @WritesAttribute(attribute = "identifier", description =
+                "The identifier of the file. This equals to the absolute.path attribute so two files with the same relative path "
+                        + "coming from different file shares considered to be identical."),
+        @WritesAttribute(attribute = "timestamp", description =
+                "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
+        @WritesAttribute(attribute = "creationTime", description =
+                "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
+        @WritesAttribute(attribute = "lastAccessTime", description =
+                "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
+        @WritesAttribute(attribute = "changeTime", description =
+                "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
+        @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"),
+        @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"),
+})
+@Stateful(scopes = {Scope.CLUSTER}, description =
+        "After performing a listing of files, the state of the previous listing can be stored in order to list files "
+                + "continuously without duplication."
+)
+public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
+
+ public static final PropertyDescriptor DIRECTORY = new
PropertyDescriptor.Builder()
+ .displayName("Input Directory")
+ .name("directory")
+ .description("The network folder from which to list files. This is
the remaining relative path " +
+ "after the share:
smb://HOSTNAME:PORT/SHARE/[DIRECTORY]/sub/directories. It is also possible "
+ + "to add subdirectories. The given path on the remote
file share must exist. "
+ + "This can be checked using verification. You may mix
Windows and Linux-style "
+ + "directory separators.")
+ .required(false)
+ .addValidator(NON_BLANK_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor MINIMUM_AGE = new
PropertyDescriptor.Builder()
+ .displayName("Minimum File Age")
+ .name("min-file-age")
+ .description("The minimum age that a file must be in order to be
listed; any file younger than this "
+ + "amount of time will be ignored.")
+ .required(true)
+ .addValidator(TIME_PERIOD_VALIDATOR)
+ .defaultValue("5 secs")
+ .build();
+
+    /** Optional upper bound on the age of listed files; the value is validated as a time period. */
+    public static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder()
+            .name("max-file-age")
+            .displayName("Maximum File Age")
+            .description("Any file older than the given value will be omitted. ")
+            .addValidator(TIME_PERIOD_VALIDATOR)
+            .required(false)
+            .build();
+
+    /** Optional lower bound on the size of listed files; the value is validated as a data size. */
+    public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder()
+            .name("min-file-size")
+            .displayName("Minimum File Size")
+            .description("Any file smaller than the given value will be omitted.")
+            .addValidator(DATA_SIZE_VALIDATOR)
+            .required(false)
+            .build();
+
+    /** Optional upper bound on the size of listed files; the value is validated as a data size. */
+    public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder()
+            .name("max-file-size")
+            .displayName("Maximum File Size")
+            .description("Any file larger than the given value will be omitted.")
+            .addValidator(DATA_SIZE_VALIDATOR)
+            .required(false)
+            .build();
+
+    /** Copy of LISTING_STRATEGY whose allowable values are BY_ENTITIES, NO_TRACKING and BY_TIMESTAMPS. */
+    public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(LISTING_STRATEGY)
+            .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS)
+            .build();
+
+    /** Required reference to the controller service that supplies SMB clients. */
+    public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new PropertyDescriptor.Builder()
+            .name("smb-client-provider-service")
+            .displayName("SMB Client Provider Service")
+            .description("Specifies the SMB client provider to use for creating SMB connections.")
+            .identifiesControllerService(SmbClientProviderService.class)
+            .required(true)
+            .build();
+
+    /** Optional suffix; when set it must be non-empty and free of directory separators. */
+    public static final PropertyDescriptor FILE_NAME_SUFFIX_FILTER = new PropertyDescriptor.Builder()
+            .name("file-name-suffix-filter")
+            .displayName("File Name Suffix Filter")
+            .description("Files ending with the given suffix will be omitted. Can be used to make sure that files "
+                    + "that are still uploading are not listed multiple times, by having those files have a suffix "
+                    + "and remove the suffix once the upload finishes. This is highly recommended when using "
+                    + "'Tracking Entities' or 'Tracking Timestamps' listing strategies.")
+            .required(false)
+            .addValidator(NON_EMPTY_VALIDATOR)
+            .addValidator(new MustNotContainDirectorySeparatorsValidator())
+            .build();
+
+    /** Every property exposed by this processor, in the order it is returned to the framework. */
+    private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList(
+            SMB_LISTING_STRATEGY,
+            SMB_CLIENT_PROVIDER_SERVICE,
+            DIRECTORY,
+            AbstractListProcessor.RECORD_WRITER,
+            FILE_NAME_SUFFIX_FILTER,
+            MINIMUM_AGE,
+            MAXIMUM_AGE,
+            MINIMUM_SIZE,
+            MAXIMUM_SIZE,
+            AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION,
+            ListedEntityTracker.TRACKING_STATE_CACHE,
+            ListedEntityTracker.TRACKING_TIME_WINDOW,
+            ListedEntityTracker.INITIAL_LISTING_TARGET
+    ));
+
+
+    /**
+     * Exposes the processor's property descriptors.
+     *
+     * @return the unmodifiable {@code PROPERTIES} list
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    /**
+     * Builds the FlowFile attributes that describe one listed entity.
+     *
+     * @param entity  the listed entity whose name, location, timestamps and sizes are exported
+     * @param context the process context (not used by this implementation)
+     * @return an unmodifiable map of attribute names to values; time values are rendered by
+     *         {@code formatTimeStamp} and sizes as decimal strings
+     */
+    @Override
+    protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) {
+        final Map<String, String> result = new TreeMap<>();
+
+        // Naming and location
+        result.put("filename", entity.getName());
+        result.put("shortname", entity.getShortName());
+        result.put("path", entity.getPath());
+        result.put("absolute.path", entity.getPathWithName());
+        result.put("identifier", entity.getIdentifier());
+
+        // Time related values
+        result.put("timestamp", formatTimeStamp(entity.getTimestamp()));
+        result.put("creationTime", formatTimeStamp(entity.getCreationTime()));
+        result.put("lastAccessTime", formatTimeStamp(entity.getLastAccessTime()));
+        result.put("changeTime", formatTimeStamp(entity.getChangeTime()));
+
+        // Size related values
+        result.put("size", String.valueOf(entity.getSize()));
+        result.put("allocationSize", String.valueOf(entity.getAllocationSize()));
+
+        return unmodifiableMap(result);
+    }
+
+    /**
+     * Renders the location that is being listed.
+     *
+     * @param context the process context used to resolve the client provider service and the directory
+     * @return the service location followed by {@code /}, and, when a directory is configured,
+     *         by that directory and a trailing {@code /}
+     */
+    @Override
+    protected String getPath(ProcessContext context) {
+        final SmbClientProviderService provider =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+        final URI location = provider.getServiceLocation();
+        final String directory = getDirectory(context);
+        final String directoryPart;
+        if (directory.isEmpty()) {
+            directoryPart = "";
+        } else {
+            directoryPart = directory + "/";
+        }
+        return String.format("%s/%s", location.toString(), directoryPart);
+    }
+
+    /**
+     * Lists the remote entities and keeps those accepted by the configured filters.
+     *
+     * @param context                the process context
+     * @param minimumTimestampOrNull lower timestamp bound handed to the file filter, or {@code null} for none
+     * @param listingMode            the mode of this listing; used to detect a stopped execution
+     * @return the accepted entities, or an empty list when the execution was stopped while iterating
+     * @throws IOException wrapping any exception raised while listing or filtering
+     */
+    @Override
+    protected List<SmbListableEntity> performListing(ProcessContext context, Long minimumTimestampOrNull,
+            ListingMode listingMode) throws IOException {
+
+        final Predicate<SmbListableEntity> acceptable = createFileFilter(context, minimumTimestampOrNull);
+
+        // The stream is closed by try-with-resources once iteration ends, whichever way it ends.
+        try (Stream<SmbListableEntity> entities = performListing(context)) {
+            final List<SmbListableEntity> accepted = new LinkedList<>();
+            for (final Iterator<SmbListableEntity> cursor = entities.iterator(); cursor.hasNext(); ) {
+                // Give up and discard partial results as soon as the processor is no longer scheduled.
+                if (isExecutionStopped(listingMode)) {
+                    return emptyList();
+                }
+                final SmbListableEntity candidate = cursor.next();
+                if (acceptable.test(candidate)) {
+                    accepted.add(candidate);
+                }
+            }
+            return accepted;
+        } catch (Exception e) {
+            throw new IOException("Could not perform listing", e);
+        }
+    }
+
+    /**
+     * Tells whether a change of the given property requires the listing to be reset.
+     *
+     * @param property the property that changed
+     * @return {@code true} for the client provider service, the directory and the file name suffix filter
+     */
+    @Override
+    protected boolean isListingResetNecessary(PropertyDescriptor property) {
+        final List<PropertyDescriptor> resettingProperties =
+                asList(SMB_CLIENT_PROVIDER_SERVICE, DIRECTORY, FILE_NAME_SUFFIX_FILTER);
+        return resettingProperties.contains(property);
+    }
+
+    /**
+     * Selects the scope used for this processor's state.
+     *
+     * @param context the property context (not consulted)
+     * @return always {@code CLUSTER}
+     */
+    @Override
+    protected Scope getStateScope(PropertyContext context) {
+        return CLUSTER;
+    }
+
+    /**
+     * Provides the record schema used for listed entities.
+     *
+     * @return the schema supplied by {@code SmbListableEntity.getRecordSchema()}
+     */
+    @Override
+    protected RecordSchema getRecordSchema() {
+        return SmbListableEntity.getRecordSchema();
+    }
+
+    /**
+     * Counts every entity of the remote listing without applying any filter.
+     *
+     * @param context the process context used to perform the listing
+     * @return the number of listed entities, capped at {@link Integer#MAX_VALUE}
+     * @throws IOException wrapping any exception raised while listing or counting
+     */
+    @Override
+    protected Integer countUnfilteredListing(ProcessContext context) throws IOException {
+        try (Stream<SmbListableEntity> listing = performListing(context)) {
+            // Clamp rather than narrow: a plain long-to-int conversion silently wraps around
+            // (possibly to a negative number) once the count exceeds Integer.MAX_VALUE.
+            return (int) Math.min(listing.count(), Integer.MAX_VALUE);
+        } catch (Exception e) {
+            throw new IOException("Could not count files", e);
+        }
+    }
+
+    /**
+     * Produces a human readable name for the listed location.
+     *
+     * @param context the process context used to resolve the path
+     * @return {@code "Remote Directory [<path>]"}, where the path comes from {@code getPath}
+     */
+    @Override
+    protected String getListingContainerName(ProcessContext context) {
+        final String remotePath = getPath(context);
+        return String.format("Remote Directory [%s]", remotePath);
+    }
+
+    /**
+     * Formats an epoch timestamp with {@code ISO_DATE_TIME} as a UTC local date-time.
+     *
+     * @param timestamp milliseconds since the epoch; converted to whole seconds, so the
+     *                  sub-second part is not carried into the result
+     * @return the formatted date-time
+     */
+    private String formatTimeStamp(long timestamp) {
+        final long epochSeconds = TimeUnit.MILLISECONDS.toSeconds(timestamp);
+        final LocalDateTime utcDateTime = LocalDateTime.ofEpochSecond(epochSeconds, 0, ZoneOffset.UTC);
+        return ISO_DATE_TIME.format(utcDateTime);
+    }
+
+    /**
+     * Detects that a regular execution should be abandoned.
+     *
+     * @param listingMode the mode the current listing runs in
+     * @return {@code true} only in {@code EXECUTION} mode while the processor is not scheduled
+     */
+    private boolean isExecutionStopped(ListingMode listingMode) {
+        if (!ListingMode.EXECUTION.equals(listingMode)) {
+            return false;
+        }
+        return !isScheduled();
+    }
+
+    /**
+     * Assembles the predicate that decides whether a listed entity is kept.
+     *
+     * <p>The minimum age check is always applied. The maximum age, minimum timestamp, minimum size,
+     * maximum size and name suffix checks are added, in that order, only when the respective value
+     * is available. Ages are measured against the time returned by {@code getCurrentTime()}, which
+     * is read once when the filter is created.</p>
+     *
+     * @param context            the process context holding the filter properties
+     * @param minTimestampOrNull smallest accepted entity timestamp, or {@code null} for no bound
+     * @return the combined predicate
+     */
+    private Predicate<SmbListableEntity> createFileFilter(ProcessContext context, Long minTimestampOrNull) {
+
+        final Long youngestAllowedAge = context.getProperty(MINIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        final Long oldestAllowedAgeOrNull;
+        if (context.getProperty(MAXIMUM_AGE).isSet()) {
+            oldestAllowedAgeOrNull = context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        } else {
+            oldestAllowedAgeOrNull = null;
+        }
+
+        final Double smallestAllowedSizeOrNull;
+        if (context.getProperty(MINIMUM_SIZE).isSet()) {
+            smallestAllowedSizeOrNull = context.getProperty(MINIMUM_SIZE).asDataSize(DataUnit.B);
+        } else {
+            smallestAllowedSizeOrNull = null;
+        }
+
+        final Double largestAllowedSizeOrNull;
+        if (context.getProperty(MAXIMUM_SIZE).isSet()) {
+            largestAllowedSizeOrNull = context.getProperty(MAXIMUM_SIZE).asDataSize(DataUnit.B);
+        } else {
+            largestAllowedSizeOrNull = null;
+        }
+
+        final String excludedSuffixOrNull = context.getProperty(FILE_NAME_SUFFIX_FILTER).getValue();
+
+        final long referenceTime = getCurrentTime();
+
+        // Mandatory check: the entity must be at least as old as the minimum age.
+        Predicate<SmbListableEntity> accepted =
+                entity -> referenceTime - entity.getTimestamp() >= youngestAllowedAge;
+
+        if (oldestAllowedAgeOrNull != null) {
+            accepted = accepted.and(entity -> referenceTime - entity.getTimestamp() <= oldestAllowedAgeOrNull);
+        }
+
+        if (minTimestampOrNull != null) {
+            accepted = accepted.and(entity -> entity.getTimestamp() >= minTimestampOrNull);
+        }
+
+        if (smallestAllowedSizeOrNull != null) {
+            accepted = accepted.and(entity -> entity.getSize() >= smallestAllowedSizeOrNull);
+        }
+
+        if (largestAllowedSizeOrNull != null) {
+            accepted = accepted.and(entity -> entity.getSize() <= largestAllowedSizeOrNull);
+        }
+
+        if (excludedSuffixOrNull != null) {
+            accepted = accepted.and(entity -> !entity.getName().endsWith(excludedSuffixOrNull));
+        }
+
+        return accepted;
+    }
+
+    /**
+     * Opens an SMB client and starts listing the configured directory.
+     *
+     * <p>The returned stream owns the client: closing the stream closes the client, so callers
+     * must close the stream. If starting the listing fails, the client is closed here before the
+     * failure is propagated, because in that case no stream exists that could close it later.</p>
+     *
+     * @param context the process context used to resolve the client provider service and the directory
+     * @return a stream of the listed entities that closes the underlying client when it is closed
+     * @throws IOException if the client provider or the client raises one
+     */
+    private Stream<SmbListableEntity> performListing(ProcessContext context) throws IOException {
+        final SmbClientProviderService clientProviderService =
+                context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+        final String directory = getDirectory(context);
+        final SmbClientService clientService = clientProviderService.getClient();
+        try {
+            return clientService.listRemoteFiles(directory).onClose(() -> {
+                try {
+                    clientService.close();
+                } catch (Exception e) {
+                    throw new RuntimeException("Could not close samba client", e);
+                }
+            });
+        } catch (Exception e) {
+            // No stream was handed out, so nobody else will ever close the client.
+            try {
+                clientService.close();
+            } catch (Exception closeFailure) {
+                // Keep the original failure as the primary one and attach the close failure to it.
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Reads the configured directory in normalized form.
+     *
+     * @param context the process context holding the {@code DIRECTORY} property
+     * @return the directory with every backslash replaced by a forward slash; the empty string when
+     *         the property is not set or when the normalized value is just {@code "/"}
+     */
+    private String getDirectory(ProcessContext context) {
+        final PropertyValue directoryProperty = context.getProperty(DIRECTORY);
+        if (!directoryProperty.isSet()) {
+            return "";
+        }
+        final String normalized = directoryProperty.getValue().replace('\\', '/');
+        if (normalized.equals("/")) {
+            return "";
+        }
+        return normalized;
+    }
+
+    /**
+     * Validator that rejects values containing a directory separator.
+     *
+     * <p>Both {@code /} and {@code \} are rejected, matching {@code getDirectory}, which treats the
+     * backslash as a directory separator as well.</p>
+     */
+    private static class MustNotContainDirectorySeparatorsValidator implements Validator {
+
+        /**
+         * Validates that the value contains neither a forward slash nor a backslash.
+         *
+         * @param subject the name of what is validated; used in the explanation
+         * @param value   the value to check
+         * @param context the validation context (not consulted)
+         * @return a result that is valid only when the value contains no directory separator
+         */
+        @Override
+        public ValidationResult validate(String subject, String value, ValidationContext context) {
+            final boolean containsSeparator = value.contains("/") || value.contains("\\");
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(value)
+                    .valid(!containsSeparator)
+                    .explanation(subject + " must not contain any folder separator character.")
+                    .build();
+        }
+
+    }
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index bc5320b99b..0dd9276bc0 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,5 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.nifi.processors.smb.GetSmbFile
+org.apache.nifi.processors.smb.ListSmb
org.apache.nifi.processors.smb.PutSmbFile
-org.apache.nifi.processors.smb.GetSmbFile
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
new file mode 100644
index 0000000000..8ef5aa7ba3
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static java.util.Arrays.fill;
+import static java.util.stream.Collectors.toSet;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
+import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
+import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE;
+import static
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbListableEntity;
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Integration tests for {@link ListSmb} that run against a Samba server started in a Docker
+ * container. A container is started before each test and stopped after it.
+ */
+public class ListSmbIT {
+
+    private final static Integer DEFAULT_SAMBA_PORT = 445;
+    // Created for this class so that forwarded container output is attributed to the integration test.
+    private final static Logger logger = LoggerFactory.getLogger(ListSmbIT.class);
+    // Samba server with domain "domain", user "username"/"password" and a share "share" backed by /folder.
+    private final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
+            .withExposedPorts(DEFAULT_SAMBA_PORT, 139)
+            .waitingFor(Wait.forListeningPort())
+            .withLogConsumer(new Slf4jLogConsumer(logger))
+            .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
+
+    /** Starts the Samba container before each test. */
+    @BeforeEach
+    public void beforeEach() {
+        sambaContainer.start();
+    }
+
+    /** Stops the Samba container after each test. */
+    @AfterEach
+    public void afterEach() {
+        sambaContainer.stop();
+    }
+
+    /** The "size" attribute must equal the number of bytes written to the file. */
+    @ParameterizedTest
+    @ValueSource(ints = {4, 50, 45000})
+    public void shouldFillSizeAttributeProperly(int size) throws Exception {
+        writeFile("1.txt", generateContentWithSize(size));
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        testRunner.setProperty(LISTING_STRATEGY, "none");
+        testRunner.setProperty(MINIMUM_AGE, "0 ms");
+        SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.enableControllerService(smbjClientProviderService);
+        testRunner.run();
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.getFlowFilesForRelationship(REL_SUCCESS)
+                .forEach(flowFile -> assertEquals(size, Integer.valueOf(flowFile.getAttribute("size"))));
+        testRunner.assertValid();
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    /** Listing a directory that was never created must log exactly one error. */
+    @Test
+    public void shouldShowBulletinOnMissingDirectory() throws Exception {
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        testRunner.setProperty(LISTING_STRATEGY, "none");
+        testRunner.setProperty(MINIMUM_AGE, "0 ms");
+        testRunner.setProperty(DIRECTORY, "folderDoesNotExists");
+        SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.enableControllerService(smbjClientProviderService);
+        testRunner.run();
+        assertEquals(1, testRunner.getLogger().getErrorMessages().size());
+        testRunner.assertValid();
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    /** Pointing the service at a share other than the configured one must log exactly one error. */
+    @Test
+    public void shouldShowBulletinWhenShareIsInvalid() throws Exception {
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.setProperty(smbjClientProviderService, SHARE, "invalid_share");
+        testRunner.enableControllerService(smbjClientProviderService);
+        testRunner.run();
+        assertEquals(1, testRunner.getLogger().getErrorMessages().size());
+        testRunner.assertValid();
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    /** Pointing the service at port 1 instead of the mapped Samba port must log exactly one error. */
+    @Test
+    public void shouldShowBulletinWhenSMBPortIsInvalid() throws Exception {
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        final SmbClientProviderService smbClientProviderService =
+                configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.setProperty(smbClientProviderService, PORT, "1");
+        testRunner.enableControllerService(smbClientProviderService);
+        testRunner.run();
+        assertEquals(1, testRunner.getLogger().getErrorMessages().size());
+        testRunner.assertValid();
+        testRunner.disableControllerService(smbClientProviderService);
+    }
+
+    /** Pointing the service at a different host name must log exactly one error. */
+    @Test
+    public void shouldShowBulletinWhenSMBHostIsInvalid() throws Exception {
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        final SmbClientProviderService smbClientProviderService =
+                configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.setProperty(smbClientProviderService, HOSTNAME, "this.host.should.not.exists");
+        testRunner.enableControllerService(smbClientProviderService);
+        testRunner.run();
+        assertEquals(1, testRunner.getLogger().getErrorMessages().size());
+        testRunner.disableControllerService(smbClientProviderService);
+    }
+
+    /** With a record writer configured, one flow file is produced whose rows identify every written file. */
+    @Test
+    public void shouldUseRecordWriterProperly() throws Exception {
+        final Set<String> testFiles = new HashSet<>(asList(
+                "1.txt",
+                "directory/2.txt",
+                "directory/subdirectory/3.txt",
+                "directory/subdirectory2/4.txt",
+                "directory/subdirectory3/5.txt"
+        ));
+        testFiles.forEach(file -> writeFile(file, generateContentWithSize(4)));
+
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        final MockRecordWriter writer = new MockRecordWriter(null, false);
+        final SimpleRecordSchema simpleRecordSchema = SmbListableEntity.getRecordSchema();
+        testRunner.addControllerService("writer", writer);
+        testRunner.enableControllerService(writer);
+        testRunner.setProperty(LISTING_STRATEGY, "none");
+        testRunner.setProperty(RECORD_WRITER, "writer");
+        testRunner.setProperty(MINIMUM_AGE, "0 ms");
+        SmbjClientProviderService smbjClientProviderService = configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.enableControllerService(smbjClientProviderService);
+        testRunner.run();
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        final String result = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent();
+        // Rows are split on commas and the identifier is picked by its position in the schema.
+        final int identifierColumnIndex = simpleRecordSchema.getFieldNames().indexOf("identifier");
+        final Set<String> actual = Arrays.stream(result.split("\n"))
+                .map(row -> row.split(",")[identifierColumnIndex])
+                .collect(toSet());
+        assertEquals(testFiles, actual);
+        testRunner.assertValid();
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    /** Checks the "identifier", "path", "filename" and "absolute.path" attributes of the listed files. */
+    @Test
+    public void shouldWriteFlowFileAttributesProperly() throws Exception {
+        final Set<String> testFiles = new HashSet<>(asList(
+                "file_name", "directory/file_name", "directory/subdirectory/file_name"
+        ));
+        testFiles.forEach(file -> writeFile(file, generateContentWithSize(4)));
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        final SmbjClientProviderService smbjClientProviderService =
+                configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.setProperty(LISTING_STRATEGY, "none");
+        testRunner.setProperty(MINIMUM_AGE, "0 sec");
+        testRunner.enableControllerService(smbjClientProviderService);
+        testRunner.run(1);
+        testRunner.assertTransferCount(REL_SUCCESS, 3);
+        final Set<Map<String, String>> allAttributes = testRunner.getFlowFilesForRelationship(REL_SUCCESS)
+                .stream()
+                .map(MockFlowFile::getAttributes)
+                .collect(toSet());
+
+        final Set<String> identifiers = allAttributes.stream()
+                .map(attributes -> attributes.get("identifier"))
+                .collect(toSet());
+        assertEquals(testFiles, identifiers);
+
+        // "absolute.path" must be the non-empty parts of "path" and "filename" joined with "/".
+        allAttributes.forEach(attribute -> assertEquals(
+                Stream.of(attribute.get("path"), attribute.get("filename")).filter(s -> !s.isEmpty()).collect(
+                        Collectors.joining("/")),
+                attribute.get("absolute.path")));
+
+        final Set<String> fileNames = allAttributes.stream()
+                .map(attributes -> attributes.get("filename"))
+                .collect(toSet());
+
+        assertEquals(new HashSet<>(Arrays.asList("file_name")), fileNames);
+
+        testRunner.assertValid();
+        testRunner.disableControllerService(smbjClientProviderService);
+    }
+
+    /** Raising the minimum file size step by step must reduce the number of listed files. */
+    @Test
+    public void shouldFilterFilesBySizeCriteria() throws Exception {
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        final SmbClientProviderService smbClientProviderService =
+                configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.enableControllerService(smbClientProviderService);
+        testRunner.setProperty(MINIMUM_AGE, "0 ms");
+        testRunner.setProperty(LISTING_STRATEGY, "none");
+
+        writeFile("1.txt", generateContentWithSize(1));
+        writeFile("10.txt", generateContentWithSize(10));
+        writeFile("100.txt", generateContentWithSize(100));
+
+        // No size limit: all three files are listed.
+        testRunner.run();
+        testRunner.assertTransferCount(REL_SUCCESS, 3);
+        testRunner.clearTransferState();
+
+        testRunner.setProperty(MINIMUM_SIZE, "10 B");
+        testRunner.run();
+        testRunner.assertTransferCount(REL_SUCCESS, 2);
+        testRunner.clearTransferState();
+
+        testRunner.setProperty(MINIMUM_SIZE, "50 B");
+        testRunner.run();
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+
+        testRunner.disableControllerService(smbClientProviderService);
+
+    }
+
+    /** A file whose name ends with the configured suffix must not be listed. */
+    @Test
+    public void shouldFilterByGivenSuffix() throws Exception {
+        final TestRunner testRunner = newTestRunner(ListSmb.class);
+        final SmbClientProviderService smbClientProviderService =
+                configureTestRunnerForSambaDockerContainer(testRunner);
+        testRunner.enableControllerService(smbClientProviderService);
+        testRunner.setProperty(MINIMUM_AGE, "0 ms");
+        testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, ".suffix");
+        testRunner.setProperty(LISTING_STRATEGY, "none");
+        writeFile("should_list_this", generateContentWithSize(1));
+        writeFile("should_skip_this.suffix", generateContentWithSize(1));
+        testRunner.run();
+        testRunner.assertTransferCount(REL_SUCCESS, 1);
+        testRunner.disableControllerService(smbClientProviderService);
+    }
+
+    /**
+     * Registers a {@link SmbjClientProviderService} under the id "connection-pool", points it at
+     * the running container and wires it into the processor. The service is returned without
+     * being enabled, so a test can still change its properties.
+     */
+    private SmbjClientProviderService configureTestRunnerForSambaDockerContainer(TestRunner testRunner)
+            throws Exception {
+        SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
+        testRunner.addControllerService("connection-pool", smbjClientProviderService);
+        testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "connection-pool");
+        testRunner.setProperty(smbjClientProviderService, HOSTNAME, sambaContainer.getHost());
+        testRunner.setProperty(smbjClientProviderService, PORT,
+                String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
+        testRunner.setProperty(smbjClientProviderService, USERNAME, "username");
+        testRunner.setProperty(smbjClientProviderService, PASSWORD, "password");
+        testRunner.setProperty(smbjClientProviderService, SHARE, "share");
+        testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
+        return smbjClientProviderService;
+    }
+
+    /** Creates a string built from {@code sizeInBytes} bytes that all have the value 1. */
+    private String generateContentWithSize(int sizeInBytes) {
+        byte[] bytes = new byte[sizeInBytes];
+        fill(bytes, (byte) 1);
+        return new String(bytes);
+    }
+
+    /** Copies the content into the container below /folder, the directory backing the share. */
+    private void writeFile(String path, String content) {
+        String containerPath = "/folder/" + path;
+        sambaContainer.copyFileToContainer(Transferable.of(content), containerPath);
+    }
+
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
new file mode 100644
index 0000000000..e279bf2e85
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.stream;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.REL_SUCCESS;
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
+import static
org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE;
+import static org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
+import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
+import static org.apache.nifi.processors.smb.ListSmb.MAXIMUM_AGE;
+import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
+import static
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.processor.util.list.ListedEntity;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbListableEntity;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class ListSmbTest {
+
+    /** Identifier of the client provider service used by these tests. */
+    public static final String CLIENT_SERVICE_PROVIDER_ID = "client-provider-service-id";
+    /** Test-controlled time in milliseconds. */
+    private static final AtomicLong currentMillis = new AtomicLong();
+    /** Test-controlled time in nanoseconds. */
+    private static final AtomicLong currentNanos = new AtomicLong();
+
+    /** Returns the current test-controlled time in milliseconds. */
+    private static long currentMillis() {
+        final long millis = currentMillis.get();
+        return millis;
+    }
+
+    /** Returns the current test-controlled time in nanoseconds. */
+    private static long currentNanos() {
+        final long nanos = currentNanos.get();
+        return nanos;
+    }
+
+    /**
+     * Sets the test-controlled time.
+     *
+     * @param timeInMillis the new time in milliseconds; the nanosecond counter is set to the same instant
+     */
+    private static void setTime(Long timeInMillis) {
+        currentMillis.set(timeInMillis);
+        final long timeInNanos = NANOSECONDS.convert(timeInMillis, MILLISECONDS);
+        currentNanos.set(timeInNanos);
+    }
+
+    /**
+     * Advances the test-controlled time.
+     *
+     * @param timeInMillis the elapsed time in milliseconds; both counters are moved forward by it
+     */
+    private static void timePassed(Long timeInMillis) {
+        currentMillis.addAndGet(timeInMillis);
+        final long elapsedNanos = NANOSECONDS.convert(timeInMillis, MILLISECONDS);
+        currentNanos.addAndGet(elapsedNanos);
+    }
+
+ /**
+ * Runs the processor with the "timestamps" and the "entities" listing strategy and checks
+ * that the same single entity is transferred again after each of three property changes:
+ * DIRECTORY, FILE_NAME_SUFFIX_FILTER and SMB_CLIENT_PROVIDER_SERVICE.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = {"timestamps", "entities"})
+ public void shouldResetStateWhenPropertiesChanged(String listingStrategy)
throws Exception {
+ final TestRunner testRunner = newTestRunner(ListSmb.class);
+ testRunner.setProperty(LISTING_STRATEGY, listingStrategy);
+ testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
+ testRunner.setProperty(MINIMUM_AGE, "0 ms");
+ final DistributedMapCacheClient cacheService = mockDistributedMap();
+ testRunner.addControllerService("cacheService", cacheService);
+ testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService");
+ testRunner.enableControllerService(cacheService);
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ // This test uses the real ListSmb class, so the entity timestamp is based on the wall clock.
+ long now = System.currentTimeMillis();
+ mockSmbFolders(mockNifiSmbClientService,
+ listableEntity("should_list_this_again_after_property_change",
now - 100));
+ // Initial run: the entity is listed once.
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.clearTransferState();
+ // Change 1: a different directory.
+ testRunner.setProperty(DIRECTORY, "testDirectoryChanged");
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.clearTransferState();
+
+ // Change 2: a different file name suffix filter.
+ testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, "suffix_changed");
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ testRunner.clearTransferState();
+
+ // Change 3: a different client provider service that hands out the same mocked client.
+ final SmbClientProviderService clientProviderService =
mock(SmbClientProviderService.class);
+
when(clientProviderService.getIdentifier()).thenReturn("different-client-provider");
+
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
+
when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
+ testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE,
"different-client-provider");
+ testRunner.addControllerService("different-client-provider",
clientProviderService);
+ testRunner.enableControllerService(clientProviderService);
+
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+
+ testRunner.assertValid();
+ }
+
+ /**
+ * Exercises the "timestamps" listing strategy for several minimum ages, using the
+ * TimeMockingListSmb processor and the test-controlled time of this class. The remote
+ * listing grows between runs, and the total number of transferred flow files is checked
+ * at the end.
+ */
+ @ParameterizedTest
+ @ValueSource(longs = {1L, 50L, 150L, 3000L})
+ public void testShouldUseTimestampBasedStrategyProperly(Long minimumAge)
throws Exception {
+ final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class);
+ testRunner.setProperty(LISTING_STRATEGY, "timestamps");
+ testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
+ testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ // Test time starts at 1000 ms; the first run is performed with no entities passed to mockSmbFolders.
+ setTime(1000L);
+ mockSmbFolders(mockNifiSmbClientService);
+ testRunner.run();
+ // The client must have been closed after the run.
+ verify(mockNifiSmbClientService).close();
+ mockSmbFolders(mockNifiSmbClientService,
+ listableEntity("first", 900),
+ listableEntity("second", 1000)
+ );
+ testRunner.run();
+ timePassed(100L);
+ mockSmbFolders(mockNifiSmbClientService,
+ listableEntity("first", 900),
+ listableEntity("second", 1000),
+ listableEntity("third", 1100)
+ );
+ testRunner.run();
+ timePassed(1L);
+ // A fourth entity shows up whose timestamp (1099) is older than that of "third" (1100).
+ mockSmbFolders(mockNifiSmbClientService,
+ listableEntity("first", 900),
+ listableEntity("second", 1000),
+ listableEntity("third", 1100),
+
listableEntity("appeared_during_the_previous_iteration_and_it_was_missed", 1099)
+ );
+ timePassed(100L);
+ testRunner.run();
+ // One more run after another minimumAge milliseconds have passed.
+ timePassed(minimumAge);
+ testRunner.run();
+ // Transfer state is never cleared in this test, so the count covers all runs.
+ testRunner.assertTransferCount(REL_SUCCESS, 4);
+ testRunner.assertValid();
+ }
+
+ /**
+ * Exercises the "entities" listing strategy for several minimum ages, using the
+ * TimeMockingListSmb processor, a mocked distributed map cache and the test-controlled
+ * time of this class. Two entities are expected to be transferred in total.
+ */
+ @ParameterizedTest
+ @ValueSource(longs = {0L, 50L, 150L, 3000L})
+ public void testShouldUseEntityTrackingBasedStrategyProperly(Long
minimumAge) throws Exception {
+ final TestRunner testRunner = newTestRunner(TimeMockingListSmb.class);
+ testRunner.setProperty(LISTING_STRATEGY, "entities");
+ testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
+ testRunner.setProperty(MINIMUM_AGE, minimumAge + " ms");
+ final DistributedMapCacheClient cacheService = mockDistributedMap();
+ testRunner.addControllerService("cacheService", cacheService);
+ testRunner.setProperty(TRACKING_STATE_CACHE, "cacheService");
+ testRunner.enableControllerService(cacheService);
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+ // Test time starts at 1000 ms with a single entity carrying that same timestamp.
+ setTime(1000L);
+ mockSmbFolders(mockNifiSmbClientService,
+ listableEntity("first", 1000)
+ );
+ testRunner.run();
+ // The client must have been closed after the run.
+ verify(mockNifiSmbClientService).close();
+ timePassed(100L);
+ mockSmbFolders(mockNifiSmbClientService,
+ listableEntity("first", 1000),
+ listableEntity("second", 1100)
+ );
+ testRunner.run();
+ // One more run after another minimumAge milliseconds have passed.
+ timePassed(minimumAge);
+ testRunner.run();
+ // Transfer state is never cleared in this test, so the count covers all runs.
+ testRunner.assertTransferCount(REL_SUCCESS, 2);
+ testRunner.assertValid();
+ }
+
+    /**
+     * With the "none" listing strategy the same entity must be transferred on every run:
+     * one flow file after the first run and two in total after the second.
+     */
+    @ParameterizedTest
+    @ValueSource(longs = {0L, 50L, 150L, 3000L})
+    public void testShouldUseNoTrackingBasedStrategyProperly(Long minimumAge) throws Exception {
+        final TestRunner runner = newTestRunner(TimeMockingListSmb.class);
+        runner.setProperty(LISTING_STRATEGY, "none");
+        runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
+        runner.setProperty(MINIMUM_AGE, minimumAge + " ms");
+        final SmbClientService smbClient = configureTestRunnerWithMockedSambaClient(runner);
+
+        // One entity stamped with the starting time; then the minimum age elapses.
+        setTime(1000L);
+        mockSmbFolders(smbClient, listableEntity("first", 1000));
+        timePassed(minimumAge);
+
+        runner.run();
+        runner.assertTransferCount(REL_SUCCESS, 1);
+
+        // Transfer state is not cleared, so the second listing raises the total to two.
+        runner.run();
+        runner.assertTransferCount(REL_SUCCESS, 2);
+
+        runner.assertValid();
+    }
+
+    /**
+     * With a minimum age of 10 ms and a maximum age of 30 ms, only one of the three
+     * entities (aged 0, 31 and 11 ms at listing time) must be transferred.
+     */
+    @Test
+    public void testShouldFilterByFileAgeCriteria() throws Exception {
+        final TestRunner runner = newTestRunner(TimeMockingListSmb.class);
+        runner.setProperty(LISTING_STRATEGY, "none");
+        runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
+        runner.setProperty(MINIMUM_AGE, "10 ms");
+        runner.setProperty(MAXIMUM_AGE, "30 ms");
+        final SmbClientService smbClient = configureTestRunnerWithMockedSambaClient(runner);
+
+        setTime(1000L);
+        mockSmbFolders(smbClient,
+                listableEntity("too_young", 1000),
+                listableEntity("too_old", 1000 - 31),
+                listableEntity("should_list_this", 1000 - 11)
+        );
+
+        runner.run();
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertValid();
+    }
+
+ /**
+  * Checks that a RuntimeException thrown by the SMB client while listing
+  * does not escape the run: after a single run exactly one error message
+  * must have been recorded by the processor's logger.
+  */
+ @Test
+ public void shouldTurnSmbClientExceptionsToBulletins() throws Exception {
+ final TestRunner testRunner = newTestRunner(ListSmb.class);
+ testRunner.setProperty(LISTING_STRATEGY, "timestamps");
+ testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
+ final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
+
when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new
RuntimeException("test exception"));
+ testRunner.run();
+ // The assertion is made on logged error messages, not on bulletins directly.
+ assertEquals(1, testRunner.getLogger().getErrorMessages().size());
+ }
+
+ /**
+  * Checks how ListSmb#getPath combines the provider's service location with
+  * the Directory property: an unset directory and "/" both map to the share
+  * root, and every returned path ends with a trailing slash.
+  */
+ @Test
+ public void shouldFormatRemotePathProperly() throws Exception {
+ final TestRunner testRunner = newTestRunner(ListSmb.class);
+ final SmbClientProviderService clientProviderService =
mockSmbConnectionPoolService();
+ testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE,
CLIENT_SERVICE_PROVIDER_ID);
+ testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID,
clientProviderService);
+ // Replace the default location stubbed by mockSmbConnectionPoolService().
+
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://hostname:445/share"));
+ final ListSmb underTest = (ListSmb) testRunner.getProcessor();
+
+ // Directory property not set at this point.
+ assertEquals("smb://hostname:445/share/",
underTest.getPath(testRunner.getProcessContext()));
+
+ testRunner.setProperty(DIRECTORY, "/");
+ assertEquals("smb://hostname:445/share/",
underTest.getPath(testRunner.getProcessContext()));
+
+ testRunner.setProperty(DIRECTORY, "root");
+ assertEquals("smb://hostname:445/share/root/",
underTest.getPath(testRunner.getProcessContext()));
+
+ testRunner.setProperty(DIRECTORY, "root/subdirectory");
+ assertEquals("smb://hostname:445/share/root/subdirectory/",
underTest.getPath(testRunner.getProcessContext()));
+
+ }
+
+    /**
+     * Creates a Mockito mock of {@link SmbClientProviderService} that reports
+     * {@code CLIENT_SERVICE_PROVIDER_ID} as its identifier and
+     * {@code smb://localhost:445/share} as its service location.
+     *
+     * @return the stubbed provider service mock
+     */
+    private SmbClientProviderService mockSmbConnectionPoolService() {
+        final SmbClientProviderService providerMock = mock(SmbClientProviderService.class);
+        when(providerMock.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
+        when(providerMock.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID);
+        return providerMock;
+    }
+
+    /**
+     * Wires a mocked SMB client into the given test runner: sets the Directory
+     * property to {@code testDirectory}, registers and enables a mocked provider
+     * service whose {@code getClient()} hands out the mocked client, and points
+     * the processor at that provider.
+     *
+     * @param testRunner the runner to configure
+     * @return the client mock returned by the registered provider service
+     * @throws Exception if stubbing or registering the controller service fails
+     */
+    private SmbClientService configureTestRunnerWithMockedSambaClient(TestRunner testRunner)
+            throws Exception {
+        testRunner.setProperty(DIRECTORY, "testDirectory");
+
+        final SmbClientService clientMock = mock(SmbClientService.class);
+        final SmbClientProviderService providerMock = mockSmbConnectionPoolService();
+        when(providerMock.getClient()).thenReturn(clientMock);
+
+        testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, CLIENT_SERVICE_PROVIDER_ID);
+        testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID, providerMock);
+        testRunner.enableControllerService(providerMock);
+
+        return clientMock;
+    }
+
+    /**
+     * Stubs {@code listRemoteFiles} on the given client mock so that every call,
+     * for any path, answers with a fresh stream over {@code entities}.
+     *
+     * @param mockNifiSmbClientService the client mock to stub
+     * @param entities the entities each listing should return
+     */
+    private void mockSmbFolders(SmbClientService mockNifiSmbClientService, SmbListableEntity... entities) {
+        // An answer (rather than a fixed return value) is used so that each
+        // invocation gets its own, not yet consumed, stream.
+        doAnswer(invocation -> stream(entities))
+                .when(mockNifiSmbClientService)
+                .listRemoteFiles(anyString());
+    }
+
+ /**
+  * Builds a minimal {@link SmbListableEntity} for tests; only the name and
+  * the timestamp are set, every other builder field keeps its default.
+  *
+  * @param name the entity name
+  * @param timeStamp the value passed to the builder's setTimestamp
+  * @return the built entity
+  */
+ private SmbListableEntity listableEntity(String name, long timeStamp) {
+ return SmbListableEntity.builder()
+ .setName(name)
+ .setTimestamp(timeStamp)
+ .build();
+ }
+
+ /**
+  * Creates a mocked {@link DistributedMapCacheClient} backed by an in-memory
+  * map. {@code get}, {@code put} and {@code remove} operate on that map using
+  * the first argument as key; {@code putIfAbsent} and {@code containsKey}
+  * always answer {@code false}; the identifier is "cacheService".
+  *
+  * @return the stubbed cache client mock
+  * @throws IOException declared because the stubbed client methods declare it
+  */
+ private DistributedMapCacheClient mockDistributedMap() throws IOException {
+ final Map<String, ConcurrentHashMap<String, ListedEntity>> store = new
ConcurrentHashMap<>();
+ final DistributedMapCacheClient cacheService =
mock(DistributedMapCacheClient.class);
+ when(cacheService.putIfAbsent(any(), any(), any(),
any())).thenReturn(false);
+ when(cacheService.containsKey(any(), any())).thenReturn(false);
+ when(cacheService.getIdentifier()).thenReturn("cacheService");
+ // Serializer/deserializer arguments are ignored; values are stored as-is.
+ doAnswer(invocation -> store.get(invocation.getArgument(0)))
+ .when(cacheService).get(any(), any(), any());
+ doAnswer(invocation -> store.put(invocation.getArgument(0),
invocation.getArgument(1)))
+ .when(cacheService).put(any(), any(), any(), any());
+ // Answers true only when the key is non-null and a mapping was removed.
+ doAnswer(invocation ->
Optional.ofNullable(invocation.getArgument(0)).map(store::remove).isPresent())
+ .when(cacheService).remove(any(), any());
+ return cacheService;
+ }
+
+ /**
+  * ListSmb variant used by the tests: both time sources are overridden to
+  * delegate to currentMillis() / currentNanos(), helpers defined outside this
+  * chunk (presumably the controllable test clock - verify at their definition).
+  */
+ public static class TimeMockingListSmb extends ListSmb {
+
+ @Override
+ protected long getCurrentTime() {
+ return currentMillis();
+ }
+
+ @Override
+ protected long getCurrentNanoTime() {
+ return currentNanos();
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client-nar/pom.xml
similarity index 82%
copy from nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
copy to nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client-nar/pom.xml
index 25255fd5af..c9b13f4768 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client-nar/pom.xml
@@ -22,7 +22,7 @@
<version>1.18.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-smb-nar</artifactId>
+ <artifactId>nifi-smb-smbj-client-nar</artifactId>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
@@ -32,7 +32,13 @@
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-smb-processors</artifactId>
+ <artifactId>nifi-smb-client-api-nar</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-smbj-client</artifactId>
<version>1.18.0-SNAPSHOT</version>
</dependency>
</dependencies>
diff --git a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/pom.xml
similarity index 72%
copy from nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml
copy to nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/pom.xml
index 58b04c5898..b9bac48ddc 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/pom.xml
@@ -22,14 +22,24 @@
<version>1.18.0-SNAPSHOT</version>
</parent>
- <artifactId>nifi-smb-processors</artifactId>
+ <artifactId>nifi-smb-smbj-client</artifactId>
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-smb-client-api</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
@@ -38,7 +48,11 @@
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>smbj</artifactId>
- <version>0.10.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@@ -46,5 +60,10 @@
<version>1.18.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>toxiproxy</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
new file mode 100644
index 0000000000..f72cec6a2b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.smb;
+
+import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
+import static
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
+import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR;
+import static
org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
+
+import com.hierynomus.smbj.SMBClient;
+import com.hierynomus.smbj.SmbConfig;
+import com.hierynomus.smbj.auth.AuthenticationContext;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+
+@Tags({"samba, smb, cifs, files"})
+@CapabilityDescription("Provides access to SMB Sessions with shared
authentication credentials.")
+public class SmbjClientProviderService extends AbstractControllerService
implements SmbClientProviderService {
+
+ public static final PropertyDescriptor HOSTNAME = new
PropertyDescriptor.Builder()
+ .displayName("Hostname")
+ .name("hostname")
+ .description("The network host of the SMB file server.")
+ .required(true)
+ .addValidator(NON_BLANK_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor DOMAIN = new
PropertyDescriptor.Builder()
+ .displayName("Domain")
+ .name("domain")
+ .description(
+ "The domain used for authentication. Optional, in most
cases username and password is sufficient.")
+ .required(false)
+ .addValidator(NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor USERNAME = new
PropertyDescriptor.Builder()
+ .displayName("Username")
+ .name("username")
+ .description(
+ "The username used for authentication.")
+ .required(false)
+ .defaultValue("Guest")
+ .addValidator(NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor PASSWORD = new
PropertyDescriptor.Builder()
+ .displayName("Password")
+ .name("password")
+ .description("The password used for authentication.")
+ .required(false)
+ .addValidator(NON_EMPTY_VALIDATOR)
+ .sensitive(true)
+ .build();
+ public static final PropertyDescriptor PORT = new
PropertyDescriptor.Builder()
+ .displayName("Port")
+ .name("port")
+ .description("Port to use for connection.")
+ .required(true)
+ .addValidator(PORT_VALIDATOR)
+ .defaultValue("445")
+ .build();
+ public static final PropertyDescriptor SHARE = new
PropertyDescriptor.Builder()
+ .displayName("Share")
+ .name("share")
+ .description("The network share to which files should be listed
from. This is the \"first folder\"" +
+ "after the hostname:
smb://hostname:port/[share]/dir1/dir2")
+ .required(true)
+ .addValidator(NON_BLANK_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
+ .displayName("Timeout")
+ .name("timeout")
+ .description("Timeout for read and write operations.")
+ .required(true)
+ .defaultValue("5 sec")
+ .addValidator(TIME_PERIOD_VALIDATOR)
+ .build();
+ private static final List<PropertyDescriptor> PROPERTIES = Collections
+ .unmodifiableList(asList(
+ HOSTNAME,
+ PORT,
+ SHARE,
+ USERNAME,
+ PASSWORD,
+ DOMAIN,
+ TIMEOUT
+ ));
+ private SMBClient smbClient;
+ private AuthenticationContext authenticationContext;
+ private String hostname;
+ private int port;
+ private String shareName;
+
+ @Override
+ public SmbClientService getClient() throws IOException {
+ final SmbjClientService client = new SmbjClientService(smbClient,
authenticationContext);
+
+ try {
+ client.connectToShare(hostname, port, shareName);
+ } catch (IOException e) {
+ client.forceFullyCloseConnection();
+ client.connectToShare(hostname, port, shareName);
+ }
+
+ return client;
+ }
+
+ @Override
+ public URI getServiceLocation() {
+ return URI.create(String.format("smb://%s:%d/%s", hostname, port,
shareName));
+ }
+
+ @OnEnabled
+ public void onEnabled(ConfigurationContext context) {
+ this.hostname = context.getProperty(HOSTNAME).getValue();
+ this.port = context.getProperty(PORT).asInteger();
+ this.shareName = context.getProperty(SHARE).getValue();
+ this.smbClient = new SMBClient(SmbConfig.builder()
+
.withTimeout(context.getProperty(TIMEOUT).asTimePeriod(MILLISECONDS),
MILLISECONDS)
+ .build());
+ createAuthenticationContext(context);
+ }
+
+ @OnDisabled
+ public void onDisabled() {
+ smbClient.close();
+ smbClient = null;
+ hostname = null;
+ port = 0;
+ shareName = null;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ private void createAuthenticationContext(ConfigurationContext context) {
+ if (context.getProperty(USERNAME).isSet()) {
+ final String userName = context.getProperty(USERNAME).getValue();
+ final String password =
+ context.getProperty(PASSWORD).isSet() ?
context.getProperty(PASSWORD).getValue() : "";
+ final String domainOrNull =
+ context.getProperty(DOMAIN).isSet() ?
context.getProperty(DOMAIN).getValue() : null;
+ authenticationContext = new AuthenticationContext(userName,
password.toCharArray(), domainOrNull);
+ } else {
+ authenticationContext = AuthenticationContext.anonymous();
+ }
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
new file mode 100644
index 0000000000..431a4e9f52
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.smb;
+
+import static java.lang.String.format;
+import static java.util.Arrays.asList;
+import static java.util.stream.StreamSupport.stream;
+
+import com.hierynomus.msdtyp.AccessMask;
+import com.hierynomus.msfscc.FileAttributes;
+import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation;
+import com.hierynomus.mssmb2.SMB2CreateDisposition;
+import com.hierynomus.mssmb2.SMB2CreateOptions;
+import com.hierynomus.mssmb2.SMB2ShareAccess;
+import com.hierynomus.mssmb2.SMBApiException;
+import com.hierynomus.smbj.SMBClient;
+import com.hierynomus.smbj.auth.AuthenticationContext;
+import com.hierynomus.smbj.connection.Connection;
+import com.hierynomus.smbj.session.Session;
+import com.hierynomus.smbj.share.Directory;
+import com.hierynomus.smbj.share.DiskShare;
+import com.hierynomus.smbj.share.Share;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * {@link SmbClientService} implementation on top of the smbj library. An instance
+ * holds one connection, one session and one disk share, all established by
+ * {@link #connectToShare(String, int, String)}.
+ */
+public class SmbjClientService implements SmbClientService {
+
+    private static final List<String> SPECIAL_DIRECTORIES = asList(".", "..");
+
+    private final AuthenticationContext authenticationContext;
+    private final SMBClient smbClient;
+
+    private Connection connection;
+    private Session session;
+    private DiskShare share;
+
+    public SmbjClientService(SMBClient smbClient, AuthenticationContext authenticationContext) {
+        this.smbClient = smbClient;
+        this.authenticationContext = authenticationContext;
+    }
+
+    /**
+     * Connects to the host, authenticates and opens the named share.
+     *
+     * @throws IOException if connecting, authenticating or opening the share fails
+     * @throws IllegalArgumentException if the opened share is not a disk share
+     */
+    public void connectToShare(String hostname, int port, String shareName) throws IOException {
+        Share connectedShare;
+        try {
+            connection = smbClient.connect(hostname, port);
+            session = connection.authenticate(authenticationContext);
+            connectedShare = session.connectShare(shareName);
+        } catch (Exception e) {
+            close();
+            throw new IOException("Could not connect to share " + format("%s:%d/%s", hostname, port, shareName), e);
+        }
+        if (!(connectedShare instanceof DiskShare)) {
+            close();
+            throw new IllegalArgumentException("DiskShare not found. Share " +
+                    connectedShare.getClass().getSimpleName() + " found on " +
+                    format("%s:%d/%s", hostname, port, shareName));
+        }
+        this.share = (DiskShare) connectedShare;
+    }
+
+    /**
+     * Force-closes the current connection, if any, and forgets it. An
+     * {@link IOException} raised while closing is ignored.
+     */
+    public void forceFullyCloseConnection() {
+        final Connection toClose = connection;
+        connection = null;
+        if (toClose == null) {
+            return;
+        }
+        try {
+            toClose.close(true);
+        } catch (IOException ignore) {
+        }
+    }
+
+    /**
+     * Closes the current session, if any, and forgets it. An {@link IOException}
+     * raised while closing is ignored.
+     */
+    @Override
+    public void close() {
+        final Session toClose = session;
+        session = null;
+        if (toClose == null) {
+            return;
+        }
+        try {
+            toClose.close();
+        } catch (IOException ignore) {
+        }
+    }
+
+    /**
+     * Lists all non-directory entries below the given path, descending into
+     * sub-directories. The listing is wrapped in an outer {@code flatMap} so the
+     * directory handle registered via {@code onClose} is released once the inner
+     * stream has been consumed.
+     */
+    @Override
+    public Stream<SmbListableEntity> listRemoteFiles(String filePath) {
+        return Stream.of(filePath).flatMap(this::listDirectory);
+    }
+
+    /**
+     * Creates the directory and every missing parent, starting from the top-most
+     * path segment.
+     */
+    @Override
+    public void createDirectory(String path) {
+        final int separatorIndex = path.lastIndexOf("/");
+        if (separatorIndex > 0) {
+            createDirectory(path.substring(0, separatorIndex));
+        }
+        if (share.folderExists(path)) {
+            return;
+        }
+        share.mkdir(path);
+    }
+
+    /** Opens one directory and streams its content, expanding sub-directories recursively. */
+    private Stream<SmbListableEntity> listDirectory(String path) {
+        final Directory directory = openDirectory(path);
+        return stream(directory::spliterator, 0, false)
+                .map(info -> buildSmbListableEntity(info, path))
+                .filter(entity -> !specialDirectory(entity))
+                .flatMap(this::expand)
+                .onClose(directory::close);
+    }
+
+    /** Replaces a directory entry by the files found below it; files are passed through. */
+    private Stream<SmbListableEntity> expand(SmbListableEntity listable) {
+        if (listable.isDirectory()) {
+            return listRemoteFiles(listable.getPathWithName());
+        }
+        return Stream.of(listable);
+    }
+
+    /** Maps smbj directory information of an entry found in {@code path} to a listable entity. */
+    private SmbListableEntity buildSmbListableEntity(FileIdBothDirectoryInformation info, String path) {
+        final boolean directoryFlag =
+                (info.getFileAttributes() & FileAttributes.FILE_ATTRIBUTE_DIRECTORY.getValue()) != 0;
+        return SmbListableEntity.builder()
+                .setName(info.getFileName())
+                .setShortName(info.getShortName())
+                .setPath(path)
+                .setTimestamp(info.getLastWriteTime().toEpochMillis())
+                .setCreationTime(info.getCreationTime().toEpochMillis())
+                .setChangeTime(info.getChangeTime().toEpochMillis())
+                .setLastAccessTime(info.getLastAccessTime().toEpochMillis())
+                .setDirectory(directoryFlag)
+                .setSize(info.getEndOfFile())
+                .setAllocationSize(info.getAllocationSize())
+                .build();
+    }
+
+    /** Opens an existing directory for reading; SMB API failures are rethrown unchecked. */
+    private Directory openDirectory(String path) {
+        try {
+            return share.openDirectory(
+                    path,
+                    EnumSet.of(AccessMask.GENERIC_READ),
+                    EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
+                    EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
+                    SMB2CreateDisposition.FILE_OPEN,
+                    EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
+            );
+        } catch (SMBApiException apiException) {
+            throw new RuntimeException("Could not open directory " + path + " due to " + apiException.getMessage(),
+                    apiException);
+        }
+    }
+
+    /** @return true for the "." and ".." entries */
+    private boolean specialDirectory(SmbListableEntity entity) {
+        return SPECIAL_DIRECTORIES.contains(entity.getName());
+    }
+
+}
+
+
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
similarity index 90%
copy from
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
copy to
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index bc5320b99b..3e64193628 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,5 +12,4 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.processors.smb.PutSmbFile
-org.apache.nifi.processors.smb.GetSmbFile
\ No newline at end of file
+org.apache.nifi.services.smb.SmbjClientProviderService
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
new file mode 100644
index 0000000000..f3de70ce71
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.smb;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.TIMEOUT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+
+import eu.rekawek.toxiproxy.model.ToxicDirection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.ToxiproxyContainer;
+import org.testcontainers.containers.ToxiproxyContainer.ContainerProxy;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Integration test running {@link SmbjClientProviderService} against a Samba
+ * container that is reached through a Toxiproxy container, so that network
+ * failures can be injected between the client and the server.
+ */
+public class NiFiSmbjClientIT {
+
+    private final static Logger sambaContainerLogger = LoggerFactory.getLogger("sambaContainer");
+    private final static Logger toxyProxyLogger = LoggerFactory.getLogger("toxiProxy");
+
+    /** Index of the final listing attempt; this attempt has to succeed. */
+    private static final int LAST_ITERATION = 100;
+
+    private final Network network = Network.newNetwork();
+
+    private final GenericContainer<?> sambaContainer = new GenericContainer<>(DockerImageName.parse("dperson/samba"))
+            .withExposedPorts(139, 445)
+            .waitingFor(Wait.forListeningPort())
+            .withLogConsumer(new Slf4jLogConsumer(sambaContainerLogger))
+            .withNetwork(network)
+            .withNetworkAliases("samba")
+            .withCommand("-w domain -u username;password -s share;/folder;;no;no;username;;; -p");
+
+    private final ToxiproxyContainer toxiproxy = new ToxiproxyContainer("shopify/toxiproxy")
+            .withNetwork(network)
+            .withLogConsumer(new Slf4jLogConsumer(toxyProxyLogger))
+            .withNetworkAliases("toxiproxy");
+
+    @BeforeEach
+    public void beforeEach() {
+        sambaContainer.start();
+        toxiproxy.start();
+    }
+
+    @AfterEach
+    public void afterEach() {
+        toxiproxy.stop();
+        sambaContainer.stop();
+    }
+
+    /**
+     * Lists the share repeatedly (one attempt every two seconds), cuts the
+     * connection at attempt 25, restores it when attempt 50 fails and requires
+     * the final attempt to succeed again.
+     */
+    @Test
+    public void shouldRescueAfterConnectionFailure() throws Exception {
+        writeFile("testDirectory/file", "content");
+        writeFile("testDirectory/directory1/file", "content");
+        writeFile("testDirectory/directory2/file", "content");
+        writeFile("testDirectory/directory2/nested_directory/file", "content");
+        ContainerProxy sambaProxy = toxiproxy.getProxy("samba", 445);
+        SmbjClientProviderService smbjClientProviderService = new SmbjClientProviderService();
+        ConfigurationContext context = mock(ConfigurationContext.class);
+
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(HOSTNAME, sambaProxy.getContainerIpAddress());
+        properties.put(PORT, String.valueOf(sambaProxy.getProxyPort()));
+        properties.put(SHARE, "share");
+        properties.put(USERNAME, "username");
+        properties.put(PASSWORD, "password");
+        properties.put(DOMAIN, "domain");
+        properties.put(TIMEOUT, "0.5 sec");
+        MockConfigurationContext mockConfigurationContext = new MockConfigurationContext(properties, null);
+
+        smbjClientProviderService.onEnabled(mockConfigurationContext);
+
+        sambaProxy.toxics().latency("slow", ToxicDirection.DOWNSTREAM, 300);
+
+        AtomicInteger i = new AtomicInteger(0);
+        // Failures happen on pool threads, where a thrown AssertionError would only
+        // end up in a discarded Future; record them and assert on the test thread.
+        AtomicInteger lastIterationFailures = new AtomicInteger(0);
+
+        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
+        // One count per attempt, indices 0..LAST_ITERATION inclusive.
+        CountDownLatch latch = new CountDownLatch(LAST_ITERATION + 1);
+        try {
+            executorService.scheduleWithFixedDelay(() -> {
+
+                int iteration = i.getAndIncrement();
+
+                if (iteration > LAST_ITERATION) {
+                    return;
+                }
+
+                executorService.submit(() -> {
+
+                    SmbClientService s = null;
+                    try {
+
+                        s = smbjClientProviderService.getClient();
+                        if (iteration == 25) {
+                            sambaProxy.setConnectionCut(true);
+                        }
+
+                        final Set<String> actual = s.listRemoteFiles("testDirectory")
+                                .map(SmbListableEntity::getIdentifier)
+                                .collect(toSet());
+
+                        assertTrue(actual.contains("testDirectory/file"));
+                        assertTrue(actual.contains("testDirectory/directory1/file"));
+                        assertTrue(actual.contains("testDirectory/directory2/file"));
+                        assertTrue(actual.contains("testDirectory/directory2/nested_directory/file"));
+
+                    } catch (Exception | AssertionError e) {
+                        // Failed assertions are Errors, so they must be caught explicitly.
+                        if (iteration == 50) {
+                            sambaProxy.setConnectionCut(false);
+                        }
+                        if (iteration == LAST_ITERATION) {
+                            lastIterationFailures.incrementAndGet();
+                        }
+                    } finally {
+                        if (s != null) {
+                            try {
+                                s.close();
+                            } catch (Exception ignored) {
+                                // Best-effort cleanup; the connection may have been cut on purpose.
+                            }
+                        }
+                        // Always count down, otherwise a failed attempt blocks the test forever.
+                        latch.countDown();
+                    }
+                });
+
+            }, 0, 2, TimeUnit.SECONDS);
+
+            // The schedule alone takes LAST_ITERATION * 2 seconds; fail instead of hanging.
+            assertTrue(latch.await(10, TimeUnit.MINUTES), "Listing attempts did not finish in time");
+        } finally {
+            executorService.shutdown();
+            smbjClientProviderService.onDisabled();
+        }
+
+        if (lastIterationFailures.get() > 0) {
+            fail("Listing did not recover after the connection was restored");
+        }
+    }
+
+    /** Copies the given content into the shared folder of the Samba container. */
+    private void writeFile(String path, String content) {
+        String containerPath = "/folder/" + path;
+        sambaContainer.copyFileToContainer(Transferable.of(content), containerPath);
+    }
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
new file mode 100644
index 0000000000..abf032072d
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.smb;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.hierynomus.smbj.SMBClient;
+import com.hierynomus.smbj.auth.AuthenticationContext;
+import com.hierynomus.smbj.connection.Connection;
+import com.hierynomus.smbj.session.Session;
+import com.hierynomus.smbj.share.DiskShare;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Unit test of {@link SmbjClientService} with all smbj collaborators mocked.
+ */
+class NiFiSmbjClientTest {
+
+    @Mock
+    DiskShare share;
+
+    @Mock
+    SMBClient smbClient;
+
+    @Mock
+    AuthenticationContext authenticationContext;
+
+    @Mock
+    Session session;
+
+    @Mock
+    Connection connection;
+
+    @InjectMocks
+    SmbjClientService underTest;
+
+    @BeforeEach
+    public void beforeEach() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    /**
+     * Creating a nested directory must create every missing level, top-down,
+     * and must leave an already existing parent untouched.
+     */
+    @Test
+    public void shouldCreateDirectoriesRecursively() throws Exception {
+
+        when(smbClient.connect("hostname", 445))
+                .thenReturn(connection);
+        when(connection.authenticate(authenticationContext)).thenReturn(session);
+        when(session.connectShare(anyString())).thenReturn(share);
+        // createDirectory() asks folderExists() with the cumulative path of each
+        // level, so the stubs have to use those full paths.
+        when(share.folderExists("directory")).thenReturn(true);
+        when(share.folderExists("directory/path")).thenReturn(false);
+        when(share.folderExists("directory/path/to")).thenReturn(false);
+        when(share.folderExists("directory/path/to/create")).thenReturn(false);
+
+        underTest.connectToShare("hostname", 445, "share");
+        underTest.createDirectory("directory/path/to/create");
+
+        verify(share, org.mockito.Mockito.never()).mkdir("directory");
+        verify(share).mkdir("directory/path");
+        verify(share).mkdir("directory/path/to");
+        verify(share).mkdir("directory/path/to/create");
+
+    }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-smb-bundle/pom.xml
b/nifi-nar-bundles/nifi-smb-bundle/pom.xml
index 23553206c8..53e2db5783 100644
--- a/nifi-nar-bundles/nifi-smb-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-smb-bundle/pom.xml
@@ -26,7 +26,41 @@
<packaging>pom</packaging>
<modules>
+ <module>nifi-smb-client-api</module>
+ <module>nifi-smb-client-api-nar</module>
+ <module>nifi-smb-smbj-client</module>
+ <module>nifi-smb-smbj-client-nar</module>
<module>nifi-smb-processors</module>
<module>nifi-smb-nar</module>
</modules>
+
+ <!-- Versions managed centrally for the SMB bundle: a module that declares one
+ of these artifacts without its own <version> uses the version pinned here. -->
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>com.hierynomus</groupId>
+ <artifactId>smbj</artifactId>
+ <version>0.11.5</version>
+ </dependency>
+ <dependency>
+ <groupId>net.engio</groupId>
+ <artifactId>mbassador</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ </dependency>
+ <!-- testcontainers and its toxiproxy module share one version property,
+ testcontainers.version, which this commit declares in the root pom.xml. -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>toxiproxy</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
</project>
diff --git a/nifi-registry/pom.xml b/nifi-registry/pom.xml
index 7dea5683e1..8b52eb969f 100644
--- a/nifi-registry/pom.xml
+++ b/nifi-registry/pom.xml
@@ -40,7 +40,6 @@
<flyway.version>8.4.2</flyway.version>
<flyway.tests.version>7.0.0</flyway.tests.version>
<swagger.ui.version>3.12.0</swagger.ui.version>
- <testcontainers.version>1.17.3</testcontainers.version>
<groovy.eclipse.compiler.version>3.4.0-01</groovy.eclipse.compiler.version>
<jaxb.version>2.3.2</jaxb.version>
<jgit.version>5.13.0.202109080827-r</jgit.version>
diff --git a/pom.xml b/pom.xml
index 4e4d741108..2a705edb85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
<org.apache.httpcomponents.httpclient.version>4.5.13</org.apache.httpcomponents.httpclient.version>
<org.apache.httpcomponents.httpcore.version>4.4.15</org.apache.httpcomponents.httpcore.version>
<org.bouncycastle.version>1.70</org.bouncycastle.version>
+ <testcontainers.version>1.17.3</testcontainers.version>
<org.slf4j.version>1.7.36</org.slf4j.version>
<ranger.version>2.2.0</ranger.version>
<jetty.version>9.4.48.v20220622</jetty.version>