This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9f69ff233c NIFI-12231 FetchSmb supports Move and Delete Completion Strategies
9f69ff233c is described below
commit 9f69ff233c1780c7bcf09ca67fbf68985a766c18
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Tue Apr 9 18:09:13 2024 +0200
NIFI-12231 FetchSmb supports Move and Delete Completion Strategies
Signed-off-by: Pierre Villard <[email protected]>
This closes #8617.
---
.../apache/nifi/services/smb/SmbClientService.java | 10 +-
.../org/apache/nifi/processors/smb/FetchSmb.java | 152 ++++++++++-----
.../org/apache/nifi/processors/smb/ListSmb.java | 2 +-
.../processors/smb/util/CompletionStrategy.java | 49 +++++
.../org/apache/nifi/processors/smb/FetchSmbIT.java | 212 +++++++++++++++++++--
.../apache/nifi/processors/smb/ListSmbTest.java | 4 +-
.../nifi/processors/smb/SambaTestContainers.java | 59 ++++--
.../nifi/services/smb/SmbjClientService.java | 113 +++++++----
.../nifi/services/smb/SmbjClientServiceIT.java | 2 +-
.../nifi/services/smb/SmbjClientServiceTest.java | 2 +-
10 files changed, 482 insertions(+), 123 deletions(-)
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
index 9eb0e41a67..c70dea133e 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
@@ -25,9 +25,13 @@ import java.util.stream.Stream;
*/
public interface SmbClientService extends AutoCloseable {
- Stream<SmbListableEntity> listRemoteFiles(String path);
+ Stream<SmbListableEntity> listFiles(String directoryPath);
- void createDirectory(String path);
+ void ensureDirectory(String directoryPath);
- void readFile(String fileName, OutputStream outputStream) throws
IOException;
+ void readFile(String filePath, OutputStream outputStream) throws
IOException;
+
+ void moveFile(String filePath, String directoryPath);
+
+ void deleteFile(String filePath);
}
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
index c38216058d..b5f58b5890 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
@@ -16,16 +16,6 @@
*/
package org.apache.nifi.processors.smb;
-import static java.util.Arrays.asList;
-import static java.util.Collections.unmodifiableSet;
-import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
-import static
org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -33,17 +23,29 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.smb.util.CompletionStrategy;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbClientService;
import org.apache.nifi.services.smb.SmbException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static
org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"samba", "smb", "cifs", "files", "fetch"})
@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in
tandem with ListSmb.")
@@ -57,8 +59,8 @@ public class FetchSmb extends AbstractProcessor {
public static final String ERROR_CODE_ATTRIBUTE = "error.code";
public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
- public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
- .Builder().name("remote-file")
+ public static final PropertyDescriptor REMOTE_FILE = new
PropertyDescriptor.Builder()
+ .name("remote-file")
.displayName("Remote File")
.description("The full path of the file to be retrieved from the
remote server. Expression language is supported.")
.required(true)
@@ -67,91 +69,139 @@ public class FetchSmb extends AbstractProcessor {
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();
- public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new
Builder()
+ public static final PropertyDescriptor COMPLETION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Completion Strategy")
+ .description("Specifies what to do with the original file on the
server once it has been processed. If the Completion Strategy fails, a warning
will be "
+ + "logged but the data will still be transferred.")
+ .allowableValues(CompletionStrategy.class)
+ .defaultValue(CompletionStrategy.NONE)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor DESTINATION_DIRECTORY = new
PropertyDescriptor.Builder()
+ .name("Destination Directory")
+ .description("The directory on the remote server to move the
original file to once it has been processed.")
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .required(true)
+ .dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
+ .build();
+
+ public static final PropertyDescriptor CREATE_DESTINATION_DIRECTORY = new
PropertyDescriptor.Builder()
+ .name("Create Destination Directory")
+ .description("Specifies whether or not the remote directory should
be created if it does not exist.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
+ .build();
+
+ public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new
PropertyDescriptor.Builder()
.name("smb-client-provider-service")
.displayName("SMB Client Provider Service")
.description("Specifies the SMB client provider to use for
creating SMB connections.")
.required(true)
.identifiesControllerService(SmbClientProviderService.class)
.build();
+
public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
- .description("A flowfile will be routed here for each
successfully fetched file.")
+ .description("A FlowFile will be routed here for each
successfully fetched file.")
.build();
+
public static final Relationship REL_FAILURE =
- new Relationship.Builder().name("failure")
- .description(
- "A flowfile will be routed here when failed to
fetch its content.")
+ new Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile will be routed here when failed
to fetch its content.")
.build();
+
public static final Set<Relationship> RELATIONSHIPS = unmodifiableSet(new
HashSet<>(asList(
REL_SUCCESS,
REL_FAILURE
)));
- public static final String UNCATEGORIZED_ERROR = "-2";
+
private static final List<PropertyDescriptor> PROPERTIES = asList(
SMB_CLIENT_PROVIDER_SERVICE,
- REMOTE_FILE
+ REMOTE_FILE,
+ COMPLETION_STRATEGY,
+ DESTINATION_DIRECTORY,
+ CREATE_DESTINATION_DIRECTORY
);
+ public static final String UNCATEGORIZED_ERROR = "-2";
+
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
- final SmbClientProviderService clientProviderService =
-
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
-
- try (SmbClientService client = clientProviderService.getClient()) {
- fetchAndTransfer(session, context, client, flowFile);
- } catch (Exception e) {
- getLogger().error("Couldn't connect to SMB.", e);
- flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE,
getErrorCode(e));
- flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE,
e.getMessage());
- session.transfer(flowFile, REL_FAILURE);
- }
+ final Map<String, String> attributes = flowFile.getAttributes();
+ final String filePath =
context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
- }
+ final SmbClientProviderService clientProviderService =
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
+ try (SmbClientService client = clientProviderService.getClient()) {
+ flowFile = session.write(flowFile, outputStream ->
client.readFile(filePath, outputStream));
- private void fetchAndTransfer(ProcessSession session, ProcessContext
context, SmbClientService client,
- FlowFile flowFile) {
- final Map<String, String> attributes = flowFile.getAttributes();
- final String filename = context.getProperty(REMOTE_FILE)
- .evaluateAttributeExpressions(attributes).getValue();
- try {
- flowFile = session.write(flowFile, outputStream ->
client.readFile(filename, outputStream));
session.transfer(flowFile, REL_SUCCESS);
} catch (Exception e) {
- getLogger().error("Couldn't fetch file {}.", filename, e);
+ getLogger().error("Could not fetch file {}.", filePath, e);
flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE,
getErrorCode(e));
- flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE,
getErrorMessage(e));
+ flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE,
e.getMessage());
session.transfer(flowFile, REL_FAILURE);
+ return;
}
+
+ session.commitAsync(() -> performCompletionStrategy(context,
attributes));
}
- private String getErrorCode(Exception exception) {
+ private String getErrorCode(final Exception exception) {
return Optional.ofNullable(exception instanceof SmbException ?
(SmbException) exception : null)
.map(SmbException::getErrorCode)
.map(String::valueOf)
.orElse(UNCATEGORIZED_ERROR);
}
- private String getErrorMessage(Exception exception) {
- return Optional.ofNullable(exception.getMessage())
- .orElse(exception.getClass().getSimpleName());
+ private void performCompletionStrategy(final ProcessContext context, final
Map<String, String> attributes) {
+ final CompletionStrategy completionStrategy =
context.getProperty(COMPLETION_STRATEGY).asAllowableValue(CompletionStrategy.class);
+
+ if (completionStrategy == CompletionStrategy.NONE) {
+ return;
+ }
+
+ final String filePath =
context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
+
+ final SmbClientProviderService clientProviderService =
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+ try (SmbClientService client = clientProviderService.getClient()) {
+ if (completionStrategy == CompletionStrategy.MOVE) {
+ final String destinationDirectory =
context.getProperty(DESTINATION_DIRECTORY).evaluateAttributeExpressions(attributes).getValue();
+ final boolean createDestinationDirectory =
context.getProperty(CREATE_DESTINATION_DIRECTORY).asBoolean();
+
+ if (createDestinationDirectory) {
+ client.ensureDirectory(destinationDirectory);
+ }
+
+ client.moveFile(filePath, destinationDirectory);
+ } else if (completionStrategy == CompletionStrategy.DELETE) {
+ client.deleteFile(filePath);
+ }
+ } catch (Exception e) {
+ getLogger().warn("Could not perform completion strategy {} for
file {}", completionStrategy, filePath, e);
+ }
}
}
-
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
index 886f1bee46..091a5b176e 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
@@ -345,7 +345,7 @@ public class ListSmb extends AbstractListProcessor<SmbListableEntity> {
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
final String directory = getDirectory(context);
final SmbClientService clientService =
clientProviderService.getClient();
- return clientService.listRemoteFiles(directory).onClose(() -> {
+ return clientService.listFiles(directory).onClose(() -> {
try {
clientService.close();
} catch (Exception e) {
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java
new file mode 100644
index 0000000000..cf3dd0fcbf
--- /dev/null
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb.util;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum CompletionStrategy implements DescribedValue {
+
+ NONE("None", "Leaves the file as-is."),
+ MOVE("Move File", "Moves the file to the specified directory on the remote
system. This option cannot be used when DFS is enabled on 'SMB Client Provider
Service'."),
+ DELETE("Delete File", "Deletes the file from the remote system.");
+
+ private final String displayName;
+ private final String description;
+
+ CompletionStrategy(String displayName, String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
index fb057791a8..a1ad08db0b 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
@@ -16,52 +16,226 @@
*/
package org.apache.nifi.processors.smb;
+import static org.apache.nifi.processors.smb.FetchSmb.COMPLETION_STRATEGY;
+import static
org.apache.nifi.processors.smb.FetchSmb.CREATE_DESTINATION_DIRECTORY;
+import static org.apache.nifi.processors.smb.FetchSmb.DESTINATION_DIRECTORY;
import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
import static org.apache.nifi.processors.smb.FetchSmb.REMOTE_FILE;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+
+import org.apache.nifi.processors.smb.util.CompletionStrategy;
import org.apache.nifi.services.smb.SmbjClientProviderService;
import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-public class FetchSmbIT extends SambaTestContainers {
+class FetchSmbIT extends SambaTestContainers {
+
+ private static final String TEST_CONTENT = "test_content";
+
+ private TestRunner testRunner;
+
+ private SmbjClientProviderService smbjClientProviderService;
+
+ @BeforeEach
+ void setUpComponents() throws Exception {
+ testRunner = newTestRunner(FetchSmb.class);
+
+ smbjClientProviderService = configureSmbClient(testRunner, true);
+ }
+
+ @AfterEach
+ void tearDownComponents() {
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
@Test
- public void fetchFilesUsingEL() throws Exception {
- writeFile("/test_file", "test_content");
- TestRunner testRunner = newTestRunner(FetchSmb.class);
+ void fetchFilesUsingEL() {
+ writeFile("test_file", TEST_CONTENT);
+
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
- final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
- Map<String, String> attributes = new HashMap<>();
+ final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute_to_find_using_EL", "test_file");
- testRunner.enqueue("ignored", attributes);
- testRunner.run();
- testRunner.assertTransferCount(REL_SUCCESS, 1);
- assertEquals("test_content",
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
- testRunner.assertValid();
- testRunner.disableControllerService(smbjClientProviderService);
+ runProcessor(attributes);
+
+ assertSuccessFlowFile();
}
@Test
- public void tryToFetchNonExistingFileEmitsFailure() throws Exception {
- TestRunner testRunner = newTestRunner(FetchSmb.class);
+ void tryToFetchNonExistingFileEmitsFailure() throws Exception {
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
- final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
- Map<String, String> attributes = new HashMap<>();
+ final Map<String, String> attributes = new HashMap<>();
attributes.put("attribute_to_find_using_EL", "non_existing_file");
+ runProcessor(attributes);
+
+ testRunner.assertTransferCount(REL_FAILURE, 1);
+ }
+
+ @Test
+ void testCompletionStrategyNone() {
+ final String baseDir = "dir_none";
+ final String filename = "test_file";
+ final String filePath = baseDir + "/" + filename;
+
+ createDirectory(baseDir, AccessMode.READ_ONLY);
+ writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
+
+ testRunner.setProperty(REMOTE_FILE, filePath);
+ testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.NONE);
+
+ runProcessor();
+
+ assertSuccessFlowFile();
+ assertNoWarning();
+
+ assertTrue(fileExists(filePath));
+ }
+
+ @Test
+ void testCompletionStrategyDelete() {
+ final String baseDir = "dir_delete";
+ final String filename = "test_file";
+ final String filePath = baseDir + "/" + filename;
+
+ createDirectory(baseDir, AccessMode.READ_WRITE);
+ writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
+
+ testRunner.setProperty(REMOTE_FILE, filePath);
+ testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE);
+
+ runProcessor();
+
+ assertSuccessFlowFile();
+ assertNoWarning();
+
+ assertFalse(fileExists(filePath));
+ }
+
+ @Test
+ void testCompletionStrategyMoveWithExistingDirectory() {
+ final String baseDir = "dir_move_existing";
+ final String filename = "test_file";
+ final String filePath = baseDir + "/" + filename;
+ final String processedDir = "processed";
+
+ createDirectory(baseDir, AccessMode.READ_WRITE);
+ writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
+ createDirectory(processedDir, AccessMode.READ_WRITE);
+
+ testRunner.setProperty(REMOTE_FILE, filePath);
+ testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
+ testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
+
+ runProcessor();
+
+ assertSuccessFlowFile();
+ assertNoWarning();
+
+ assertFalse(fileExists(filePath));
+ assertTrue(fileExists(processedDir + "/" + filename));
+ }
+
+ @Test
+ void testCompletionStrategyMoveWithCreatingDirectory() {
+ final String baseDir = "dir_move_creating";
+ final String filename = "test_file";
+ final String filePath = baseDir + "/" + filename;
+ final String processedDir = "processed";
+
+ createDirectory(baseDir, AccessMode.READ_WRITE);
+ writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
+
+ testRunner.setProperty(REMOTE_FILE, filePath);
+ testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
+ testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
+ testRunner.setProperty(CREATE_DESTINATION_DIRECTORY, "true");
+
+ runProcessor();
+
+ assertSuccessFlowFile();
+ assertNoWarning();
+
+ assertFalse(fileExists(filePath));
+ assertTrue(fileExists(processedDir + "/" + filename));
+ }
+
+ @Test
+ void testCompletionStrategyDeleteFailsWhenNoPermission() {
+ final String baseDir = "dir_delete_noperm";
+ final String filename = "test_file";
+ final String filePath = baseDir + "/" + filename;
+
+ createDirectory(baseDir, AccessMode.READ_ONLY);
+ writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
+
+ testRunner.setProperty(REMOTE_FILE, filePath);
+ testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE);
+
+ runProcessor();
+
+ assertSuccessFlowFile();
+ assertWarning();
+
+ assertTrue(fileExists(filePath));
+ }
+
+ @Test
+ void testCompletionStrategyMoveFailsWhenNoPermission() {
+ final String baseDir = "dir_move_noperm";
+ final String filename = "test_file";
+ final String filePath = baseDir + "/" + filename;
+ final String processedDir = "processed";
+
+ createDirectory(baseDir, AccessMode.READ_ONLY);
+ writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
+ createDirectory(processedDir, AccessMode.READ_ONLY);
+
+ testRunner.setProperty(REMOTE_FILE, filePath);
+ testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
+ testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
+
+ runProcessor();
+
+ assertSuccessFlowFile();
+ assertWarning();
+
+ assertTrue(fileExists(filePath));
+ assertFalse(fileExists(processedDir + "/" + filename));
+ }
+
+ private void runProcessor() {
+ runProcessor(Collections.emptyMap());
+ }
+
+ private void runProcessor(final Map<String, String> attributes) {
testRunner.enqueue("ignored", attributes);
testRunner.run();
- testRunner.assertTransferCount(REL_FAILURE, 1);
- testRunner.assertValid();
- testRunner.disableControllerService(smbjClientProviderService);
+ }
+
+ private void assertSuccessFlowFile() {
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ assertEquals(TEST_CONTENT,
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
+ }
+
+ private void assertWarning() {
+ assertFalse(testRunner.getLogger().getWarnMessages().isEmpty());
+ }
+
+ private void assertNoWarning() {
+ assertTrue(testRunner.getLogger().getWarnMessages().isEmpty());
}
}
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
index 0594ef3e2f..621ec74209 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
@@ -232,7 +232,7 @@ class ListSmbTest {
testRunner.setProperty(LISTING_STRATEGY, "timestamps");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
-
when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new
RuntimeException("test exception"));
+ when(mockNifiSmbClientService.listFiles(anyString())).thenThrow(new
RuntimeException("test exception"));
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid();
@@ -282,7 +282,7 @@ class ListSmbTest {
}
private void mockSmbFolders(SmbClientService mockNifiSmbClientService,
SmbListableEntity... entities) {
- doAnswer(ignore ->
stream(entities)).when(mockNifiSmbClientService).listRemoteFiles(anyString());
+ doAnswer(ignore ->
stream(entities)).when(mockNifiSmbClientService).listFiles(anyString());
}
private SmbListableEntity listableEntity(String name, long timeStamp) {
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
index f38d36d171..51c283b85d 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
@@ -39,12 +39,19 @@ import org.testcontainers.utility.DockerImageName;
public class SambaTestContainers {
+ protected final static Logger LOGGER =
LoggerFactory.getLogger(SambaTestContainers.class);
+
protected final static Integer DEFAULT_SAMBA_PORT = 445;
- protected final static Logger logger =
LoggerFactory.getLogger(SambaTestContainers.class);
+
+ protected enum AccessMode {
+ READ_ONLY,READ_WRITE;
+ }
+
protected final GenericContainer<?> sambaContainer = new
GenericContainer<>(DockerImageName.parse("dperson/samba"))
+ .withCreateContainerCmdModifier(cmd -> cmd.withName("samba-test"))
.withExposedPorts(DEFAULT_SAMBA_PORT, 139)
.waitingFor(Wait.forListeningPort())
- .withLogConsumer(new Slf4jLogConsumer(logger))
+ .withLogConsumer(new Slf4jLogConsumer(LOGGER))
.withCommand("-w domain -u username;password -s
share;/folder;;no;no;username;;; -p");
@BeforeEach
@@ -57,33 +64,63 @@ public class SambaTestContainers {
sambaContainer.stop();
}
- protected SmbjClientProviderService configureSmbClient(TestRunner
testRunner, boolean shouldEnableSmbClient)
- throws Exception {
+ protected SmbjClientProviderService configureSmbClient(final TestRunner
testRunner, final boolean shouldEnableSmbClient) throws Exception {
final SmbjClientProviderService smbjClientProviderService = new
SmbjClientProviderService();
testRunner.addControllerService("client-provider",
smbjClientProviderService);
+
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
testRunner.setProperty(smbjClientProviderService, HOSTNAME,
sambaContainer.getHost());
- testRunner.setProperty(smbjClientProviderService, PORT,
-
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
+ testRunner.setProperty(smbjClientProviderService, PORT,
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
testRunner.setProperty(smbjClientProviderService, USERNAME,
"username");
testRunner.setProperty(smbjClientProviderService, PASSWORD,
"password");
testRunner.setProperty(smbjClientProviderService, SHARE, "share");
testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
+
if (shouldEnableSmbClient) {
testRunner.enableControllerService(smbjClientProviderService);
}
+
return smbjClientProviderService;
}
- protected String generateContentWithSize(int sizeInBytes) {
- byte[] bytes = new byte[sizeInBytes];
+ protected String generateContentWithSize(final int sizeInBytes) {
+ final byte[] bytes = new byte[sizeInBytes];
fill(bytes, (byte) 1);
return new String(bytes);
}
- protected void writeFile(String path, String content) {
- String containerPath = "/folder/" + path;
- sambaContainer.copyFileToContainer(Transferable.of(content),
containerPath);
+ protected void createDirectory(final String path) {
+ createDirectory(path, AccessMode.READ_ONLY);
+ }
+
+ protected void createDirectory(final String path, final AccessMode
accessMode) {
+ final String dirMode = accessMode == AccessMode.READ_ONLY ? "755" :
"777";
+ try {
+ sambaContainer.execInContainer("bash", "-c", "mkdir -m " + dirMode
+ " -p " + getContainerPath(path));
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create directory", e);
+ }
+ }
+
+ protected void writeFile(final String path, final String content) {
+ writeFile(path, content, AccessMode.READ_ONLY);
+ }
+
+ protected void writeFile(final String path, final String content, final
AccessMode accessMode) {
+ final int fileMode = accessMode == AccessMode.READ_ONLY ? 0100644:
0100666;
+ sambaContainer.copyFileToContainer(Transferable.of(content, fileMode),
getContainerPath(path));
+ }
+
+ protected boolean fileExists(final String path) {
+ try {
+ return sambaContainer.execInContainer("bash", "-c", "cat " +
getContainerPath(path) + " > /dev/null").getExitCode() == 0;
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to check file", e);
+ }
+ }
+
+ private String getContainerPath(final String path) {
+ return "/folder/" + path;
}
}
diff --git a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
index ae9307ea64..4e04bf85d1 100644
--- a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
+++ b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
@@ -20,6 +20,7 @@ import static java.util.Arrays.asList;
import static java.util.stream.StreamSupport.stream;
import com.hierynomus.msdtyp.AccessMask;
+import com.hierynomus.mserref.NtStatus;
import com.hierynomus.msfscc.FileAttributes;
import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation;
import com.hierynomus.mssmb2.SMB2CreateDisposition;
@@ -45,13 +46,13 @@ class SmbjClientService implements SmbClientService {
private final static Logger LOGGER =
LoggerFactory.getLogger(SmbjClientService.class);
private static final List<String> SPECIAL_DIRECTORIES = asList(".", "..");
- private static final long UNCATEGORISED_ERROR = -1L;
+ private static final long UNCATEGORIZED_ERROR = -1L;
private final Session session;
private final DiskShare share;
private final URI serviceLocation;
- SmbjClientService(Session session, DiskShare share, URI serviceLocation) {
+ SmbjClientService(final Session session, final DiskShare share, final URI
serviceLocation) {
this.session = session;
this.share = share;
this.serviceLocation = serviceLocation;
@@ -69,50 +70,89 @@ class SmbjClientService implements SmbClientService {
}
+ /**
+ * Lists the entries below the given directory. Entries reported as directories are not
+ * returned themselves: they are listed recursively instead, and the "." and ".." entries
+ * are skipped.
+ * A directory is opened only when the returned stream is consumed, and closing the
+ * per-directory stream closes the directory handle through the registered onClose handler.
+ *
+ * @param directoryPath path of the directory to start the listing from
+ * @return stream of the non-directory entries found under the directory
+ */
@Override
- public Stream<SmbListableEntity> listRemoteFiles(String filePath) {
- return Stream.of(filePath).flatMap(path -> {
+ public Stream<SmbListableEntity> listFiles(final String directoryPath) {
+ return Stream.of(directoryPath).flatMap(path -> {
final Directory directory = openDirectory(path);
return stream(directory::spliterator, 0, false)
.map(entity -> buildSmbListableEntity(entity, path,
serviceLocation))
.filter(entity -> !specialDirectory(entity))
- .flatMap(listable -> listable.isDirectory() ?
listRemoteFiles(listable.getPathWithName())
+ .flatMap(listable -> listable.isDirectory() ?
listFiles(listable.getPathWithName())
: Stream.of(listable))
.onClose(directory::close);
});
}
+    /**
+     * Makes sure the given directory exists on the share, creating it and any missing
+     * parent directory (parents first).
+     * A name collision reported by mkdir is tolerated when the directory exists afterwards,
+     * e.g. because it was created between the existence check and the mkdir call.
+     * Every other failure is reported to the caller.
+     *
+     * @param directoryPath path of the directory, using "/" as separator
+     * @throws SmbException if the directory cannot be checked or created
+     */
@Override
- public void createDirectory(String path) {
- final int lastDirectorySeparatorPosition = path.lastIndexOf("/");
- if (lastDirectorySeparatorPosition > 0) {
- createDirectory(path.substring(0, lastDirectorySeparatorPosition));
- }
- if (!share.folderExists(path)) {
- share.mkdir(path);
+    public void ensureDirectory(final String directoryPath) {
+        try {
+            final int lastDirectorySeparatorPosition = directoryPath.lastIndexOf("/");
+            if (lastDirectorySeparatorPosition > 0) {
+                // an SmbException raised by the recursive call is passed through unchanged by wrapException
+                ensureDirectory(directoryPath.substring(0, lastDirectorySeparatorPosition));
+            }
+
+            if (!share.folderExists(directoryPath)) {
+                try {
+                    share.mkdir(directoryPath);
+                } catch (SMBApiException e) {
+                    // only a collision with a directory that exists by now is harmless;
+                    // any other status (e.g. access denied) must not be swallowed
+                    final boolean alreadyCreated = e.getStatus() == NtStatus.STATUS_OBJECT_NAME_COLLISION
+                            && share.folderExists(directoryPath);
+                    if (!alreadyCreated) {
+                        throw e;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw wrapException(e);
}
}
+ /**
+ * Copies the content of a file on the share into the given output stream.
+ * The file is opened with FILE_OPEN disposition for GENERIC_READ access, sharing read
+ * access (FILE_SHARE_READ). Any exception raised while opening or reading is rethrown
+ * through wrapException. The output stream is closed in every case, including failure.
+ *
+ * @param filePath path of the file to read
+ * @param outputStream target of the file content; closed by this method
+ * @throws IOException declared by the signature; closing the output stream may raise it
+ */
@Override
- public void readFile(String fileName, OutputStream outputStream) throws
IOException {
- try (File f = share.openFile(
- fileName,
+ public void readFile(final String filePath, final OutputStream
outputStream) throws IOException {
+ try (File file = share.openFile(
+ filePath,
EnumSet.of(AccessMask.GENERIC_READ),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
SMB2CreateDisposition.FILE_OPEN,
EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
) {
- f.read(outputStream);
- } catch (SMBApiException a) {
- throw new SmbException(a.getMessage(), a.getStatusCode(), a);
+ file.read(outputStream);
} catch (Exception e) {
- throw new SmbException(e.getMessage(), UNCATEGORISED_ERROR, e);
+ throw wrapException(e);
} finally {
outputStream.close();
}
}
- private SmbListableEntity
buildSmbListableEntity(FileIdBothDirectoryInformation info, String path, URI
serviceLocation) {
+    /**
+     * Moves a file into another directory of the share, keeping its file name.
+     * The file is opened without any share access (no concurrent readers or writers)
+     * and renamed to the target path.
+     *
+     * @param filePath      path of the file to move, using "/" as separator
+     * @param directoryPath path of the target directory, using "/" as separator
+     * @throws SmbException if the file cannot be opened or renamed
+     */
+    @Override
+    public void moveFile(final String filePath, final String directoryPath) {
+        try (File sourceFile = share.openFile(
+                filePath,
+                EnumSet.of(AccessMask.GENERIC_WRITE, AccessMask.DELETE),
+                EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+                EnumSet.noneOf(SMB2ShareAccess.class),
+                SMB2CreateDisposition.FILE_OPEN,
+                EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
+        ) {
+            final String[] pathSegments = filePath.split("/");
+            // rename operation on Windows requires \ (backslash) path separator
+            final String targetDirectory = directoryPath.replace('/', '\\');
+            final String fileName = pathSegments[pathSegments.length - 1];
+            final String targetPath = targetDirectory + "\\" + fileName;
+            sourceFile.rename(targetPath);
+        } catch (Exception e) {
+            throw wrapException(e);
+        }
+    }
+
+    /**
+     * Removes the given file from the share.
+     *
+     * @param filePath path of the file to remove
+     * @throws SmbException if the removal fails
+     */
+    @Override
+    public void deleteFile(final String filePath) {
+        try {
+            share.rm(filePath);
+        } catch (Exception removalFailure) {
+            throw wrapException(removalFailure);
+        }
+    }
+
+ private SmbListableEntity buildSmbListableEntity(final
FileIdBothDirectoryInformation info, final String path, final URI
serviceLocation) {
return SmbListableEntity.builder()
.setName(info.getFileName())
.setShortName(info.getShortName())
@@ -128,25 +168,30 @@ class SmbjClientService implements SmbClientService {
.build();
}
- private Directory openDirectory(String path) {
- try {
- return share.openDirectory(
- path,
- EnumSet.of(AccessMask.GENERIC_READ),
- EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
- EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
- SMB2CreateDisposition.FILE_OPEN,
- EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
- );
- } catch (SMBApiException s) {
- throw new RuntimeException("Could not open directory " + path + "
due to " + s.getMessage(), s);
- }
+    /**
+     * Opens an existing directory of the share for reading (GENERIC_READ, FILE_SHARE_READ,
+     * FILE_OPEN disposition). Exceptions of the underlying call are not translated here.
+     *
+     * @param path path of the directory to open
+     * @return the opened directory handle; the caller is responsible for closing it
+     */
+    private Directory openDirectory(final String path) {
+        final Directory openedDirectory = share.openDirectory(
+                path,
+                EnumSet.of(AccessMask.GENERIC_READ),
+                EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
+                EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
+                SMB2CreateDisposition.FILE_OPEN,
+                EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
+        );
+        return openedDirectory;
}
- private boolean specialDirectory(SmbListableEntity entity) {
+ /**
+ * Tells whether the entity is one of the entries named in SPECIAL_DIRECTORIES
+ * ("." and ".."), which the recursive listing must skip.
+ *
+ * @param entity listed entity to check
+ * @return true if the name of the entity is "." or ".."
+ */
+ private boolean specialDirectory(final SmbListableEntity entity) {
return SPECIAL_DIRECTORIES.contains(entity.getName());
}
+    /**
+     * Converts an arbitrary exception into an SmbException.
+     * An SmbException is returned as is. For an SMBApiException the status code of the
+     * failure is used as error code; every other exception gets UNCATEGORIZED_ERROR.
+     * The original exception is kept as the cause.
+     *
+     * @param e exception to convert
+     * @return the exception to be thrown by the caller
+     */
+    private SmbException wrapException(final Exception e) {
+        if (e instanceof SmbException) {
+            // already of the target type, do not wrap it again
+            return (SmbException) e;
+        }
+
+        final long errorCode;
+        if (e instanceof SMBApiException) {
+            errorCode = ((SMBApiException) e).getStatusCode();
+        } else {
+            errorCode = UNCATEGORIZED_ERROR;
+        }
+        return new SmbException(e.getMessage(), errorCode, e);
+    }
+
}
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
index 3540587445..56cd07803f 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
@@ -132,7 +132,7 @@ public class SmbjClientServiceIT {
sambaProxy.setConnectionCut(true);
}
- final Set<String> actual =
s.listRemoteFiles("testDirectory")
+ final Set<String> actual = s.listFiles("testDirectory")
.map(SmbListableEntity::getIdentifier)
.collect(toSet());
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
index 5d06b166a9..13fafdf58b 100644
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
+++
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
@@ -62,7 +62,7 @@ class SmbjClientServiceTest {
when(share.fileExists("to")).thenReturn(false);
when(share.fileExists("create")).thenReturn(false);
- underTest.createDirectory("directory/path/to/create");
+ underTest.ensureDirectory("directory/path/to/create");
verify(share).mkdir("directory/path");
verify(share).mkdir("directory/path/to");