This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5b565679df NIFI-10230 added FetchSmb
5b565679df is described below
commit 5b565679dfca1d1fb1134a9a3c819c11587fef40
Author: Gabor Kulik <[email protected]>
AuthorDate: Tue Aug 2 15:58:50 2022 +0200
NIFI-10230 added FetchSmb
This closes #6279.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../apache/nifi/services/smb/SmbClientService.java | 3 +
.../{SmbClientService.java => SmbException.java} | 16 ++-
.../nifi/services/smb/SmbListableEntity.java | 72 +++++++---
.../org/apache/nifi/processors/smb/FetchSmb.java | 157 +++++++++++++++++++++
.../org/apache/nifi/processors/smb/GetSmbFile.java | 2 +-
.../org/apache/nifi/processors/smb/ListSmb.java | 111 ++++++++-------
.../org/apache/nifi/processors/smb/PutSmbFile.java | 2 +-
.../services/org.apache.nifi.processor.Processor | 1 +
.../org/apache/nifi/processors/smb/FetchSmbIT.java | 67 +++++++++
.../apache/nifi/processors/smb/FetchSmbTest.java | 125 ++++++++++++++++
.../org/apache/nifi/processors/smb/ListSmbIT.java | 117 ++-------------
.../apache/nifi/processors/smb/ListSmbTest.java | 4 +-
.../nifi/processors/smb/SambaTestContainers.java | 89 ++++++++++++
.../services/smb/SmbjClientProviderService.java | 2 +-
.../nifi/services/smb/SmbjClientService.java | 35 ++++-
15 files changed, 610 insertions(+), 193 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
index 4ecaf9a507..9eb0e41a67 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.services.smb;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.stream.Stream;
/**
@@ -27,4 +29,5 @@ public interface SmbClientService extends AutoCloseable {
void createDirectory(String path);
+ void readFile(String fileName, OutputStream outputStream) throws
IOException;
}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbException.java
similarity index 74%
copy from
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
copy to
nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbException.java
index 4ecaf9a507..e076560943 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbClientService.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbException.java
@@ -16,15 +16,17 @@
*/
package org.apache.nifi.services.smb;
-import java.util.stream.Stream;
+public class SmbException extends RuntimeException {
-/**
- * Service abstraction for Server Message Block protocol operations.
- */
-public interface SmbClientService extends AutoCloseable {
+ private long errorCode;
- Stream<SmbListableEntity> listRemoteFiles(String path);
+ public SmbException(String message, long errorCode, Exception cause) {
+ super(message, cause);
+ this.errorCode = errorCode;
+ }
- void createDirectory(String path);
+ public long getErrorCode() {
+ return errorCode;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java
index da33602540..53862f6532 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-client-api/src/main/java/org/apache/nifi/services/smb/SmbListableEntity.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.services.smb;
+import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -29,30 +30,42 @@ import org.apache.nifi.serialization.record.RecordFieldType;
public class SmbListableEntity implements ListableEntity {
+ public static final String FILENAME = "filename";
+ public static final String SHORT_NAME = "shortName";
+ public static final String PATH = "path";
+ public static final String SERVICE_LOCATION = "serviceLocation";
+ public static final String CREATION_TIME = "creationTime";
+ public static final String LAST_ACCESS_TIME = "lastAccessTime";
+ public static final String CHANGE_TIME = "changeTime";
+ public static final String LAST_MODIFIED_TIME = "lastModifiedTime";
+ public static final String SIZE = "size";
+ public static final String ALLOCATION_SIZE = "allocationSize";
private final String name;
private final String shortName;
private final String path;
- private final long timestamp;
+ private final long lastModifiedTime;
private final long creationTime;
private final long lastAccessTime;
private final long changeTime;
private final boolean directory;
private final long size;
private final long allocationSize;
+ private final URI serviceLocation;
- private SmbListableEntity(String name, String shortName, String path, long
timestamp, long creationTime,
+ private SmbListableEntity(String name, String shortName, String path, long
lastModifiedTime, long creationTime,
long lastAccessTime, long changeTime, boolean directory,
- long size, long allocationSize) {
+ long size, long allocationSize, URI serviceLocation) {
this.name = name;
this.shortName = shortName;
this.path = path;
- this.timestamp = timestamp;
+ this.lastModifiedTime = lastModifiedTime;
this.creationTime = creationTime;
this.lastAccessTime = lastAccessTime;
this.changeTime = changeTime;
this.directory = directory;
this.size = size;
this.allocationSize = allocationSize;
+ this.serviceLocation = serviceLocation;
}
public static SimpleRecordSchema getRecordSchema() {
@@ -60,8 +73,8 @@ public class SmbListableEntity implements ListableEntity {
new RecordField("filename",
RecordFieldType.STRING.getDataType(), false),
new RecordField("shortName",
RecordFieldType.STRING.getDataType(), false),
new RecordField("path", RecordFieldType.STRING.getDataType(),
false),
- new RecordField("identifier",
RecordFieldType.STRING.getDataType(), false),
- new RecordField("timestamp",
RecordFieldType.LONG.getDataType(), false),
+ new RecordField("absolute.path",
RecordFieldType.STRING.getDataType(), false),
+ new RecordField("lastModifiedTime",
RecordFieldType.LONG.getDataType(), false),
new RecordField("creationTime",
RecordFieldType.LONG.getDataType(), false),
new RecordField("lastAccessTime",
RecordFieldType.LONG.getDataType(), false),
new RecordField("changeTime",
RecordFieldType.LONG.getDataType(), false),
@@ -108,6 +121,10 @@ public class SmbListableEntity implements ListableEntity {
return path.isEmpty() ? name : path + "/" + name;
}
+ public long getLastModifiedTime() {
+ return lastModifiedTime;
+ }
+
@Override
public String getIdentifier() {
return getPathWithName();
@@ -115,7 +132,7 @@ public class SmbListableEntity implements ListableEntity {
@Override
public long getTimestamp() {
- return timestamp;
+ return getLastModifiedTime();
}
@Override
@@ -146,36 +163,42 @@ public class SmbListableEntity implements ListableEntity {
@Override
public String toString() {
- return getPathWithName() + " (last write: " + timestamp + " size: " +
size + ")";
+ return getPathWithName() + " (last write: " + lastModifiedTime + "
size: " + size + ")";
}
@Override
public Record toRecord() {
final Map<String, Object> record = new TreeMap<>();
- record.put("filename", getName());
- record.put("shortName", getShortName());
- record.put("path", path);
- record.put("identifier", getPathWithName());
- record.put("timestamp", getTimestamp());
- record.put("creationTime", getCreationTime());
- record.put("lastAccessTime", getLastAccessTime());
- record.put("size", getSize());
- record.put("allocationSize", getAllocationSize());
+ record.put(FILENAME, getName());
+ record.put(SHORT_NAME, getShortName());
+ record.put(PATH, getPath());
+ record.put(SERVICE_LOCATION, getServiceLocation().toString());
+ record.put(CREATION_TIME, getCreationTime());
+ record.put(LAST_ACCESS_TIME, getLastAccessTime());
+ record.put(LAST_MODIFIED_TIME, getLastModifiedTime());
+ record.put(CHANGE_TIME, getChangeTime());
+ record.put(SIZE, getSize());
+ record.put(ALLOCATION_SIZE, getAllocationSize());
return new MapRecord(getRecordSchema(), record);
}
+ private URI getServiceLocation() {
+ return serviceLocation;
+ }
+
public static class SmbListableEntityBuilder {
private String name;
private String shortName;
private String path = "";
- private long timestamp;
+ private long lastModifiedTime;
private long creationTime;
private long lastAccessTime;
private long changeTime;
private boolean directory = false;
private long size = 0;
private long allocationSize = 0;
+ private URI serviceLocation;
public SmbListableEntityBuilder setName(String name) {
this.name = name;
@@ -192,8 +215,8 @@ public class SmbListableEntity implements ListableEntity {
return this;
}
- public SmbListableEntityBuilder setTimestamp(long timestamp) {
- this.timestamp = timestamp;
+ public SmbListableEntityBuilder setLastModifiedTime(long
lastModifiedTime) {
+ this.lastModifiedTime = lastModifiedTime;
return this;
}
@@ -227,9 +250,14 @@ public class SmbListableEntity implements ListableEntity {
return this;
}
+ public SmbListableEntityBuilder setServiceLocation(URI
serviceLocation) {
+ this.serviceLocation = serviceLocation;
+ return this;
+ }
+
public SmbListableEntity build() {
- return new SmbListableEntity(name, shortName, path, timestamp,
creationTime, lastAccessTime, changeTime,
- directory, size, allocationSize);
+ return new SmbListableEntity(name, shortName, path,
lastModifiedTime, creationTime, lastAccessTime, changeTime,
+ directory, size, allocationSize, serviceLocation);
}
}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
new file mode 100644
index 0000000000..c38216058d
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/FetchSmb.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableSet;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static
org.apache.nifi.processor.util.StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"samba", "smb", "cifs", "files", "fetch"})
+@CapabilityDescription("Fetches files from a SMB Share. Designed to be used in
tandem with ListSmb.")
+@SeeAlso({ListSmb.class, PutSmbFile.class, GetSmbFile.class})
+@WritesAttributes({
+ @WritesAttribute(attribute = FetchSmb.ERROR_CODE_ATTRIBUTE,
description = "The error code returned by SMB when the fetch of a file fails."),
+ @WritesAttribute(attribute = FetchSmb.ERROR_MESSAGE_ATTRIBUTE,
description = "The error message returned by SMB when the fetch of a file
fails.")
+})
+public class FetchSmb extends AbstractProcessor {
+
+ public static final String ERROR_CODE_ATTRIBUTE = "error.code";
+ public static final String ERROR_MESSAGE_ATTRIBUTE = "error.message";
+
+ public static final PropertyDescriptor REMOTE_FILE = new PropertyDescriptor
+ .Builder().name("remote-file")
+ .displayName("Remote File")
+ .description("The full path of the file to be retrieved from the
remote server. Expression language is supported.")
+ .required(true)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .defaultValue("${path}/${filename}")
+ .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new
Builder()
+ .name("smb-client-provider-service")
+ .displayName("SMB Client Provider Service")
+ .description("Specifies the SMB client provider to use for
creating SMB connections.")
+ .required(true)
+ .identifiesControllerService(SmbClientProviderService.class)
+ .build();
+ public static final Relationship REL_SUCCESS =
+ new Relationship.Builder()
+ .name("success")
+ .description("A flowfile will be routed here for each
successfully fetched file.")
+ .build();
+ public static final Relationship REL_FAILURE =
+ new Relationship.Builder().name("failure")
+ .description(
+ "A flowfile will be routed here when failed to
fetch its content.")
+ .build();
+ public static final Set<Relationship> RELATIONSHIPS = unmodifiableSet(new
HashSet<>(asList(
+ REL_SUCCESS,
+ REL_FAILURE
+ )));
+ public static final String UNCATEGORIZED_ERROR = "-2";
+ private static final List<PropertyDescriptor> PROPERTIES = asList(
+ SMB_CLIENT_PROVIDER_SERVICE,
+ REMOTE_FILE
+ );
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final SmbClientProviderService clientProviderService =
+
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+
+ try (SmbClientService client = clientProviderService.getClient()) {
+ fetchAndTransfer(session, context, client, flowFile);
+ } catch (Exception e) {
+ getLogger().error("Couldn't connect to SMB.", e);
+ flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE,
getErrorCode(e));
+ flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE,
e.getMessage());
+ session.transfer(flowFile, REL_FAILURE);
+ }
+
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ private void fetchAndTransfer(ProcessSession session, ProcessContext
context, SmbClientService client,
+ FlowFile flowFile) {
+ final Map<String, String> attributes = flowFile.getAttributes();
+ final String filename = context.getProperty(REMOTE_FILE)
+ .evaluateAttributeExpressions(attributes).getValue();
+ try {
+ flowFile = session.write(flowFile, outputStream ->
client.readFile(filename, outputStream));
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (Exception e) {
+ getLogger().error("Couldn't fetch file {}.", filename, e);
+ flowFile = session.putAttribute(flowFile, ERROR_CODE_ATTRIBUTE,
getErrorCode(e));
+ flowFile = session.putAttribute(flowFile, ERROR_MESSAGE_ATTRIBUTE,
getErrorMessage(e));
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+
+ private String getErrorCode(Exception exception) {
+ return Optional.ofNullable(exception instanceof SmbException ?
(SmbException) exception : null)
+ .map(SmbException::getErrorCode)
+ .map(String::valueOf)
+ .orElse(UNCATEGORIZED_ERROR);
+ }
+
+ private String getErrorMessage(Exception exception) {
+ return Optional.ofNullable(exception.getMessage())
+ .orElse(exception.getClass().getSimpleName());
+ }
+
+}
+
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
index 402e27173f..9ed9a3c969 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/GetSmbFile.java
@@ -85,7 +85,7 @@ import java.util.regex.Pattern;
@CapabilityDescription("Reads file from a samba network location to FlowFiles.
" +
"Use this processor instead of a cifs mounts if share access control is
important. " +
"Configure the Hostname, Share and Directory accordingly:
\\\\[Hostname]\\[Share]\\[path\\to\\Directory]")
-@SeeAlso({PutSmbFile.class})
+@SeeAlso({PutSmbFile.class, ListSmb.class, FetchSmb.class})
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "The filename
is set to the name of the file on the network share"),
@WritesAttribute(attribute = "path", description = "The path is set to
the relative path of the file's network share name. For example, "
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
index e0029144f6..f9eab575d4 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java
@@ -16,27 +16,37 @@
*/
package org.apache.nifi.processors.smb;
+import static java.time.ZoneOffset.UTC;
import static java.time.format.DateTimeFormatter.ISO_DATE_TIME;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.unmodifiableList;
import static java.util.Collections.unmodifiableMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.nifi.components.state.Scope.CLUSTER;
import static
org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
import static
org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR;
import static
org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR;
import static
org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR;
+import static org.apache.nifi.services.smb.SmbListableEntity.ALLOCATION_SIZE;
+import static org.apache.nifi.services.smb.SmbListableEntity.CHANGE_TIME;
+import static
org.apache.nifi.services.smb.SmbListableEntity.LAST_MODIFIED_TIME;
+import static org.apache.nifi.services.smb.SmbListableEntity.CREATION_TIME;
+import static org.apache.nifi.services.smb.SmbListableEntity.FILENAME;
+import static org.apache.nifi.services.smb.SmbListableEntity.LAST_ACCESS_TIME;
+import static org.apache.nifi.services.smb.SmbListableEntity.PATH;
+import static org.apache.nifi.services.smb.SmbListableEntity.SERVICE_LOCATION;
+import static org.apache.nifi.services.smb.SmbListableEntity.SHORT_NAME;
+import static org.apache.nifi.services.smb.SmbListableEntity.SIZE;
import java.io.IOException;
import java.net.URI;
import java.time.LocalDateTime;
-import java.time.ZoneOffset;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -69,7 +79,7 @@ import org.apache.nifi.services.smb.SmbListableEntity;
@PrimaryNodeOnly
@TriggerSerially
@Tags({"samba, smb, cifs, files", "list"})
-@SeeAlso({PutSmbFile.class, GetSmbFile.class})
+@SeeAlso({PutSmbFile.class, GetSmbFile.class, FetchSmb.class})
@CapabilityDescription("Lists concrete files shared via SMB protocol. " +
"Each listed file may result in one flowfile, the metadata being
written as flowfile attributes. " +
"Or - in case the 'Record Writer' property is set - the entire result
is written as records to a single flowfile. "
@@ -79,32 +89,26 @@ import org.apache.nifi.services.smb.SmbListableEntity;
"previous node left off without duplicating all of the data.")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({
- @WritesAttribute(attribute = "filename", description = "The name of
the file that was read from filesystem."),
- @WritesAttribute(attribute = "shortname", description = "The short
name of the file that was read from filesystem."),
- @WritesAttribute(attribute = "path", description =
- "The path is set to the relative path of the file's directory "
- + "on filesystem compared to the Share and Input
Directory properties and the configured host "
- + "and port inherited from the configured connection
pool controller service. For example, for "
- + "a given remote location
smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listed from "
- + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file
then the path attribute will be set to \"sub/folder/file\"."),
- @WritesAttribute(attribute = "absolute.path", description =
- "The absolute.path is set to the absolute path of the file's
directory on the remote location. For example, "
- + "given a remote location
smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is being listen from "
- + "SHARE/DIRECTORY/sub/folder/file then the
absolute.path attribute will be set to "
- + "SHARE/DIRECTORY/sub/folder/file."),
- @WritesAttribute(attribute = "identifier", description =
- "The identifier of the file. This equals to the path attribute
so two files with the same relative path "
- + "coming from different file shares considered to be
identical."),
- @WritesAttribute(attribute = "timestamp", description =
- "The timestamp of when the file's content changed in the
filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
- @WritesAttribute(attribute = "createTime", description =
- "The timestamp of when the file was created in the filesystem
as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
- @WritesAttribute(attribute = "lastAccessTime", description =
- "The timestamp of when the file was accessed in the filesystem
as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
- @WritesAttribute(attribute = "changeTime", description =
- "The timestamp of when the file's attributes was changed in
the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"),
- @WritesAttribute(attribute = "size", description = "The number of
bytes in the source file"),
- @WritesAttribute(attribute = "allocationSize", description = "The
number of bytes allocated for the file on the server"),
+ @WritesAttribute(attribute = FILENAME, description = "The name of the
file that was read from filesystem."),
+ @WritesAttribute(attribute = SHORT_NAME, description = "The short name
of the file that was read from filesystem."),
+ @WritesAttribute(attribute = PATH, description =
+ "The path is set to the relative path of the file's directory
on the remote filesystem compared to the "
+ + "Share root directory. For example, for a given
remote location"
+ + "smb://HOSTNAME:PORT/SHARE/DIRECTORY, and a file is
being listed from "
+ + "smb://HOSTNAME:PORT/SHARE/DIRECTORY/sub/folder/file
then the path attribute will be set to "
+ + "\"DIRECTORY/sub/folder/file\"."),
+ @WritesAttribute(attribute = SERVICE_LOCATION, description =
+ "The SMB URL of the share."),
+ @WritesAttribute(attribute = LAST_MODIFIED_TIME, description =
+ "The timestamp of when the file's content changed in the
filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."),
+ @WritesAttribute(attribute = CREATION_TIME, description =
+ "The timestamp of when the file was created in the filesystem
as 'yyyy-MM-dd'T'HH:mm:ss'."),
+ @WritesAttribute(attribute = LAST_ACCESS_TIME, description =
+ "The timestamp of when the file was accessed in the filesystem
as 'yyyy-MM-dd'T'HH:mm:ss'."),
+ @WritesAttribute(attribute = CHANGE_TIME, description =
+ "The timestamp of when the file's attributes was changed in
the filesystem as 'yyyy-MM-dd'T'HH:mm:ss'."),
+ @WritesAttribute(attribute = SIZE, description = "The size of the file
in bytes."),
+ @WritesAttribute(attribute = ALLOCATION_SIZE, description = "The
number of bytes allocated for the file on the server."),
})
@Stateful(scopes = {Scope.CLUSTER}, description =
"After performing a listing of files, the state of the previous
listing can be stored in order to list files "
@@ -137,7 +141,7 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
public static final PropertyDescriptor MAXIMUM_AGE = new
PropertyDescriptor.Builder()
.displayName("Maximum File Age")
.name("max-file-age")
- .description("Any file older than the given value will be omitted.
")
+ .description("Any file older than the given value will be
omitted.")
.required(false)
.addValidator(TIME_PERIOD_VALIDATOR)
.build();
@@ -184,11 +188,11 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
.build();
private static final List<PropertyDescriptor> PROPERTIES =
unmodifiableList(asList(
- SMB_LISTING_STRATEGY,
SMB_CLIENT_PROVIDER_SERVICE,
+ SMB_LISTING_STRATEGY,
DIRECTORY,
- AbstractListProcessor.RECORD_WRITER,
FILE_NAME_SUFFIX_FILTER,
+ AbstractListProcessor.RECORD_WRITER,
MINIMUM_AGE,
MAXIMUM_AGE,
MINIMUM_SIZE,
@@ -207,17 +211,18 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
@Override
protected Map<String, String> createAttributes(SmbListableEntity entity,
ProcessContext context) {
final Map<String, String> attributes = new TreeMap<>();
- attributes.put("filename", entity.getName());
- attributes.put("shortname", entity.getShortName());
- attributes.put("path", entity.getPath());
- attributes.put("absolute.path", entity.getPathWithName());
- attributes.put("identifier", entity.getIdentifier());
- attributes.put("timestamp", formatTimeStamp(entity.getTimestamp()));
- attributes.put("creationTime",
formatTimeStamp(entity.getCreationTime()));
- attributes.put("lastAccessTime",
formatTimeStamp(entity.getLastAccessTime()));
- attributes.put("changeTime", formatTimeStamp(entity.getChangeTime()));
- attributes.put("size", String.valueOf(entity.getSize()));
- attributes.put("allocationSize",
String.valueOf(entity.getAllocationSize()));
+ final SmbClientProviderService clientProviderService =
+
context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class);
+ attributes.put(FILENAME, entity.getName());
+ attributes.put(SHORT_NAME, entity.getShortName());
+ attributes.put(PATH, entity.getPath());
+ attributes.put(SERVICE_LOCATION,
clientProviderService.getServiceLocation().toString());
+ attributes.put(LAST_MODIFIED_TIME,
formatTimeStamp(entity.getLastModifiedTime()));
+ attributes.put(CREATION_TIME,
formatTimeStamp(entity.getCreationTime()));
+ attributes.put(LAST_ACCESS_TIME,
formatTimeStamp(entity.getLastAccessTime()));
+ attributes.put(CHANGE_TIME, formatTimeStamp(entity.getChangeTime()));
+ attributes.put(SIZE, String.valueOf(entity.getSize()));
+ attributes.put(ALLOCATION_SIZE,
String.valueOf(entity.getAllocationSize()));
return unmodifiableMap(attributes);
}
@@ -286,7 +291,7 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
private String formatTimeStamp(long timestamp) {
return ISO_DATE_TIME.format(
-
LocalDateTime.ofEpochSecond(TimeUnit.MILLISECONDS.toSeconds(timestamp), 0,
ZoneOffset.UTC));
+ LocalDateTime.ofEpochSecond(MILLISECONDS.toSeconds(timestamp),
0, UTC));
}
private boolean isExecutionStopped(ListingMode listingMode) {
@@ -295,9 +300,9 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
private Predicate<SmbListableEntity> createFileFilter(ProcessContext
context, Long minTimestampOrNull) {
- final Long minimumAge =
context.getProperty(MINIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+ final Long minimumAge =
context.getProperty(MINIMUM_AGE).asTimePeriod(MILLISECONDS);
final Long maximumAgeOrNull = context.getProperty(MAXIMUM_AGE).isSet()
? context.getProperty(MAXIMUM_AGE)
- .asTimePeriod(TimeUnit.MILLISECONDS) : null;
+ .asTimePeriod(MILLISECONDS) : null;
final Double minimumSizeOrNull =
context.getProperty(MINIMUM_SIZE).isSet() ?
context.getProperty(MINIMUM_SIZE).asDataSize(DataUnit.B)
: null;
@@ -307,14 +312,14 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
final String suffixOrNull =
context.getProperty(FILE_NAME_SUFFIX_FILTER).getValue();
final long now = getCurrentTime();
- Predicate<SmbListableEntity> filter = entity -> now -
entity.getTimestamp() >= minimumAge;
+ Predicate<SmbListableEntity> filter = entity -> now -
entity.getLastModifiedTime() >= minimumAge;
if (maximumAgeOrNull != null) {
- filter = filter.and(entity -> now - entity.getTimestamp() <=
maximumAgeOrNull);
+ filter = filter.and(entity -> now - entity.getLastModifiedTime()
<= maximumAgeOrNull);
}
if (minTimestampOrNull != null) {
- filter = filter.and(entity -> entity.getTimestamp() >=
minTimestampOrNull);
+ filter = filter.and(entity -> entity.getLastModifiedTime() >=
minTimestampOrNull);
}
if (minimumSizeOrNull != null) {
@@ -341,7 +346,7 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
try {
clientService.close();
} catch (Exception e) {
- throw new RuntimeException("Could not close samba client", e);
+ throw new RuntimeException("Could not close SMB client", e);
}
});
}
@@ -349,7 +354,7 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
private String getDirectory(ProcessContext context) {
final PropertyValue property = context.getProperty(DIRECTORY);
final String directory = property.isSet() ?
property.getValue().replace('\\', '/') : "";
- return directory.equals("/") ? "" : directory;
+ return "/".equals(directory) ? "" : directory;
}
private static class MustNotContainDirectorySeparatorsValidator implements
Validator {
@@ -359,11 +364,11 @@ public class ListSmb extends
AbstractListProcessor<SmbListableEntity> {
return new ValidationResult.Builder()
.subject(subject)
.input(value)
- .valid(!value.contains("/"))
+ .valid(!value.contains("/") && !value.contains("\\"))
.explanation(subject + " must not contain any folder
separator character.")
.build();
}
}
-}
+}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
index 6c6821fca4..d88561e895 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/PutSmbFile.java
@@ -68,7 +68,7 @@ import java.util.EnumSet;
@CapabilityDescription("Writes the contents of a FlowFile to a samba network
location. " +
"Use this processor instead of a cifs mounts if share access control is
important." +
"Configure the Hostname, Share and Directory accordingly:
\\\\[Hostname]\\[Share]\\[path\\to\\Directory]")
-@SeeAlso({GetSmbFile.class})
+@SeeAlso({GetSmbFile.class, ListSmb.class, FetchSmb.class})
@ReadsAttributes({@ReadsAttribute(attribute="filename", description="The
filename to use when writing the FlowFile to the network folder.")})
public class PutSmbFile extends AbstractProcessor {
public static final String SHARE_ACCESS_NONE = "none";
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 0dd9276bc0..650be2dd0a 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+org.apache.nifi.processors.smb.FetchSmb
org.apache.nifi.processors.smb.GetSmbFile
org.apache.nifi.processors.smb.ListSmb
org.apache.nifi.processors.smb.PutSmbFile
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
new file mode 100644
index 0000000000..fb057791a8
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbIT.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
+import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
+import static org.apache.nifi.processors.smb.FetchSmb.REMOTE_FILE;
+import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.Test;
+
+public class FetchSmbIT extends SambaTestContainers {
+
+ @Test
+ public void fetchFilesUsingEL() throws Exception {
+ writeFile("/test_file", "test_content");
+ TestRunner testRunner = newTestRunner(FetchSmb.class);
+ testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("attribute_to_find_using_EL", "test_file");
+
+ testRunner.enqueue("ignored", attributes);
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ assertEquals("test_content",
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
+ testRunner.assertValid();
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
+
+ @Test
+ public void tryToFetchNonExistingFileEmitsFailure() throws Exception {
+ TestRunner testRunner = newTestRunner(FetchSmb.class);
+ testRunner.setProperty(REMOTE_FILE, "${attribute_to_find_using_EL}");
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
+
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("attribute_to_find_using_EL", "non_existing_file");
+
+ testRunner.enqueue("ignored", attributes);
+ testRunner.run();
+ testRunner.assertTransferCount(REL_FAILURE, 1);
+ testRunner.assertValid();
+ testRunner.disableControllerService(smbjClientProviderService);
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbTest.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbTest.java
new file mode 100644
index 0000000000..42651a4ef1
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/FetchSmbTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static org.apache.nifi.processors.smb.FetchSmb.ERROR_CODE_ATTRIBUTE;
+import static org.apache.nifi.processors.smb.FetchSmb.ERROR_MESSAGE_ATTRIBUTE;
+import static org.apache.nifi.processors.smb.FetchSmb.REL_FAILURE;
+import static org.apache.nifi.processors.smb.FetchSmb.REL_SUCCESS;
+import static
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.util.TestRunners.newTestRunner;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.services.smb.SmbClientProviderService;
+import org.apache.nifi.services.smb.SmbClientService;
+import org.apache.nifi.services.smb.SmbException;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+class FetchSmbTest {
+
+ public static final String CLIENT_SERVICE_PROVIDER_ID =
"client-provider-service-id";
+
+ @Mock
+ SmbClientService mockNifiSmbClientService;
+
+ @Mock
+ SmbClientProviderService clientProviderService;
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
when(clientProviderService.getClient()).thenReturn(mockNifiSmbClientService);
+
when(clientProviderService.getIdentifier()).thenReturn(CLIENT_SERVICE_PROVIDER_ID);
+
when(clientProviderService.getServiceLocation()).thenReturn(URI.create("smb://localhost:445/share"));
+ }
+
+ @Test
+ public void shouldUseSmbClientProperly() throws Exception {
+ final TestRunner testRunner = createRunner();
+ mockNifiSmbClientService();
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("path", "testDirectory");
+ attributes.put("filename", "cannotReadThis");
+ testRunner.enqueue("ignore", attributes);
+ attributes = new HashMap<>();
+ attributes.put("path", "testDirectory");
+ attributes.put("filename", "canReadThis");
+ testRunner.enqueue("ignore", attributes);
+ testRunner.run();
+ testRunner.assertTransferCount(REL_FAILURE, 1);
+ assertEquals("test exception",
+
testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0).getAttribute(ERROR_MESSAGE_ATTRIBUTE));
+ assertEquals("1",
+
testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0).getAttribute(ERROR_CODE_ATTRIBUTE));
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 1);
+ assertEquals("content",
+
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent());
+ testRunner.assertValid();
+ }
+
+ @Test
+ public void noSuchAttributeReferencedInELShouldResultInFailure() throws
Exception {
+ final TestRunner testRunner = createRunner();
+ mockNifiSmbClientService();
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put("different_field_name_than_what_EL_expect",
"testDirectory/cannotFindThis");
+ testRunner.enqueue("ignore", attributes);
+ testRunner.run();
+ testRunner.assertTransferCount(REL_SUCCESS, 0);
+ testRunner.assertTransferCount(REL_FAILURE, 1);
+ testRunner.assertValid();
+ }
+
+ private void mockNifiSmbClientService() throws IOException {
+ doThrow(new SmbException("test exception", 1L, new
RuntimeException())).when(mockNifiSmbClientService)
+ .readFile(anyString(), any(OutputStream.class));
+ doAnswer(invocation -> {
+ final OutputStream o = invocation.getArgument(1);
+ final ByteArrayInputStream bytes = new
ByteArrayInputStream("content".getBytes());
+ IOUtils.copy(bytes, o);
+ return true;
+ }).when(mockNifiSmbClientService)
+ .readFile(eq("testDirectory/canReadThis"),
any(OutputStream.class));
+ }
+
+ private TestRunner createRunner() throws Exception {
+ final TestRunner testRunner = newTestRunner(FetchSmb.class);
+ testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE,
CLIENT_SERVICE_PROVIDER_ID);
+ testRunner.addControllerService(CLIENT_SERVICE_PROVIDER_ID,
clientProviderService);
+ testRunner.enableControllerService(clientProviderService);
+ return testRunner;
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
index 8ef5aa7ba3..f059502353 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbIT.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.smb;
import static java.util.Arrays.asList;
-import static java.util.Arrays.fill;
import static java.util.stream.Collectors.toSet;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.LISTING_STRATEGY;
import static
org.apache.nifi.processor.util.list.AbstractListProcessor.RECORD_WRITER;
@@ -26,13 +25,9 @@ import static
org.apache.nifi.processors.smb.ListSmb.DIRECTORY;
import static org.apache.nifi.processors.smb.ListSmb.FILE_NAME_SUFFIX_FILTER;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_AGE;
import static org.apache.nifi.processors.smb.ListSmb.MINIMUM_SIZE;
-import static
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
-import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
-import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
-import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
import static org.apache.nifi.util.TestRunners.newTestRunner;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -40,47 +35,16 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.services.smb.SmbClientProviderService;
-import org.apache.nifi.services.smb.SmbListableEntity;
import org.apache.nifi.services.smb.SmbjClientProviderService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.images.builder.Transferable;
-import org.testcontainers.utility.DockerImageName;
-public class ListSmbIT {
-
- private final static Integer DEFAULT_SAMBA_PORT = 445;
- private final static Logger logger =
LoggerFactory.getLogger(ListSmbTest.class);
- private final GenericContainer<?> sambaContainer = new
GenericContainer<>(DockerImageName.parse("dperson/samba"))
- .withExposedPorts(DEFAULT_SAMBA_PORT, 139)
- .waitingFor(Wait.forListeningPort())
- .withLogConsumer(new Slf4jLogConsumer(logger))
- .withCommand("-w domain -u username;password -s
share;/folder;;no;no;username;;; -p");
-
- @BeforeEach
- public void beforeEach() {
- sambaContainer.start();
- }
-
- @AfterEach
- public void afterEach() {
- sambaContainer.stop();
- }
+public class ListSmbIT extends SambaTestContainers {
@ParameterizedTest
@ValueSource(ints = {4, 50, 45000})
@@ -89,8 +53,7 @@ public class ListSmbIT {
final TestRunner testRunner = newTestRunner(ListSmb.class);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 ms");
- SmbjClientProviderService smbjClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
- testRunner.enableControllerService(smbjClientProviderService);
+ SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
testRunner.getFlowFilesForRelationship(REL_SUCCESS)
@@ -105,8 +68,7 @@ public class ListSmbIT {
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(DIRECTORY, "folderDoesNotExists");
- SmbjClientProviderService smbjClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
- testRunner.enableControllerService(smbjClientProviderService);
+ SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
testRunner.assertValid();
@@ -116,7 +78,7 @@ public class ListSmbIT {
@Test
public void shouldShowBulletinWhenShareIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
- SmbjClientProviderService smbjClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
+ SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, false);
testRunner.setProperty(smbjClientProviderService, SHARE,
"invalid_share");
testRunner.enableControllerService(smbjClientProviderService);
testRunner.run();
@@ -128,8 +90,7 @@ public class ListSmbIT {
@Test
public void shouldShowBulletinWhenSMBPortIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
- final SmbClientProviderService smbClientProviderService =
- configureTestRunnerForSambaDockerContainer(testRunner);
+ final SmbClientProviderService smbClientProviderService =
configureSmbClient(testRunner, false);
testRunner.setProperty(smbClientProviderService, PORT, "1");
testRunner.enableControllerService(smbClientProviderService);
testRunner.run();
@@ -141,12 +102,12 @@ public class ListSmbIT {
@Test
public void shouldShowBulletinWhenSMBHostIsInvalid() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
- final SmbClientProviderService smbClientProviderService =
- configureTestRunnerForSambaDockerContainer(testRunner);
+ final SmbClientProviderService smbClientProviderService =
configureSmbClient(testRunner, false);
testRunner.setProperty(smbClientProviderService, HOSTNAME,
"this.host.should.not.exists");
testRunner.enableControllerService(smbClientProviderService);
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
+ testRunner.assertValid();
testRunner.disableControllerService(smbClientProviderService);
}
@@ -163,22 +124,14 @@ public class ListSmbIT {
final TestRunner testRunner = newTestRunner(ListSmb.class);
final MockRecordWriter writer = new MockRecordWriter(null, false);
- final SimpleRecordSchema simpleRecordSchema =
SmbListableEntity.getRecordSchema();
testRunner.addControllerService("writer", writer);
testRunner.enableControllerService(writer);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(RECORD_WRITER, "writer");
testRunner.setProperty(MINIMUM_AGE, "0 ms");
- SmbjClientProviderService smbjClientProviderService =
configureTestRunnerForSambaDockerContainer(testRunner);
- testRunner.enableControllerService(smbjClientProviderService);
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
- final String result =
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0).getContent();
- final int identifierColumnIndex =
simpleRecordSchema.getFieldNames().indexOf("identifier");
- final Set<String> actual = Arrays.stream(result.split("\n"))
- .map(row -> row.split(",")[identifierColumnIndex])
- .collect(toSet());
- assertEquals(testFiles, actual);
testRunner.assertValid();
testRunner.disableControllerService(smbjClientProviderService);
}
@@ -190,8 +143,7 @@ public class ListSmbIT {
));
testFiles.forEach(file -> writeFile(file, generateContentWithSize(4)));
final TestRunner testRunner = newTestRunner(ListSmb.class);
- final SmbjClientProviderService smbjClientProviderService =
- configureTestRunnerForSambaDockerContainer(testRunner);
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, false);
testRunner.setProperty(LISTING_STRATEGY, "none");
testRunner.setProperty(MINIMUM_AGE, "0 sec");
testRunner.enableControllerService(smbjClientProviderService);
@@ -202,16 +154,6 @@ public class ListSmbIT {
.map(MockFlowFile::getAttributes)
.collect(toSet());
- final Set<String> identifiers = allAttributes.stream()
- .map(attributes -> attributes.get("identifier"))
- .collect(toSet());
- assertEquals(testFiles, identifiers);
-
- allAttributes.forEach(attribute -> assertEquals(
- Stream.of(attribute.get("path"),
attribute.get("filename")).filter(s -> !s.isEmpty()).collect(
- Collectors.joining("/")),
- attribute.get("absolute.path")));
-
final Set<String> fileNames = allAttributes.stream()
.map(attributes -> attributes.get("filename"))
.collect(toSet());
@@ -225,9 +167,7 @@ public class ListSmbIT {
@Test
public void shouldFilterFilesBySizeCriteria() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
- final SmbClientProviderService smbClientProviderService =
- configureTestRunnerForSambaDockerContainer(testRunner);
- testRunner.enableControllerService(smbClientProviderService);
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(LISTING_STRATEGY, "none");
@@ -247,17 +187,15 @@ public class ListSmbIT {
testRunner.setProperty(MINIMUM_SIZE, "50 B");
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
-
- testRunner.disableControllerService(smbClientProviderService);
+ testRunner.assertValid();
+ testRunner.disableControllerService(smbjClientProviderService);
}
@Test
public void shouldFilterByGivenSuffix() throws Exception {
final TestRunner testRunner = newTestRunner(ListSmb.class);
- final SmbClientProviderService smbClientProviderService =
- configureTestRunnerForSambaDockerContainer(testRunner);
- testRunner.enableControllerService(smbClientProviderService);
+ final SmbjClientProviderService smbjClientProviderService =
configureSmbClient(testRunner, true);
testRunner.setProperty(MINIMUM_AGE, "0 ms");
testRunner.setProperty(FILE_NAME_SUFFIX_FILTER, ".suffix");
testRunner.setProperty(LISTING_STRATEGY, "none");
@@ -265,33 +203,8 @@ public class ListSmbIT {
writeFile("should_skip_this.suffix", generateContentWithSize(1));
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
- testRunner.disableControllerService(smbClientProviderService);
- }
-
- private SmbjClientProviderService
configureTestRunnerForSambaDockerContainer(TestRunner testRunner)
- throws Exception {
- SmbjClientProviderService smbjClientProviderService = new
SmbjClientProviderService();
- testRunner.addControllerService("connection-pool",
smbjClientProviderService);
- testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "connection-pool");
- testRunner.setProperty(smbjClientProviderService, HOSTNAME,
sambaContainer.getHost());
- testRunner.setProperty(smbjClientProviderService, PORT,
-
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
- testRunner.setProperty(smbjClientProviderService, USERNAME,
"username");
- testRunner.setProperty(smbjClientProviderService, PASSWORD,
"password");
- testRunner.setProperty(smbjClientProviderService, SHARE, "share");
- testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
- return smbjClientProviderService;
- }
-
- private String generateContentWithSize(int sizeInBytes) {
- byte[] bytes = new byte[sizeInBytes];
- fill(bytes, (byte) 1);
- return new String(bytes);
- }
-
- private void writeFile(String path, String content) {
- String containerPath = "/folder/" + path;
- sambaContainer.copyFileToContainer(Transferable.of(content),
containerPath);
+ testRunner.assertValid();
+ testRunner.disableControllerService(smbjClientProviderService);
}
}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
index e279bf2e85..0594ef3e2f 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/ListSmbTest.java
@@ -115,7 +115,6 @@ class ListSmbTest {
testRunner.run();
testRunner.assertTransferCount(REL_SUCCESS, 1);
-
testRunner.assertValid();
}
@@ -236,6 +235,7 @@ class ListSmbTest {
when(mockNifiSmbClientService.listRemoteFiles(anyString())).thenThrow(new
RuntimeException("test exception"));
testRunner.run();
assertEquals(1, testRunner.getLogger().getErrorMessages().size());
+ testRunner.assertValid();
}
@Test
@@ -288,7 +288,7 @@ class ListSmbTest {
private SmbListableEntity listableEntity(String name, long timeStamp) {
return SmbListableEntity.builder()
.setName(name)
- .setTimestamp(timeStamp)
+ .setLastModifiedTime(timeStamp)
.build();
}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
new file mode 100644
index 0000000000..f38d36d171
--- /dev/null
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/test/java/org/apache/nifi/processors/smb/SambaTestContainers.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.smb;
+
+import static java.util.Arrays.fill;
+import static
org.apache.nifi.processors.smb.ListSmb.SMB_CLIENT_PROVIDER_SERVICE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.DOMAIN;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.HOSTNAME;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PASSWORD;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.PORT;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.SHARE;
+import static org.apache.nifi.services.smb.SmbjClientProviderService.USERNAME;
+
+import org.apache.nifi.services.smb.SmbjClientProviderService;
+import org.apache.nifi.util.TestRunner;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.images.builder.Transferable;
+import org.testcontainers.utility.DockerImageName;
+
+public class SambaTestContainers {
+
+ protected final static Integer DEFAULT_SAMBA_PORT = 445;
+ protected final static Logger logger =
LoggerFactory.getLogger(SambaTestContainers.class);
+ protected final GenericContainer<?> sambaContainer = new
GenericContainer<>(DockerImageName.parse("dperson/samba"))
+ .withExposedPorts(DEFAULT_SAMBA_PORT, 139)
+ .waitingFor(Wait.forListeningPort())
+ .withLogConsumer(new Slf4jLogConsumer(logger))
+ .withCommand("-w domain -u username;password -s
share;/folder;;no;no;username;;; -p");
+
+ @BeforeEach
+ public void beforeEach() {
+ sambaContainer.start();
+ }
+
+ @AfterEach
+ public void afterEach() {
+ sambaContainer.stop();
+ }
+
+ protected SmbjClientProviderService configureSmbClient(TestRunner
testRunner, boolean shouldEnableSmbClient)
+ throws Exception {
+ final SmbjClientProviderService smbjClientProviderService = new
SmbjClientProviderService();
+ testRunner.addControllerService("client-provider",
smbjClientProviderService);
+ testRunner.setProperty(SMB_CLIENT_PROVIDER_SERVICE, "client-provider");
+ testRunner.setProperty(smbjClientProviderService, HOSTNAME,
sambaContainer.getHost());
+ testRunner.setProperty(smbjClientProviderService, PORT,
+
String.valueOf(sambaContainer.getMappedPort(DEFAULT_SAMBA_PORT)));
+ testRunner.setProperty(smbjClientProviderService, USERNAME,
"username");
+ testRunner.setProperty(smbjClientProviderService, PASSWORD,
"password");
+ testRunner.setProperty(smbjClientProviderService, SHARE, "share");
+ testRunner.setProperty(smbjClientProviderService, DOMAIN, "domain");
+ if (shouldEnableSmbClient) {
+ testRunner.enableControllerService(smbjClientProviderService);
+ }
+ return smbjClientProviderService;
+ }
+
+ protected String generateContentWithSize(int sizeInBytes) {
+ byte[] bytes = new byte[sizeInBytes];
+ fill(bytes, (byte) 1);
+ return new String(bytes);
+ }
+
+ protected void writeFile(String path, String content) {
+ String containerPath = "/folder/" + path;
+ sambaContainer.copyFileToContainer(Transferable.of(content),
containerPath);
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
index f72cec6a2b..3ee35d6289 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientProviderService.java
@@ -116,7 +116,7 @@ public class SmbjClientProviderService extends
AbstractControllerService impleme
@Override
public SmbClientService getClient() throws IOException {
- final SmbjClientService client = new SmbjClientService(smbClient,
authenticationContext);
+ final SmbjClientService client = new SmbjClientService(smbClient,
authenticationContext, getServiceLocation());
try {
client.connectToShare(hostname, port, shareName);
diff --git
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
index 431a4e9f52..a103fec359 100644
---
a/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
+++
b/nifi-nar-bundles/nifi-smb-bundle/nifi-smb-smbj-client/src/main/java/org/apache/nifi/services/smb/SmbjClientService.java
@@ -33,8 +33,11 @@ import com.hierynomus.smbj.connection.Connection;
import com.hierynomus.smbj.session.Session;
import com.hierynomus.smbj.share.Directory;
import com.hierynomus.smbj.share.DiskShare;
+import com.hierynomus.smbj.share.File;
import com.hierynomus.smbj.share.Share;
import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Stream;
@@ -42,17 +45,20 @@ import java.util.stream.Stream;
public class SmbjClientService implements SmbClientService {
private static final List<String> SPECIAL_DIRECTORIES = asList(".", "..");
+ private static final long UNCATEGORISED_ERROR = -1L;
final private AuthenticationContext authenticationContext;
final private SMBClient smbClient;
+ final private URI serviceLocation;
private Connection connection;
private Session session;
private DiskShare share;
- public SmbjClientService(SMBClient smbClient, AuthenticationContext
authenticationContext) {
+ public SmbjClientService(SMBClient smbClient, AuthenticationContext
authenticationContext, URI serviceLocation) {
this.smbClient = smbClient;
this.authenticationContext = authenticationContext;
+ this.serviceLocation = serviceLocation;
}
public void connectToShare(String hostname, int port, String shareName)
throws IOException {
@@ -104,7 +110,7 @@ public class SmbjClientService implements SmbClientService {
return Stream.of(filePath).flatMap(path -> {
final Directory directory = openDirectory(path);
return stream(directory::spliterator, 0, false)
- .map(entity -> buildSmbListableEntity(entity, path))
+ .map(entity -> buildSmbListableEntity(entity, path,
serviceLocation))
.filter(entity -> !specialDirectory(entity))
.flatMap(listable -> listable.isDirectory() ?
listRemoteFiles(listable.getPathWithName())
: Stream.of(listable))
@@ -123,18 +129,39 @@ public class SmbjClientService implements
SmbClientService {
}
}
- private SmbListableEntity
buildSmbListableEntity(FileIdBothDirectoryInformation info, String path) {
+ @Override
+ public void readFile(String fileName, OutputStream outputStream) throws
IOException {
+ try (File f = share.openFile(
+ fileName,
+ EnumSet.of(AccessMask.GENERIC_READ),
+ EnumSet.of(FileAttributes.FILE_ATTRIBUTE_NORMAL),
+ EnumSet.of(SMB2ShareAccess.FILE_SHARE_READ),
+ SMB2CreateDisposition.FILE_OPEN,
+ EnumSet.of(SMB2CreateOptions.FILE_SEQUENTIAL_ONLY))
+ ) {
+ f.read(outputStream);
+ } catch (SMBApiException a) {
+ throw new SmbException(a.getMessage(), a.getStatusCode(), a);
+ } catch (Exception e) {
+ throw new SmbException(e.getMessage(), UNCATEGORISED_ERROR, e);
+ } finally {
+ outputStream.close();
+ }
+ }
+
+ private SmbListableEntity
buildSmbListableEntity(FileIdBothDirectoryInformation info, String path, URI
serviceLocation) {
return SmbListableEntity.builder()
.setName(info.getFileName())
.setShortName(info.getShortName())
.setPath(path)
- .setTimestamp(info.getLastWriteTime().toEpochMillis())
+ .setLastModifiedTime(info.getLastWriteTime().toEpochMillis())
.setCreationTime(info.getCreationTime().toEpochMillis())
.setChangeTime(info.getChangeTime().toEpochMillis())
.setLastAccessTime(info.getLastAccessTime().toEpochMillis())
.setDirectory((info.getFileAttributes() &
FileAttributes.FILE_ATTRIBUTE_DIRECTORY.getValue()) != 0)
.setSize(info.getEndOfFile())
.setAllocationSize(info.getAllocationSize())
+ .setServiceLocation(serviceLocation)
.build();
}