This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 00d90e3f4a Revert "NIFI-12231 FetchSmb supports Move and Delete
Completion Strategies"
00d90e3f4a is described below
commit 00d90e3f4a71eac304efaa30faf6137c8e507798
Author: Joseph Witt <[email protected]>
AuthorDate: Mon May 6 11:37:56 2024 -0700
Revert "NIFI-12231 FetchSmb supports Move and Delete Completion Strategies"
This reverts commit d3a7549e8109774b06e79f13d570ef5da04c451f.
---
.../processors/smb/util/CompletionStrategy.java | 49 -----
.../apache/nifi/services/smb/SmbClientService.java | 10 +-
.../org/apache/nifi/processors/smb/FetchSmb.java | 152 +++++----------
.../org/apache/nifi/processors/smb/ListSmb.java | 2 +-
.../org/apache/nifi/processors/smb/FetchSmbIT.java | 212 ++-------------------
.../apache/nifi/processors/smb/ListSmbTest.java | 4 +-
.../nifi/processors/smb/SambaTestContainers.java | 59 ++----
.../nifi/services/smb/SmbjClientService.java | 113 ++++-------
.../nifi/services/smb/SmbjClientServiceIT.java | 2 +-
.../nifi/services/smb/SmbjClientServiceTest.java | 2 +-
10 files changed, 123 insertions(+), 482 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java
b/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java
deleted file mode 100644
index cf3dd0fcbf..0000000000
---
a/nifi-extension-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/util/CompletionStrategy.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.smb.util;
-
-import org.apache.nifi.components.DescribedValue;
-
-public enum CompletionStrategy implements DescribedValue {
-
- NONE("None", "Leaves the file as-is."),
- MOVE("Move File", "Moves the file to the specified directory on the remote
system. This option cannot be used when DFS is enabled on 'SMB Client Provider
Service'."),
- DELETE("Delete File", "Deletes the file from the remote system.");
-
- private final String displayName;
- private final String description;
-
- CompletionStrategy(String displayName, String description) {
- this.displayName = displayName;
- this.description = description;
- }
-
- @Override
- public String getValue() {
- return name();
- }
-
- @Override
- public String getDisplayName() {
- return displayName;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
-}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
index c70dea133e..9eb0e41a67 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
@@ -25,13 +25,9 @@ import java.util.stream.Stream;
*/
public interface SmbClientService extends AutoCloseable {
- Stream<SmbListableEntity> listFiles(String directoryPath);
+ Stream<SmbListableEntity> listRemoteFiles(String path);
- void ensureDirectory(String directoryPath);
+ void createDirectory(String path);
- void readFile(String filePath, OutputStream outputStream) throws
IOException;
-
- void moveFile(String filePath, String directoryPath);
-
- void deleteFile(String filePath);
+ void readFile(String fileName, OutputStream outputStream) throws
IOException;
}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
index b5f58b5890..c38216058d 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
@@ -16,6 +16,16 @@
*/
package org.apache.nifi.processors.smb;
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static
org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -23,29 +33,17 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.smb.util.CompletionStrategy;
import org.apache.nifi.services.smb.SmbClientProviderService;
import org.apache.nifi.services.smb.SmbClientService;
import org.apache.nifi.services.smb.SmbException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import static java.util.Arrays.asList;
-import static java.util.Collections.unmodifiableSet;
-import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
-import static
org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
-
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"samba", "smb", "cifs", "files", "fetch"})
@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in
tandem with ListSmb.")
@@ -59,8 +57,8 @@ public class FetchSmb extends AbstractProcessor {
public static final String ERROR_CODE_ATTRIBUTE = "error.code";
public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
- public static final PropertyDescriptor REMOTE_FILE = new
PropertyDescriptor.Builder()
- .name("remote-file")
+ public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+ .Builder().name("remote-file")
.displayName("Remote File")
.description("The full path of the file to be retrieved from the
remote server. Expression language is supported.")
.required(true)
@@ -69,139 +67,91 @@ public class FetchSmb extends AbstractProcessor {
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();
- public static final PropertyDescriptor COMPLETION_STRATEGY = new
PropertyDescriptor.Builder()
- .name("Completion Strategy")
- .description("Specifies what to do with the original file on the
server once it has been processed. If the Completion Strategy fails, a warning
will be "
- + "logged but the data will still be transferred.")
- .allowableValues(CompletionStrategy.class)
- .defaultValue(CompletionStrategy.NONE)
- .required(true)
- .build();
-
- public static final PropertyDescriptor DESTINATION_DIRECTORY = new
PropertyDescriptor.Builder()
- .name("Destination Directory")
- .description("The directory on the remote server to move the
original file to once it has been processed.")
- .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .required(true)
- .dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
- .build();
-
- public static final PropertyDescriptor CREATE_DESTINATION_DIRECTORY = new
PropertyDescriptor.Builder()
- .name("Create Destination Directory")
- .description("Specifies whether or not the remote directory should
be created if it does not exist.")
- .required(true)
- .allowableValues("true", "false")
- .defaultValue("false")
- .dependsOn(COMPLETION_STRATEGY, CompletionStrategy.MOVE)
- .build();
-
- public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new
PropertyDescriptor.Builder()
+ public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new
Builder()
.name("smb-client-provider-service")
.displayName("SMB Client Provider Service")
.description("Specifies the SMB client provider to use for
creating SMB connections.")
.required(true)
.identifiesControllerService(SmbClientProviderService.class)
.build();
-
public static final Relationship REL_SUCCESS =
new Relationship.Builder()
.name("success")
- .description("A FlowFile will be routed here for each
successfully fetched file.")
+ .description("A flowfile will be routed here for each
successfully fetched file.")
.build();
-
public static final Relationship REL_FAILURE =
- new Relationship.Builder()
- .name("failure")
- .description("A FlowFile will be routed here when failed
to fetch its content.")
+ new Relationship.Builder().name("failure")
+ .description(
+ "A flowfile will be routed here when failed to
fetch its content.")
.build();
-
public static final Set<Relationship> RELATIONSHIPS = unmodifiableSet(new
HashSet<>(asList(
REL_SUCCESS,
REL_FAILURE
)));
-
+ public static final String UNCATEGORIZED_ERROR = "-2";
private static final List<PropertyDescriptor> PROPERTIES = asList(
SMB_CLIENT_PROVIDER_SERVICE,
- REMOTE_FILE,
- COMPLETION_STRATEGY,
- DESTINATION_DIRECTORY,
- CREATE_DESTINATION_DIRECTORY
+ REMOTE_FILE
);
- public static final String UNCATEGORIZED_ERROR = "-2";
-
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
- final Map<String, String> attributes = flowFile.getAttributes();
- final String filePath =
context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
-
- final SmbClientProviderService clientProviderService =
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+ final SmbClientProviderService clientProviderService =
+
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
try (SmbClientService client = clientProviderService.getClient()) {
- flowFile = session.write(flowFile, outputStream ->
client.readFile(filePath, outputStream));
-
- session.transfer(flowFile, REL_SUCCESS);
+ fetchAndTransfer(session, context, client, flowFile);
} catch (Exception e) {
- getLogger().error("Could not fetch file {}.", filePath, e);
+ getLogger().error("Couldn't connect to SMB.", e);
flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE,
getErrorCode(e));
flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE,
e.getMessage());
session.transfer(flowFile, REL_FAILURE);
- return;
}
- session.commitAsync(() -> performCompletionStrategy(context,
attributes));
}
- private String getErrorCode(final Exception exception) {
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ private void fetchAndTransfer(ProcessSession session, ProcessContext
context, SmbClientService client,
+ FlowFile flowFile) {
+ final Map<String, String> attributes = flowFile.getAttributes();
+ final String filename = context.getProperty(REMOTE_FILE)
+ .evaluateAttributeExpressions(attributes).getValue();
+ try {
+ flowFile = session.write(flowFile, outputStream ->
client.readFile(filename, outputStream));
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (Exception e) {
+ getLogger().error("Couldn't fetch file {}.", filename, e);
+ flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE,
getErrorCode(e));
+ flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE,
getErrorMessage(e));
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ private String getErrorCode(Exception exception) {
return Optional.ofNullable(exception instanceof SmbException ?
(SmbException) exception : null)
.map(SmbException::getErrorCode)
.map(String::valueOf)
.orElse(UNCATEGORIZED_ERROR);
}
- private void performCompletionStrategy(final ProcessContext context, final
Map<String, String> attributes) {
- final CompletionStrategy completionStrategy =
context.getProperty(COMPLETION_STRATEGY).asAllowableValue(CompletionStrategy.class);
-
- if (completionStrategy == CompletionStrategy.NONE) {
- return;
- }
-
- final String filePath =
context.getProperty(REMOTE_FILE).evaluateAttributeExpressions(attributes).getValue();
-
- final SmbClientProviderService clientProviderService =
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
-
- try (SmbClientService client = clientProviderService.getClient()) {
- if (completionStrategy == CompletionStrategy.MOVE) {
- final String destinationDirectory =
context.getProperty(DESTINATION_DIRECTORY).evaluateAttributeExpressions(attributes).getValue();
- final boolean createDestinationDirectory =
context.getProperty(CREATE_DESTINATION_DIRECTORY).asBoolean();
-
- if (createDestinationDirectory) {
- client.ensureDirectory(destinationDirectory);
- }
-
- client.moveFile(filePath, destinationDirectory);
- } else if (completionStrategy == CompletionStrategy.DELETE) {
- client.deleteFile(filePath);
- }
- } catch (Exception e) {
- getLogger().warn("Could not perform completion strategy {} for
file {}", completionStrategy, filePath, e);
- }
+ private String getErrorMessage(Exception exception) {
+ return Optional.ofNullable(exception.getMessage())
+ .orElse(exception.getClass().getSimpleName());
}
}
+
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
index 091a5b176e..886f1bee46 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
@@ -345,7 +345,7 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
final String directory = getDirectory(context);
final SmbClientService clientService =
clientProviderService.getClient();
- return clientService.listFiles(directory).onClose(() -> {
+ return clientService.listRemoteFiles(directory).onClose(() -> {
try {
clientService.close();
} catch (Exception e) {
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
index a1ad08db0b..fb057791a8 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
@@ -16,226 +16,52 @@
*/
package org.apache.nifi.processors.smb;
-import static org.apache.nifi.processors.smb.FetchSmb.COMPLETION_STRATEGY;
-import static
org.apache.nifi.processors.smb.FetchSmb.CREATE_DESTINATION_DIRECTORY;
-import static org.apache.nifi.processors.smb.FetchSmb.DESTINATION_DIRECTORY;
import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
import static org.apache.nifi.processors.smb.FetchSmb.REMOTE_FILE;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-
-import org.apache.nifi.processors.smb.util.CompletionStrategy;
import org.apache.nifi.services.smb.SmbjClientProviderService;
import org.apache.nifi.util.TestRunner;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-class FetchSmbIT extends SambaTestContainers {
-
- private static final String TEST_CONTENT = "test_content";
-
- private TestRunner testRunner;
-
- private SmbjClientProviderService smbjClientProviderService;
-
- @BeforeEach
- void setUpComponents() throws Exception {
- testRunner = newTestRunner(FetchSmb.class);
-
- smbjClientProviderService = configureSmbClient(testRunner, true);
- }
-
- @AfterEach
- void tearDownComponents() {
- testRunner.disableControllerService(smbjClientProviderService);
- }
+public class FetchSmbIT extends SambaTestContainers {
@Test
- void fetchFilesUsingEL() {
- writeFile("test_file", TEST_CONTENT);
-
+ public void fetchFilesUsingEL() throws Exception {
+ writeFile("/test_file", "test_content");
+ TestRunner testRunner = newTestRunner(FetchSmb.class);
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
- final Map<String, String> attributes = new HashMap<>();
+ Map<String, String> attributes = new HashMap<>();
attributes.put("attribute_to_find_using_EL", "test_file");
- runProcessor(attributes);
-
- assertSuccessFlowFile();
+ testRunner.enqueue("ignored", attributes);
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ assertEquals("test_content",
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
+ testRunner.assertValid();
+ testRunner.disableControllerService(smbjClientProviderService);
}
@Test
- void tryToFetchNonExistingFileEmitsFailure() throws Exception {
+ public void tryToFetchNonExistingFileEmitsFailure() throws Exception {
+ TestRunner testRunner = newTestRunner(FetchSmb.class);
testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
- final Map<String, String> attributes = new HashMap<>();
+ Map<String, String> attributes = new HashMap<>();
attributes.put("attribute_to_find_using_EL", "non_existing_file");
- runProcessor(attributes);
-
- testRunner.assertTransferCount(REL_FAILURE, 1);
- }
-
- @Test
- void testCompletionStrategyNone() {
- final String baseDir = "dir_none";
- final String filename = "test_file";
- final String filePath = baseDir + "/" + filename;
-
- createDirectory(baseDir, AccessMode.READ_ONLY);
- writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
-
- testRunner.setProperty(REMOTE_FILE, filePath);
- testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.NONE);
-
- runProcessor();
-
- assertSuccessFlowFile();
- assertNoWarning();
-
- assertTrue(fileExists(filePath));
- }
-
- @Test
- void testCompletionStrategyDelete() {
- final String baseDir = "dir_delete";
- final String filename = "test_file";
- final String filePath = baseDir + "/" + filename;
-
- createDirectory(baseDir, AccessMode.READ_WRITE);
- writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
-
- testRunner.setProperty(REMOTE_FILE, filePath);
- testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE);
-
- runProcessor();
-
- assertSuccessFlowFile();
- assertNoWarning();
-
- assertFalse(fileExists(filePath));
- }
-
- @Test
- void testCompletionStrategyMoveWithExistingDirectory() {
- final String baseDir = "dir_move_existing";
- final String filename = "test_file";
- final String filePath = baseDir + "/" + filename;
- final String processedDir = "processed";
-
- createDirectory(baseDir, AccessMode.READ_WRITE);
- writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
- createDirectory(processedDir, AccessMode.READ_WRITE);
-
- testRunner.setProperty(REMOTE_FILE, filePath);
- testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
- testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
-
- runProcessor();
-
- assertSuccessFlowFile();
- assertNoWarning();
-
- assertFalse(fileExists(filePath));
- assertTrue(fileExists(processedDir + "/" + filename));
- }
-
- @Test
- void testCompletionStrategyMoveWithCreatingDirectory() {
- final String baseDir = "dir_move_creating";
- final String filename = "test_file";
- final String filePath = baseDir + "/" + filename;
- final String processedDir = "processed";
-
- createDirectory(baseDir, AccessMode.READ_WRITE);
- writeFile(filePath, TEST_CONTENT, AccessMode.READ_WRITE);
-
- testRunner.setProperty(REMOTE_FILE, filePath);
- testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
- testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
- testRunner.setProperty(CREATE_DESTINATION_DIRECTORY, "true");
-
- runProcessor();
-
- assertSuccessFlowFile();
- assertNoWarning();
-
- assertFalse(fileExists(filePath));
- assertTrue(fileExists(processedDir + "/" + filename));
- }
-
- @Test
- void testCompletionStrategyDeleteFailsWhenNoPermission() {
- final String baseDir = "dir_delete_noperm";
- final String filename = "test_file";
- final String filePath = baseDir + "/" + filename;
-
- createDirectory(baseDir, AccessMode.READ_ONLY);
- writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
-
- testRunner.setProperty(REMOTE_FILE, filePath);
- testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.DELETE);
-
- runProcessor();
-
- assertSuccessFlowFile();
- assertWarning();
-
- assertTrue(fileExists(filePath));
- }
-
- @Test
- void testCompletionStrategyMoveFailsWhenNoPermission() {
- final String baseDir = "dir_move_noperm";
- final String filename = "test_file";
- final String filePath = baseDir + "/" + filename;
- final String processedDir = "processed";
-
- createDirectory(baseDir, AccessMode.READ_ONLY);
- writeFile(filePath, TEST_CONTENT, AccessMode.READ_ONLY);
- createDirectory(processedDir, AccessMode.READ_ONLY);
-
- testRunner.setProperty(REMOTE_FILE, filePath);
- testRunner.setProperty(COMPLETION_STRATEGY, CompletionStrategy.MOVE);
- testRunner.setProperty(DESTINATION_DIRECTORY, processedDir);
-
- runProcessor();
-
- assertSuccessFlowFile();
- assertWarning();
-
- assertTrue(fileExists(filePath));
- assertFalse(fileExists(processedDir + "/" + filename));
- }
-
- private void runProcessor() {
- runProcessor(Collections.emptyMap());
- }
-
- private void runProcessor(final Map<String, String> attributes) {
testRunner.enqueue("ignored", attributes);
testRunner.run();
- }
-
- private void assertSuccessFlowFile() {
- testRunner.assertTransferCount(REL_SUCCESS, 1);
- assertEquals(TEST_CONTENT,
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
- }
-
- private void assertWarning() {
- assertFalse(testRunner.getLogger().getWarnMessages().isEmpty());
- }
-
- private void assertNoWarning() {
- assertTrue(testRunner.getLogger().getWarnMessages().isEmpty());
+ testRunner.assertTransferCount(REL_FAILURE, 1);
+ testRunner.assertValid();
+ testRunner.disableControllerService(smbjClientProviderService);
}
}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
index 621ec74209..0594ef3e2f 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
@@ -232,7 +232,7 @@ class ListSmbTest {
testRunner.setProperty(LISTING_STRATEGY, "timestamps");
testRunner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, "millis");
final SmbClientService mockNifiSmbClientService =
configureTestRunnerWithMockedSambaClient(testRunner);
- when(mockNifiSmbClientService.listFiles(anyString())).thenThrow(new
RuntimeException("test exception"));
+
when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new
RuntimeException("test exception"));
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid();
@@ -282,7 +282,7 @@ class ListSmbTest {
}
private void mockSmbFolders(SmbClientService mockNifiSmbClientService,
SmbListableEntity... entities) {
- doAnswer(ignore ->
stream(entities)).when(mockNifiSmbClientService).listFiles(anyString());
+ doAnswer(ignore ->
stream(entities)).when(mockNifiSmbClientService).listRemoteFiles(anyString());
}
private SmbListableEntity listableEntity(String name, long timeStamp) {
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
index 51c283b85d..f38d36d171 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
@@ -39,19 +39,12 @@ import org.testcontainers.utility.DockerImageName;
public class SambaTestContainers {
- protected final static Logger LOGGER =
LoggerFactory.getLogger(SambaTestContainers.class);
-
protected final static Integer DEFAULT_SAMBA_PORT = 445;
-
- protected enum AccessMode {
- READ_ONLY,READ_WRITE;
- }
-
+ protected final static Logger logger =
LoggerFactory.getLogger(SambaTestContainers.class);
protected final GenericContainer<?> sambaContainer = new
GenericContainer<>(DockerImageName.parse("dperson/samba"))
- .withCreateContainerCmdModifier(cmd -> cmd.withName("samba-test"))
.withExposedPorts(DEFAULT_SAMBA_PORT, 139)
.waitingFor(Wait.forListeningPort())
- .withLogConsumer(new Slf4jLogConsumer(LOGGER))
+ .withLogConsumer(new Slf4jLogConsumer(logger))
.withCommand("-w domain -u username;password -s
share;/folder;;no;no;username;;; -p");
@BeforeEach
@@ -64,63 +57,33 @@ public class SambaTestContainers {
sambaContainer.stop();
}
- protected SmbjClientProviderService configureSmbClient(final TestRunner
testRunner, final boolean shouldEnableSmbClient) throws Exception {
+ protected SmbjClientProviderService configureSmbClient(TestRunner
testRunner, boolean shouldEnableSmbClient)
+ throws Exception {
final SmbjClientProviderService smbjClientProviderService = new
SmbjClientProviderService();
testRunner.addControllerService("client-provider",
smbjClientProviderService);
-
testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
testRunner.setProperty(smbjClientProviderService, HOSTNAME,
sambaContainer.getHost());
- testRunner.setProperty(smbjClientProviderService, PORT,
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
+ testRunner.setProperty(smbjClientProviderService, PORT,
+
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
testRunner.setProperty(smbjClientProviderService, USERNAME,
"username");
testRunner.setProperty(smbjClientProviderService, PASSWORD,
"password");
testRunner.setProperty(smbjClientProviderService, SHARE, "share");
testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
-
if (shouldEnableSmbClient) {
testRunner.enableControllerService(smbjClientProviderService);
}
-
return smbjClientProviderService;
}
- protected String generateContentWithSize(final int sizeInBytes) {
- final byte[] bytes = new byte[sizeInBytes];
+ protected String generateContentWithSize(int sizeInBytes) {
+ byte[] bytes = new byte[sizeInBytes];
fill(bytes, (byte) 1);
return new String(bytes);
}
- protected void createDirectory(final String path) {
- createDirectory(path, AccessMode.READ_ONLY);
- }
-
- protected void createDirectory(final String path, final AccessMode
accessMode) {
- final String dirMode = accessMode == AccessMode.READ_ONLY ? "755" :
"777";
- try {
- sambaContainer.execInContainer("bash", "-c", "mkdir -m " + dirMode
+ " -p " + getContainerPath(path));
- } catch (Exception e) {
- throw new RuntimeException("Failed to create directory", e);
- }
- }
-
- protected void writeFile(final String path, final String content) {
- writeFile(path, content, AccessMode.READ_ONLY);
- }
-
- protected void writeFile(final String path, final String content, final
AccessMode accessMode) {
- final int fileMode = accessMode == AccessMode.READ_ONLY ? 0100644:
0100666;
- sambaContainer.copyFileToContainer(Transferable.of(content, fileMode),
getContainerPath(path));
- }
-
- protected boolean fileExists(final String path) {
- try {
- return sambaContainer.execInContainer("bash", "-c", "cat " +
getContainerPath(path) + " > /dev/null").getExitCode() == 0;
- } catch (Exception e) {
- throw new RuntimeException("Failed to check file", e);
- }
- }
-
- private String getContainerPath(final String path) {
- return "/folder/" + path;
+ protected void writeFile(String path, String content) {
+ String containerPath = "/folder/" + path;
+ sambaContainer.copyFileToContainer(Transferable.of(content),
containerPath);
}
}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
index 4e04bf85d1..ae9307ea64 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
@@ -20,7 +20,6 @@ import static java.util.Arrays.asList;
import static java.util.stream.StreamSupport.stream;
import com.hierynomus.msdtyp.AccessMask;
-import com.hierynomus.mserref.NtStatus;
import com.hierynomus.msfscc.FileAttributes;
import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation;
import com.hierynomus.mssmb2.SMB2CreateDisposition;
@@ -46,13 +45,13 @@ class SmbjClientService implements SmbClientService {
private final static Logger LOGGER =
LoggerFactory.getLogger(SmbjClientService.class);
private static final List<String> SPECIAL_DIRECTORIES = asList(".", "..");
- private static final long UNCATEGORIZED_ERROR = -1L;
+ private static final long UNCATEGORISED_ERROR = -1L;
private final Session session;
private final DiskShare share;
private final URI serviceLocation;
- SmbjClientService(final Session session, final DiskShare share, final URI
serviceLocation) {
+ SmbjClientService(Session session, DiskShare share, URI serviceLocation) {
this.session = session;
this.share = share;
this.serviceLocation = serviceLocation;
@@ -70,89 +69,50 @@ class SmbjClientService implements SmbClientService {
}
@Override
- public Stream<SmbListableEntity> listFiles(final String directoryPath) {
- return Stream.of(directoryPath).flatMap(path -> {
+ public Stream<SmbListableEntity> listRemoteFiles(String filePath) {
+ return Stream.of(filePath).flatMap(path -> {
final Directory directory = openDirectory(path);
return stream(directory::spliterator, 0, false)
.map(entity -> buildSmbListableEntity(entity, path,
serviceLocation))
.filter(entity -> !specialDirectory(entity))
- .flatMap(listable -> listable.isDirectory() ?
listFiles(listable.getPathWithName())
+ .flatMap(listable -> listable.isDirectory() ?
listRemoteFiles(listable.getPathWithName())
: Stream.of(listable))
.onClose(directory::close);
});
}
@Override
- public void ensureDirectory(final String directoryPath) {
- try {
- final int lastDirectorySeparatorPosition =
directoryPath.lastIndexOf("/");
- if (lastDirectorySeparatorPosition > 0) {
- ensureDirectory(directoryPath.substring(0,
lastDirectorySeparatorPosition));
- }
-
- if (!share.folderExists(directoryPath)) {
- try {
- share.mkdir(directoryPath);
- } catch (SMBApiException e) {
- if (e.getStatus() ==
NtStatus.STATUS_OBJECT_NAME_COLLISION) {
- if (!share.folderExists(directoryPath)) {
- throw e;
- }
- }
- }
- }
- } catch (Exception e) {
- throw wrapException(e);
+ public void createDirectory(String path) {
+ final int lastDirectorySeparatorPosition = path.lastIndexOf("/");
+ if (lastDirectorySeparatorPosition > 0) {
+ createDirectory(path.substring(0, lastDirectorySeparatorPosition));
+ }
+ if (!share.folderExists(path)) {
+ share.mkdir(path);
}
}
@Override
- public void readFile(final String filePath, final OutputStream
outputStream) throws IOException {
- try (File file = share.openFile(
- filePath,
+ public void readFile(String fileName, OutputStream outputStream) throws
IOException {
+ try (File f = share.openFile(
+ fileName,
EnumSet.of(AccessMask.GENERIC_READ),
EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
SMB2CreateDisposition.FILE_OPEN,
EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
) {
- file.read(outputStream);
+ f.read(outputStream);
+ } catch (SMBApiException a) {
+ throw new SmbException(a.getMessage(), a.getStatusCode(), a);
} catch (Exception e) {
- throw wrapException(e);
+ throw new SmbException(e.getMessage(), UNCATEGORISED_ERROR, e);
} finally {
outputStream.close();
}
}
- @Override
- public void moveFile(final String filePath, final String directoryPath) {
- try (File file = share.openFile(
- filePath,
- EnumSet.of(AccessMask.GENERIC_WRITE, AccessMask.DELETE),
- EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
- EnumSet.noneOf(SMB2ShareAccess.class),
- SMB2CreateDisposition.FILE_OPEN,
- EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
- ) {
- final String[] parts = filePath.split("/");
- // rename operation on Windows requires \ (backslash) path
separator
- final String newFilePath = directoryPath.replace('/', '\\') + "\\"
+ parts[parts.length - 1];
- file.rename(newFilePath);
- } catch (Exception e) {
- throw wrapException(e);
- }
- }
-
- @Override
- public void deleteFile(final String filePath) {
- try {
- share.rm(filePath);
- } catch (Exception e) {
- throw wrapException(e);
- }
- }
-
- private SmbListableEntity buildSmbListableEntity(final
FileIdBothDirectoryInformation info, final String path, final URI
serviceLocation) {
+ private SmbListableEntity
buildSmbListableEntity(FileIdBothDirectoryInformation info, String path, URI
serviceLocation) {
return SmbListableEntity.builder()
.setName(info.getFileName())
.setShortName(info.getShortName())
@@ -168,30 +128,25 @@ class SmbjClientService implements SmbClientService {
.build();
}
- private Directory openDirectory(final String path) {
- return share.openDirectory(
- path,
- EnumSet.of(AccessMask.GENERIC_READ),
- EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
- EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
- SMB2CreateDisposition.FILE_OPEN,
- EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
- );
+ private Directory openDirectory(String path) {
+ try {
+ return share.openDirectory(
+ path,
+ EnumSet.of(AccessMask.GENERIC_READ),
+ EnumSet.of(FileAttributes.FILE_ATTRIBUTE_DIRECTORY),
+ EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
+ SMB2CreateDisposition.FILE_OPEN,
+ EnumSet.of(SMB2CreateOptions.FILE_DIRECTORY_FILE)
+ );
+ } catch (SMBApiException s) {
+ throw new RuntimeException("Could not open directory " + path + "
due to " + s.getMessage(), s);
+ }
}
- private boolean specialDirectory(final SmbListableEntity entity) {
+ private boolean specialDirectory(SmbListableEntity entity) {
return SPECIAL_DIRECTORIES.contains(entity.getName());
}
- private SmbException wrapException(final Exception e) {
- if (e instanceof SmbException) {
- return (SmbException) e;
- } else {
- final long errorCode = e instanceof SMBApiException ?
((SMBApiException) e).getStatusCode() : UNCATEGORIZED_ERROR;
- return new SmbException(e.getMessage(), errorCode, e);
- }
- }
-
}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
index e671510a2e..59278e54c5 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceIT.java
@@ -132,7 +132,7 @@ public class SmbjClientServiceIT {
sambaProxy.setConnectionCut(true);
}
- final Set<String> actual = s.listFiles("testDirectory")
+ final Set<String> actual =
s.listRemoteFiles("testDirectory")
.map(SmbListableEntity::getIdentifier)
.collect(toSet());
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
index e30038684a..7193aad091 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/test/java/org/apache/nifi/services/smb/SmbjClientServiceTest.java
@@ -52,7 +52,7 @@ class SmbjClientServiceTest {
when(share.fileExists("to")).thenReturn(false);
when(share.fileExists("create")).thenReturn(false);
- underTest.ensureDirectory("directory/path/to/create");
+ underTest.createDirectory("directory/path/to/create");
verify(share).mkdir("directory/path");
verify(share).mkdir("directory/path/to");