exceptionfactory commented on code in PR #6192: URL: https://github.com/apache/nifi/pull/6192#discussion_r918153432
########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/pom.xml: ########## @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-bundle</artifactId> + <version>1.17.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-smb-connection-pool</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-connection-pool-api</artifactId> + <version>1.17.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + 
<groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>com.hierynomus</groupId> + <artifactId>smbj</artifactId> + </dependency> + <dependency> + <groupId>net.engio</groupId> + <artifactId>mbassador</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> Review Comment: Apache Commons-Lang3 should be used instead of the older Apache Commons-Lang. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool-api/src/main/java/org/apache/nifi/services/smb/SmbConnectionPoolService.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.smb; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.auth.AuthenticationContext; +import org.apache.nifi.controller.ControllerService; + +public interface SmbConnectionPoolService extends ControllerService { + + /** + * Returns the name of the share to connect. + * + * @return the share + */ + String getShareName(); + + /** + * Returns the hostname to connect to. + * + * @return the hostname + */ + String getHostname(); + + /** + * Returns the port using to connect. + * + * @return the port. + */ + Integer getPort(); + + /** + * Returns the SmbClient to use + * + * @return the smbClient + */ + SMBClient getSmbClient(); + + /** + * Returns the authentication context. + * + * @return the authentication context. + */ + AuthenticationContext getAuthenticationContext(); Review Comment: Instead of returning these smbj objects, access could be encapsulated in several ways. One option is to encapsulate the authentication process and return a `Session`, which would allow reuse. A better option would be to encapsulate the service operation and provide a method that would enumerate a shared directory along the lines of the following: ``` List<ListableEntity> getRemoteFiles(String directory); ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool-api/src/main/java/org/apache/nifi/services/smb/SmbConnectionPoolService.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.auth.AuthenticationContext; +import org.apache.nifi.controller.ControllerService; + +public interface SmbConnectionPoolService extends ControllerService { + + /** + * Returns the name of the share to connect. + * + * @return the share + */ + String getShareName(); + + /** + * Returns the hostname to connect to. + * + * @return the hostname + */ + String getHostname(); + + /** + * Returns the port using to connect. + * + * @return the port. + */ + Integer getPort(); Review Comment: Instead of exposing individual property values, the service interface could be simplified to return `java.net.URI` that would encapsulate SMB URI construction. ```suggestion /** * Get the service location including the protocol, hostname, and port for the current connection * * @return Service Location URI */ URI getServiceLocation(); ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; 
+import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbConnectionPoolService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. 
For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file in the filesystem was modified as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. 
You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum file age in milliseconds") + .name("min-age") + .description( + "Any file younger the the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(INTEGER_VALIDATOR) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("5000") + .build(); + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CONNECTION_POOL_SERVICE = new Builder() + .name("smb-connection-pool-service") + .displayName("SMB Connection Pool Service") + .description("Specifies the SMB Connection Pool to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbConnectionPoolService.class) + .build(); + + public static final PropertyDescriptor SKIP_FILES_WITH_SUFFIX = new Builder() + .name("skip-files-with-suffix") + .displayName("File name suffix filter") Review Comment: As this is a new property, it would be best to align the property name and display name. Should it be `file-name-suffix-filter`? Taking a closer look at the approach, is there a reason for limiting the scope to the filename suffix, as opposed to supporting a more generalized Filename Filter property? 
########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjConnectionPoolService.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.auth.AuthenticationContext; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import 
org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.util.StringUtils; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. ") +public class SmbjConnectionPoolService extends AbstractControllerService implements SmbConnectionPoolService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host to which files should be written.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor SHARE = new PropertyDescriptor.Builder() + .displayName("Share") + .name("share") + .description("The network share to which files should be written. This is the \"first folder\"" + + "after the hostname: \\\\hostname\\[share]\\dir1\\dir2") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication. If no username is set then anonymous authentication is attempted.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication. 
Required if Username is set.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(INTEGER_VALIDATOR) + .defaultValue("445") + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections + .unmodifiableList(Arrays.asList( + HOSTNAME, + SHARE, + DOMAIN, + USERNAME, + PASSWORD, + PORT + )); + + public SMBClient getSmbClient() { + return smbClient; + } + + public AuthenticationContext getAuthenticationContext() { + return authenticationContext; + } + + private final SMBClient smbClient = new SMBClient(); + private AuthenticationContext authenticationContext; + private ConfigurationContext context; + private String hostname; + private Integer port; + private String shareName; + + public String getShareName() { + return shareName; + } + + public String getHostname() { + return hostname; + } + + public Integer getPort() { + return port; + } + + @OnEnabled + public void onEnabled(ConfigurationContext context) { + this.context = context; + this.hostname = context.getProperty(HOSTNAME).getValue(); + this.port = context.getProperty(PORT).asInteger(); + this.shareName = context.getProperty(SHARE).getValue(); + createAuthenticationContext(); + } + + private void createAuthenticationContext() { + final String userName = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + final String domain = context.getProperty(DOMAIN).getValue(); + if (userName != null && password != null) { + authenticationContext = new AuthenticationContext(userName, password.toCharArray(), domain); + } else { + authenticationContext = AuthenticationContext.anonymous(); + } + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + 
@Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + + final String hostname = validationContext.getProperty(HOSTNAME).getValue(); + final Integer port = validationContext.getProperty(PORT).asInteger(); + final String share = validationContext.getProperty(SHARE).getValue(); + + if (StringUtils.isBlank(hostname)) { + results.add(new ValidationResult.Builder() + .subject(this.getClass().getSimpleName()) + .valid(false) + .explanation("hostname is required") + .build() + ); + } + + if (StringUtils.isBlank(share)) { + results.add(new ValidationResult.Builder() + .subject(this.getClass().getSimpleName()) + .valid(false) + .explanation("share is required") + .build() + ); + } + + if (port == null || port <= 0) { + results.add(new ValidationResult.Builder() + .subject(this.getClass().getSimpleName()) + .valid(false) + .explanation("port is invalid") + .build() + ); + } + + return results; + } Review Comment: Is this method necessary? It looks like this validation should be handled using the validators configured for each property descriptor. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; 
+import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbConnectionPoolService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. 
For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file in the filesystem was modified as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. 
You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum file age in milliseconds") + .name("min-age") + .description( + "Any file younger the the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(INTEGER_VALIDATOR) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("5000") + .build(); + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CONNECTION_POOL_SERVICE = new Builder() + .name("smb-connection-pool-service") + .displayName("SMB Connection Pool Service") + .description("Specifies the SMB Connection Pool to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbConnectionPoolService.class) + .build(); + + public static final PropertyDescriptor SKIP_FILES_WITH_SUFFIX = new Builder() + .name("skip-files-with-suffix") + .displayName("File name suffix filter") + .description("Files ends with the given suffix will be omitted. This is handy when writing large data into " + + "temporary files and then moved to a final one. 
Please be advised that writing data into files " + + "first is highly recommended when using Entity Tracking or Timestamp based listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + + private static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; + private static final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); + private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList( + AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, + AbstractListProcessor.RECORD_WRITER, + ListedEntityTracker.TRACKING_STATE_CACHE, + ListedEntityTracker.TRACKING_TIME_WINDOW, + ListedEntityTracker.INITIAL_LISTING_TARGET, + SMB_LISTING_STRATEGY, + SMB_CONNECTION_POOL_SERVICE, + DIRECTORY, + MINIMUM_AGE, + SKIP_FILES_WITH_SUFFIX + )); + + NiFiSmbClientFactory smbClientFactory = new NiFiSmbClientFactory(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) { + final Map<String, String> attributes = new TreeMap<>(); + attributes.put("filename", entity.getName()); + attributes.put("path", entity.getPath()); + attributes.put("absolute.path", getPath(context) + entity.getPathWithName()); + attributes.put("identifier", entity.getIdentifier()); + attributes.put("timestamp", formatter.format(new Date(entity.getTimestamp()))); + attributes.put("size", String.valueOf(entity.getSize())); + return unmodifiableMap(attributes); + } + + @Override + protected String getPath(ProcessContext context) { + final SmbConnectionPoolService connectionPoolService = + context.getProperty(SMB_CONNECTION_POOL_SERVICE).asControllerService(SmbConnectionPoolService.class); + final String hostname = connectionPoolService.getHostname(); + final String port = 
String.valueOf(connectionPoolService.getPort()); + final String share = connectionPoolService.getShareName(); + final String directory = getDirectory(context); + return String.format("smb://%s:%s/%s:\\%s", hostname, port, share, directory.isEmpty() ? "" : directory + "\\"); + } + + @Override + protected List<SmbListableEntity> performListing(ProcessContext context, Long minimumTimestampOrNull, + ListingMode listingMode) throws IOException { + + final Integer minimumAge = context.getProperty(MINIMUM_AGE).asInteger(); + final String suffixOrNull = context.getProperty(SKIP_FILES_WITH_SUFFIX).getValue(); + final Predicate<SmbListableEntity> fileFilter = + createFileFilter(minimumAge, minimumTimestampOrNull, suffixOrNull); + try (Stream<SmbListableEntity> listing = performListing(context)) { + final Iterator<SmbListableEntity> iterator = listing.iterator(); + final List<SmbListableEntity> result = new LinkedList<>(); + while (iterator.hasNext()) { + if (!isExecutionScheduled(listingMode)) { + return emptyList(); + } + final SmbListableEntity entity = iterator.next(); + if (fileFilter.test(entity)) { + result.add(entity); + } + } + return result; + } catch (IOException o) { + throw o; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + protected boolean isListingResetNecessary(PropertyDescriptor property) { + return asList(SMB_CONNECTION_POOL_SERVICE, DIRECTORY, SKIP_FILES_WITH_SUFFIX).contains(property); + } + + @Override + protected Scope getStateScope(PropertyContext context) { + return CLUSTER; + } + + @Override + protected RecordSchema getRecordSchema() { + return SmbListableEntity.getRecordSchema(); + } + + @Override + protected Integer countUnfilteredListing(ProcessContext context) throws IOException { + try (Stream<SmbListableEntity> listing = performListing(context)) { + return Long.valueOf(listing.count()).intValue(); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + 
protected String getListingContainerName(ProcessContext context) { + return String.format("Samba Directory [%s]", getPath(context)); Review Comment: `Samba` is a Linux-specific implementation of the SMB protocol. This should be adjusted: ```suggestion return String.format("Remote Directory [%s]", getPath(context)); ``` ########## nifi-nar-bundles/nifi-smb-bundle/pom.xml: ########## @@ -26,7 +26,36 @@ <packaging>pom</packaging> <modules> + <module>nifi-smb-connection-pool-api</module> + <module>nifi-smb-connection-pool-api-nar</module> + <module>nifi-smb-connection-pool</module> + <module>nifi-smb-connection-pool-nar</module> <module>nifi-smb-processors</module> <module>nifi-smb-nar</module> </modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.hierynomus</groupId> + <artifactId>smbj</artifactId> + <version>0.11.5</version> + </dependency> + <dependency> + <groupId>net.engio</groupId> + <artifactId>mbassador</artifactId> + <version>1.3.0</version> Review Comment: The latest version appears to be 1.3.2, is there a reason for not using that version? ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import 
org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbConnectionPoolService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. 
For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file in the filesystem was modified as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. 
If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum file age in milliseconds") + .name("min-age") + .description( + "Any file younger the the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(INTEGER_VALIDATOR) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("5000") + .build(); + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CONNECTION_POOL_SERVICE = new Builder() + .name("smb-connection-pool-service") + .displayName("SMB Connection Pool Service") + .description("Specifies the SMB Connection Pool to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbConnectionPoolService.class) + .build(); + + public static final PropertyDescriptor SKIP_FILES_WITH_SUFFIX = new Builder() + .name("skip-files-with-suffix") + .displayName("File name suffix filter") + .description("Files ends with the given suffix will be omitted. This is handy when writing large data into " + + "temporary files and then moved to a final one. 
Please be advised that writing data into files " + + "first is highly recommended when using Entity Tracking or Timestamp based listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + + private static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; + private static final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); + private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList( + AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, + AbstractListProcessor.RECORD_WRITER, + ListedEntityTracker.TRACKING_STATE_CACHE, + ListedEntityTracker.TRACKING_TIME_WINDOW, + ListedEntityTracker.INITIAL_LISTING_TARGET, + SMB_LISTING_STRATEGY, + SMB_CONNECTION_POOL_SERVICE, + DIRECTORY, + MINIMUM_AGE, + SKIP_FILES_WITH_SUFFIX + )); + + NiFiSmbClientFactory smbClientFactory = new NiFiSmbClientFactory(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) { + final Map<String, String> attributes = new TreeMap<>(); + attributes.put("filename", entity.getName()); + attributes.put("path", entity.getPath()); + attributes.put("absolute.path", getPath(context) + entity.getPathWithName()); + attributes.put("identifier", entity.getIdentifier()); + attributes.put("timestamp", formatter.format(new Date(entity.getTimestamp()))); + attributes.put("size", String.valueOf(entity.getSize())); + return unmodifiableMap(attributes); + } + + @Override + protected String getPath(ProcessContext context) { + final SmbConnectionPoolService connectionPoolService = + context.getProperty(SMB_CONNECTION_POOL_SERVICE).asControllerService(SmbConnectionPoolService.class); + final String hostname = connectionPoolService.getHostname(); + final String port = 
String.valueOf(connectionPoolService.getPort()); + final String share = connectionPoolService.getShareName(); + final String directory = getDirectory(context); + return String.format("smb://%s:%s/%s:\\%s", hostname, port, share, directory.isEmpty() ? "" : directory + "\\"); + } + + @Override + protected List<SmbListableEntity> performListing(ProcessContext context, Long minimumTimestampOrNull, + ListingMode listingMode) throws IOException { + + final Integer minimumAge = context.getProperty(MINIMUM_AGE).asInteger(); + final String suffixOrNull = context.getProperty(SKIP_FILES_WITH_SUFFIX).getValue(); + final Predicate<SmbListableEntity> fileFilter = + createFileFilter(minimumAge, minimumTimestampOrNull, suffixOrNull); + try (Stream<SmbListableEntity> listing = performListing(context)) { + final Iterator<SmbListableEntity> iterator = listing.iterator(); + final List<SmbListableEntity> result = new LinkedList<>(); + while (iterator.hasNext()) { + if (!isExecutionScheduled(listingMode)) { + return emptyList(); + } + final SmbListableEntity entity = iterator.next(); + if (fileFilter.test(entity)) { + result.add(entity); + } + } + return result; + } catch (IOException o) { + throw o; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + protected boolean isListingResetNecessary(PropertyDescriptor property) { + return asList(SMB_CONNECTION_POOL_SERVICE, DIRECTORY, SKIP_FILES_WITH_SUFFIX).contains(property); + } + + @Override + protected Scope getStateScope(PropertyContext context) { + return CLUSTER; + } + + @Override + protected RecordSchema getRecordSchema() { + return SmbListableEntity.getRecordSchema(); + } + + @Override + protected Integer countUnfilteredListing(ProcessContext context) throws IOException { + try (Stream<SmbListableEntity> listing = performListing(context)) { + return Long.valueOf(listing.count()).intValue(); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } Review Comment: See 
other comments on streamlining and adding a message. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjConnectionPoolService.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.smb; + +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.auth.AuthenticationContext; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.util.StringUtils; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. ") +public class SmbjConnectionPoolService extends AbstractControllerService implements SmbConnectionPoolService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host to which files should be written.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor SHARE = new PropertyDescriptor.Builder() + .displayName("Share") + .name("share") + .description("The network share to which files should be written. 
This is the \"first folder\"" + + "after the hostname: \\\\hostname\\[share]\\dir1\\dir2") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication. If no username is set then anonymous authentication is attempted.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication. Required if Username is set.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(INTEGER_VALIDATOR) Review Comment: This should be changed to use the Port Validator. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; 
+import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbConnectionPoolService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. 
For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file in the filesystem was modified as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. 
You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum file age in milliseconds") + .name("min-age") + .description( + "Any file younger the the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(INTEGER_VALIDATOR) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("5000") + .build(); + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CONNECTION_POOL_SERVICE = new Builder() + .name("smb-connection-pool-service") + .displayName("SMB Connection Pool Service") + .description("Specifies the SMB Connection Pool to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbConnectionPoolService.class) + .build(); + + public static final PropertyDescriptor SKIP_FILES_WITH_SUFFIX = new Builder() + .name("skip-files-with-suffix") + .displayName("File name suffix filter") + .description("Files ends with the given suffix will be omitted. This is handy when writing large data into " + + "temporary files and then moved to a final one. 
Please be advised that writing data into files " + + "first is highly recommended when using Entity Tracking or Timestamp based listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + + private static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; + private static final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); + private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList( + AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, + AbstractListProcessor.RECORD_WRITER, + ListedEntityTracker.TRACKING_STATE_CACHE, + ListedEntityTracker.TRACKING_TIME_WINDOW, + ListedEntityTracker.INITIAL_LISTING_TARGET, + SMB_LISTING_STRATEGY, + SMB_CONNECTION_POOL_SERVICE, + DIRECTORY, + MINIMUM_AGE, + SKIP_FILES_WITH_SUFFIX + )); + + NiFiSmbClientFactory smbClientFactory = new NiFiSmbClientFactory(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) { + final Map<String, String> attributes = new TreeMap<>(); + attributes.put("filename", entity.getName()); + attributes.put("path", entity.getPath()); + attributes.put("absolute.path", getPath(context) + entity.getPathWithName()); + attributes.put("identifier", entity.getIdentifier()); + attributes.put("timestamp", formatter.format(new Date(entity.getTimestamp()))); + attributes.put("size", String.valueOf(entity.getSize())); + return unmodifiableMap(attributes); + } + + @Override + protected String getPath(ProcessContext context) { + final SmbConnectionPoolService connectionPoolService = + context.getProperty(SMB_CONNECTION_POOL_SERVICE).asControllerService(SmbConnectionPoolService.class); + final String hostname = connectionPoolService.getHostname(); + final String port = 
String.valueOf(connectionPoolService.getPort()); + final String share = connectionPoolService.getShareName(); + final String directory = getDirectory(context); + return String.format("smb://%s:%s/%s:\\%s", hostname, port, share, directory.isEmpty() ? "" : directory + "\\"); + } + + @Override + protected List<SmbListableEntity> performListing(ProcessContext context, Long minimumTimestampOrNull, + ListingMode listingMode) throws IOException { + + final Integer minimumAge = context.getProperty(MINIMUM_AGE).asInteger(); + final String suffixOrNull = context.getProperty(SKIP_FILES_WITH_SUFFIX).getValue(); + final Predicate<SmbListableEntity> fileFilter = + createFileFilter(minimumAge, minimumTimestampOrNull, suffixOrNull); + try (Stream<SmbListableEntity> listing = performListing(context)) { + final Iterator<SmbListableEntity> iterator = listing.iterator(); + final List<SmbListableEntity> result = new LinkedList<>(); + while (iterator.hasNext()) { + if (!isExecutionScheduled(listingMode)) { + return emptyList(); + } + final SmbListableEntity entity = iterator.next(); + if (fileFilter.test(entity)) { + result.add(entity); + } + } + return result; + } catch (IOException o) { + throw o; + } catch (Exception e) { + throw new IOException(e); + } Review Comment: This could be collapsed into a single catch block, and it would be helpful to include an exception message. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjConnectionPoolService.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.auth.AuthenticationContext; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.util.StringUtils; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. 
") +public class SmbjConnectionPoolService extends AbstractControllerService implements SmbConnectionPoolService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host to which files should be written.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor SHARE = new PropertyDescriptor.Builder() + .displayName("Share") + .name("share") + .description("The network share to which files should be written. This is the \"first folder\"" + + "after the hostname: \\\\hostname\\[share]\\dir1\\dir2") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication. If no username is set then anonymous authentication is attempted.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication. 
Required if Username is set.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(INTEGER_VALIDATOR) + .defaultValue("445") + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections + .unmodifiableList(Arrays.asList( + HOSTNAME, + SHARE, + DOMAIN, + USERNAME, + PASSWORD, + PORT + )); + + public SMBClient getSmbClient() { + return smbClient; + } + + public AuthenticationContext getAuthenticationContext() { + return authenticationContext; + } + + private final SMBClient smbClient = new SMBClient(); Review Comment: The `SMBClient` is closeable, so it should be instantiated in `onEnabled` and closed in a method annotated with `OnDisabled`. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; 
+import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbConnectionPoolService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. 
For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file in the filesystem was modified as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. 
If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum file age in milliseconds") + .name("min-age") + .description( + "Any file younger the the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(INTEGER_VALIDATOR) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("5000") + .build(); + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CONNECTION_POOL_SERVICE = new Builder() + .name("smb-connection-pool-service") + .displayName("SMB Connection Pool Service") + .description("Specifies the SMB Connection Pool to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbConnectionPoolService.class) + .build(); + + public static final PropertyDescriptor SKIP_FILES_WITH_SUFFIX = new Builder() + .name("skip-files-with-suffix") + .displayName("File name suffix filter") + .description("Files ends with the given suffix will be omitted. This is handy when writing large data into " + + "temporary files and then moved to a final one. 
Please be advised that writing data into files " + + "first is highly recommended when using Entity Tracking or Timestamp based listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + + private static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; + private static final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); + private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList( + AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, + AbstractListProcessor.RECORD_WRITER, + ListedEntityTracker.TRACKING_STATE_CACHE, + ListedEntityTracker.TRACKING_TIME_WINDOW, + ListedEntityTracker.INITIAL_LISTING_TARGET, + SMB_LISTING_STRATEGY, + SMB_CONNECTION_POOL_SERVICE, + DIRECTORY, + MINIMUM_AGE, + SKIP_FILES_WITH_SUFFIX + )); + + NiFiSmbClientFactory smbClientFactory = new NiFiSmbClientFactory(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) { + final Map<String, String> attributes = new TreeMap<>(); + attributes.put("filename", entity.getName()); + attributes.put("path", entity.getPath()); + attributes.put("absolute.path", getPath(context) + entity.getPathWithName()); + attributes.put("identifier", entity.getIdentifier()); + attributes.put("timestamp", formatter.format(new Date(entity.getTimestamp()))); + attributes.put("size", String.valueOf(entity.getSize())); + return unmodifiableMap(attributes); + } + + @Override + protected String getPath(ProcessContext context) { + final SmbConnectionPoolService connectionPoolService = + context.getProperty(SMB_CONNECTION_POOL_SERVICE).asControllerService(SmbConnectionPoolService.class); + final String hostname = connectionPoolService.getHostname(); + final String port = 
String.valueOf(connectionPoolService.getPort()); + final String share = connectionPoolService.getShareName(); + final String directory = getDirectory(context); + return String.format("smb://%s:%s/%s:\\%s", hostname, port, share, directory.isEmpty() ? "" : directory + "\\"); Review Comment: As mentioned on the interface comments, the leading portion of this URI could be provided through a single service method, then this processor method could just append the directory. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; 
+import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbConnectionPoolService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. 
For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file in the filesystem was modified as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. 
If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum file age in milliseconds") + .name("min-age") + .description( + "Any file younger the the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(INTEGER_VALIDATOR) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("5000") + .build(); + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CONNECTION_POOL_SERVICE = new Builder() + .name("smb-connection-pool-service") + .displayName("SMB Connection Pool Service") + .description("Specifies the SMB Connection Pool to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbConnectionPoolService.class) + .build(); + + public static final PropertyDescriptor SKIP_FILES_WITH_SUFFIX = new Builder() + .name("skip-files-with-suffix") + .displayName("File name suffix filter") + .description("Files ends with the given suffix will be omitted. This is handy when writing large data into " + + "temporary files and then moved to a final one. 
Please be advised that writing data into files " + + "first is highly recommended when using Entity Tracking or Timestamp based listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + + private static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ"; + private static final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US); Review Comment: `SimpleDateFormat` is not thread safe. Using the Java 8 `DateTimeFormatter` provides a better option. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjConnectionPoolService.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.smb; + +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.auth.AuthenticationContext; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.util.StringUtils; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. ") +public class SmbjConnectionPoolService extends AbstractControllerService implements SmbConnectionPoolService { Review Comment: It would be helpful to provide a property for configuring the Timeout on SMBClient connections. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
