This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 10ce00862d NIFI-12837 Added DFS support in SMB processors
10ce00862d is described below
commit 10ce00862d1479dee1444d0da9cc48bbbd83729f
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Mar 18 11:04:32 2024 +0100
NIFI-12837 Added DFS support in SMB processors
Signed-off-by: Pierre Villard <[email protected]>
This closes #8527.
---
.../org/apache/nifi/processors/smb/GetSmbFile.java | 2 +
.../org/apache/nifi/processors/smb/PutSmbFile.java | 2 +
.../org/apache/nifi/processors/smb/SmbDfsIT.java | 229 +++++++++++++++++++++
.../services/smb/SmbjClientProviderService.java | 83 +++-----
...iSmbjClientIT.java => SmbjClientServiceIT.java} | 4 +-
...jClientTest.java => SmbjClientServiceTest.java} | 8 +-
.../java/org/apache/nifi/smb/common/SmbClient.java | 92 +++++++++
.../org/apache/nifi/smb/common/SmbProperties.java | 9 +
.../java/org/apache/nifi/smb/common/SmbUtils.java | 7 +-
9 files changed, 376 insertions(+), 60 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
index ed843f9bff..f1649a4faa 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
@@ -80,6 +80,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@@ -257,6 +258,7 @@ public class GetSmbFile extends AbstractProcessor {
descriptors.add(IGNORE_HIDDEN_FILES);
descriptors.add(SMB_DIALECT);
descriptors.add(USE_ENCRYPTION);
+ descriptors.add(ENABLE_DFS);
descriptors.add(TIMEOUT);
this.descriptors = Collections.unmodifiableList(descriptors);
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 468828f819..af2eab2ff1 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -64,6 +64,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@@ -194,6 +195,7 @@ public class PutSmbFile extends AbstractProcessor {
descriptors.add(RENAME_SUFFIX);
descriptors.add(SMB_DIALECT);
descriptors.add(USE_ENCRYPTION);
+ descriptors.add(ENABLE_DFS);
descriptors.add(TIMEOUT);
this.descriptors = Collections.unmodifiableList(descriptors);
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java
new file mode 100644
index 0000000000..6fe51b3eb6
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SmbDfsIT.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.smb.common.SmbProperties;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.FixedHostPortGenericContainer;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.nifi.processor.util.list.AbstractListProcessor.NO_TRACKING;
+import static
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
+import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class SmbDfsIT {
+
+ private final static Logger LOGGER =
LoggerFactory.getLogger(SmbDfsIT.class);
+
+ private static final int DEFAULT_SMB_PORT = 445;
+
+ // DFS works only on the default SMB port (445). Not sure if it is a
generic DFS vs Samba DFS constraint, or an issue in the smbj client library.
+ private final GenericContainer<?> sambaContainer = new
FixedHostPortGenericContainer<>("dperson/samba")
+ .withFixedExposedPort(DEFAULT_SMB_PORT, DEFAULT_SMB_PORT)
+ .waitingFor(Wait.forListeningPort())
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER))
+ .withCommand("-u", "myuser;mypass",
+ "-s", "share;/share-dir;;no;no;myuser;;;",
+ "-s", "dfs-share;/dfs-share-dir;;no;no;myuser;;;",
+ "-p",
+ "-g", "host msdfs = yes",
+ "-G", "dfs-share;msdfs root = yes");
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ sambaContainer.start();
+
+ sambaContainer.execInContainer("ln", "-s", "msdfs:" +
sambaContainer.getHost() + "\\share", "/dfs-share-dir/dfs-link");
+ Thread.sleep(100);
+ }
+
+ @AfterEach
+ void afterEach() {
+ sambaContainer.stop();
+ }
+
+ @Test
+ void testFetchSmb() throws Exception {
+ writeFile("fetch_file", "fetch_content");
+
+ TestRunner testRunner = newTestRunner(FetchSmb.class);
+ testRunner.setProperty(FetchSmb.REMOTE_FILE, "dfs-link/fetch_file");
+ SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner);
+
+ testRunner.enqueue("");
+ testRunner.run();
+
+ testRunner.assertTransferCount(FetchSmb.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(FetchSmb.REL_SUCCESS).get(0);
+ assertEquals("fetch_content", flowFile.getContent());
+
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
+
+ @Test
+ void testFetchFileFailsWhenDfsIsDisabled() throws Exception {
+ writeFile("fetch_file", "fetch_content");
+
+ TestRunner testRunner = newTestRunner(FetchSmb.class);
+ testRunner.setProperty(FetchSmb.REMOTE_FILE, "dfs-link/fetch_file");
+ SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, false);
+
+ testRunner.enqueue("");
+ testRunner.run();
+
+ testRunner.assertTransferCount(FetchSmb.REL_FAILURE, 1);
+ MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(FetchSmb.REL_FAILURE).get(0);
+ assertEquals(0, flowFile.getSize());
+
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
+
+ @Test
+ void testListSmbWithDfsLink() throws Exception {
+ testListSmb("dfs-link");
+ }
+
+ @Test
+ @Disabled("Listing folders recursively from the DFS root or a parent
directory of the DFS link does not work on Samba due to
https://github.com/hierynomus/smbj/issues/717#")
+ void testListSmbWithDfsRoot() throws Exception {
+ testListSmb(null);
+ }
+
+ private void testListSmb(String directory) throws Exception {
+ writeFile("list_file", "list_content");
+
+ TestRunner testRunner = newTestRunner(ListSmb.class);
+ if (directory != null) {
+ testRunner.setProperty(ListSmb.DIRECTORY, directory);
+ }
+ testRunner.setProperty(ListSmb.LISTING_STRATEGY, NO_TRACKING);
+ testRunner.setProperty(ListSmb.MINIMUM_AGE, "0 ms");
+ SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner);
+
+ testRunner.run();
+
+ testRunner.assertTransferCount(ListSmb.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(ListSmb.REL_SUCCESS).get(0);
+ assertEquals(0, flowFile.getSize());
+ assertEquals("dfs-link",
flowFile.getAttribute(CoreAttributes.PATH.key()));
+ assertEquals("list_file",
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
+
+ @Test
+ void testPutSmbFile() {
+ TestRunner testRunner = newTestRunner(PutSmbFile.class);
+ testRunner.setProperty(PutSmbFile.HOSTNAME, sambaContainer.getHost());
+ testRunner.setProperty(PutSmbFile.SHARE, "dfs-share");
+ testRunner.setProperty(PutSmbFile.DIRECTORY, "dfs-link");
+ testRunner.setProperty(PutSmbFile.USERNAME, "myuser");
+ testRunner.setProperty(PutSmbFile.PASSWORD, "mypass");
+ testRunner.setProperty(SmbProperties.ENABLE_DFS, "true");
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "put_file");
+
+ testRunner.enqueue("put_content", attributes);
+ testRunner.run();
+
+ testRunner.assertTransferCount(PutSmbFile.REL_SUCCESS, 1);
+
+ String fileContent = readFile("put_file");
+ assertEquals("put_content", fileContent);
+ }
+
+ @Test
+ void testGetSmbFile() {
+ writeFile("get_file", "get_content");
+
+ TestRunner testRunner = newTestRunner(GetSmbFile.class);
+ testRunner.setProperty(GetSmbFile.HOSTNAME, sambaContainer.getHost());
+ testRunner.setProperty(GetSmbFile.SHARE, "dfs-share");
+ testRunner.setProperty(GetSmbFile.DIRECTORY, "dfs-link");
+ testRunner.setProperty(GetSmbFile.USERNAME, "myuser");
+ testRunner.setProperty(GetSmbFile.PASSWORD, "mypass");
+ testRunner.setProperty(SmbProperties.ENABLE_DFS, "true");
+
+ testRunner.run();
+
+ testRunner.assertTransferCount(GetSmbFile.REL_SUCCESS, 1);
+ MockFlowFile flowFile =
testRunner.getFlowFilesForRelationship(GetSmbFile.REL_SUCCESS).get(0);
+ assertEquals("get_content", flowFile.getContent());
+ assertEquals("dfs-link",
flowFile.getAttribute(CoreAttributes.PATH.key()));
+ assertEquals("get_file",
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+ }
+
+ private SmbjClientProviderService configureSmbClient(TestRunner
testRunner) throws InitializationException {
+ return configureSmbClient(testRunner, true);
+ }
+
+ private SmbjClientProviderService configureSmbClient(TestRunner
testRunner, boolean enableDfs) throws InitializationException {
+ SmbjClientProviderService smbjClientProviderService = new
SmbjClientProviderService();
+
+ testRunner.addControllerService("client-provider",
smbjClientProviderService);
+
+ testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
+
+ testRunner.setProperty(smbjClientProviderService, HOSTNAME,
sambaContainer.getHost());
+ testRunner.setProperty(smbjClientProviderService, PORT,
Integer.toString(DEFAULT_SMB_PORT));
+ testRunner.setProperty(smbjClientProviderService, USERNAME, "myuser");
+ testRunner.setProperty(smbjClientProviderService, PASSWORD, "mypass");
+ testRunner.setProperty(smbjClientProviderService, SHARE, "dfs-share");
+ testRunner.setProperty(smbjClientProviderService, ENABLE_DFS,
Boolean.toString(enableDfs));
+
+ testRunner.enableControllerService(smbjClientProviderService);
+
+ return smbjClientProviderService;
+ }
+
+ private void writeFile(String filename, String content) {
+ String containerPath = "/share-dir/" + filename;
+ sambaContainer.copyFileToContainer(Transferable.of(content),
containerPath);
+ }
+
+ private String readFile(String filename) {
+ String containerPath = "/share-dir/" + filename;
+ return sambaContainer.copyFileFromContainer(containerPath, is ->
IOUtils.toString(is, StandardCharsets.UTF_8));
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
index 080be06199..ebffb816ec 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
@@ -39,6 +39,7 @@ import static java.util.Arrays.asList;
import static
org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
import static
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR;
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@@ -112,6 +113,7 @@ public class SmbjClientProviderService extends
AbstractControllerService impleme
DOMAIN,
SMB_DIALECT,
USE_ENCRYPTION,
+ ENABLE_DFS,
TIMEOUT
));
@@ -122,24 +124,37 @@ public class SmbjClientProviderService extends
AbstractControllerService impleme
private String shareName;
@Override
- public SmbClientService getClient() throws IOException {
- Connection connection = null;
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
- try {
- connection = smbClient.connect(hostname, port);
- return connectToShare(connection);
- } catch (IOException e) {
- getLogger().debug("Closing stale connection and trying to create a
new one for share " + getServiceLocation());
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.hostname = context.getProperty(HOSTNAME).getValue();
+ this.port = context.getProperty(PORT).asInteger();
+ this.shareName = context.getProperty(SHARE).getValue();
+ this.smbClient = buildSmbClient(context);
+ createAuthenticationContext(context);
+ }
- closeConnection(connection);
- unregisterHost();
+ @OnDisabled
+ public void onDisabled() {
+ smbClient.close();
+ smbClient = null;
+ hostname = null;
+ port = 0;
+ shareName = null;
+ }
- connection = smbClient.connect(hostname, port);
- return connectToShare(connection);
- }
+ @Override
+ public URI getServiceLocation() {
+ return URI.create(String.format("smb://%s:%d/%s", hostname, port,
shareName));
}
- private SmbjClientService connectToShare(final Connection connection)
throws IOException {
+ @Override
+ public SmbClientService getClient() throws IOException {
+ final Connection connection = smbClient.connect(hostname, port);
+
final Session session;
final Share share;
@@ -164,20 +179,6 @@ public class SmbjClientProviderService extends
AbstractControllerService impleme
return new SmbjClientService(session, (DiskShare) share,
getServiceLocation());
}
- private void unregisterHost() {
- smbClient.getServerList().unregister(hostname);
- }
-
- private void closeConnection(final Connection connection) {
- try {
- if (connection != null) {
- connection.close(true);
- }
- } catch (Exception e) {
- getLogger().error("Could not close connection to {}",
getServiceLocation(), e);
- }
- }
-
private void closeSession(final Session session) {
try {
if (session != null) {
@@ -188,34 +189,6 @@ public class SmbjClientProviderService extends
AbstractControllerService impleme
}
}
- @Override
- public URI getServiceLocation() {
- return URI.create(String.format("smb://%s:%d/%s", hostname, port,
shareName));
- }
-
- @OnEnabled
- public void onEnabled(final ConfigurationContext context) {
- this.hostname = context.getProperty(HOSTNAME).getValue();
- this.port = context.getProperty(PORT).asInteger();
- this.shareName = context.getProperty(SHARE).getValue();
- this.smbClient = buildSmbClient(context);
- createAuthenticationContext(context);
- }
-
- @OnDisabled
- public void onDisabled() {
- smbClient.close();
- smbClient = null;
- hostname = null;
- port = 0;
- shareName = null;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
private void createAuthenticationContext(final ConfigurationContext
context) {
if (context.getProperty(USERNAME).isSet()) {
final String userName = context.getProperty(USERNAME).getValue();
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
similarity index 99%
rename from
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
rename to
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
index a030abc363..59278e54c5 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientIT.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
@@ -54,7 +54,7 @@ import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
-public class NiFiSmbjClientIT {
+public class SmbjClientServiceIT {
private final static Logger sambaContainerLogger =
LoggerFactory.getLogger("sambaContainer");
private final static Logger toxyProxyLogger =
LoggerFactory.getLogger("toxiProxy");
@@ -62,7 +62,7 @@ public class NiFiSmbjClientIT {
private final Network network = Network.newNetwork();
private final GenericContainer<?> sambaContainer = new
GenericContainer<>(DockerImageName.parse("dperson/samba"))
- .withExposedPorts(139, 445)
+ .withExposedPorts(445)
.waitingFor(Wait.forListeningPort())
.withLogConsumer(new Slf4jLogConsumer(sambaContainerLogger))
.withNetwork(network)
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
similarity index 92%
rename from
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
rename to
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
index f00b505bea..dba330fbc8 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/NiFiSmbjClientTest.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
@@ -28,7 +28,11 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-class NiFiSmbjClientTest {
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class SmbjClientServiceTest {
@Mock
Session session;
@@ -60,4 +64,4 @@ class NiFiSmbjClientTest {
}
-}
\ No newline at end of file
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java
new file mode 100644
index 0000000000..92c173c10e
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbClient.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.smb.common;
+
+import com.hierynomus.mssmb2.messages.SMB2Echo;
+import com.hierynomus.smbj.SMBClient;
+import com.hierynomus.smbj.SmbConfig;
+import com.hierynomus.smbj.connection.Connection;
+import com.hierynomus.smbj.event.ConnectionClosed;
+import com.hierynomus.smbj.event.SMBEventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extends {@link com.hierynomus.smbj.SMBClient} with connection health check.
+ * <br/>
+ * Workaround to https://github.com/hierynomus/smbj/issues/796.
+ * <br/><br/>
+ * Health check method:
+ * <ul>
+ * <li>get connection from the parent class</li>
+ * <li>if it is a newly created connection, then return it</li>
+ * <li>if it is an old connection, send ECHO message to the server
+ * <ul>
+ * <li>if ECHO succeeds, return the connection</li>
+ * <li>if ECHO fails, unregister the connection, get connection again
(which creates a new one) and return it</li>
+ * </ul>
+ * </li>
+ * </ul>
+ */
+class SmbClient extends SMBClient {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SmbClient.class);
+
+ private SMBEventBus bus;
+
+ private SmbClient(final SmbConfig config, final SMBEventBus bus) {
+ super(config, bus);
+ }
+
+ static SmbClient create(final SmbConfig config) {
+ final SMBEventBus bus = new SMBEventBus();
+
+ final SmbClient client = new SmbClient(config, bus);
+
+ client.bus = bus;
+
+ return client;
+ }
+
+ public Connection connect(final String hostname) throws IOException {
+ return connect(hostname, DEFAULT_PORT);
+ }
+
+ public synchronized Connection connect(final String hostname, final int
port) throws IOException {
+ final Connection connection = super.connect(hostname, port);
+
+ try {
+ // SMB2 ECHO message can only be sent if this is not a new
connection (and health check is only needed in this case)
+ if (!connection.release()) {
+ connection.send(new
SMB2Echo(connection.getNegotiatedProtocol().getDialect())).get(10,
TimeUnit.SECONDS);
+ }
+
+ // set lease counter back
+ connection.lease();
+
+ return connection;
+ } catch (Exception e) {
+ LOGGER.info("Stale connection found, unregistering it and creating
a new one");
+ bus.publish(new ConnectionClosed(hostname, port));
+ }
+
+ return super.connect(hostname, port);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java
index 5b474fcc94..b79d68790f 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbProperties.java
@@ -44,6 +44,15 @@ public class SmbProperties {
.defaultValue("false")
.build();
+ public static final PropertyDescriptor ENABLE_DFS = new
PropertyDescriptor.Builder()
+ .name("enable-dfs")
+ .displayName("Enable DFS")
+ .description("Enables accessing Distributed File System (DFS) and
following DFS links during SMB operations.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
.displayName("Timeout")
.name("timeout")
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java
index 0895abfae0..b705c5c38a 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-common/src/main/java/org/apache/nifi/smb/common/SmbUtils.java
@@ -21,6 +21,7 @@ import com.hierynomus.smbj.SmbConfig;
import org.apache.nifi.context.PropertyContext;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.nifi.smb.common.SmbProperties.ENABLE_DFS;
import static org.apache.nifi.smb.common.SmbProperties.SMB_DIALECT;
import static org.apache.nifi.smb.common.SmbProperties.TIMEOUT;
import static org.apache.nifi.smb.common.SmbProperties.USE_ENCRYPTION;
@@ -32,7 +33,7 @@ public final class SmbUtils {
}
public static SMBClient buildSmbClient(final PropertyContext context) {
- return new SMBClient(buildSmbConfig(context));
+ return SmbClient.create(buildSmbConfig(context));
}
static SmbConfig buildSmbConfig(final PropertyContext context) {
@@ -50,6 +51,10 @@ public final class SmbUtils {
configBuilder.withEncryptData(context.getProperty(USE_ENCRYPTION).asBoolean());
}
+ if (context.getProperty(ENABLE_DFS).isSet()) {
+
configBuilder.withDfsEnabled(context.getProperty(ENABLE_DFS).asBoolean());
+ }
+
if (context.getProperty(TIMEOUT).isSet()) {
configBuilder.withTimeout(context.getProperty(TIMEOUT).asTimePeriod(MILLISECONDS),
MILLISECONDS);
}