exceptionfactory commented on code in PR #6192: URL: https://github.com/apache/nifi/pull/6192#discussion_r921492624
########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. ") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") Review Comment: Is there a reason to have `Guest` as the default value as opposed to leaving this blank as an optional property? 
########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. ") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) Review Comment: It looks like this property should be required, correct? ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. 
") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(PORT_VALIDATOR) + .defaultValue("445") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .displayName("Timeout") + .name("timeout") + .description("Timeout in seconds for read and write operations.") + .required(true) + .defaultValue("5") + .addValidator(INTEGER_VALIDATOR) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections + .unmodifiableList(asList( + HOSTNAME, + DOMAIN, + USERNAME, + PASSWORD, + PORT, + TIMEOUT 
+ )); + private SMBClient smbClient; + private AuthenticationContext authenticationContext; + private ConfigurationContext context; + private String hostname; + private Integer port; Review Comment: Recommend changing `port` to a primitive `int` since it is a required property. ```suggestion private int port; ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. 
") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(PORT_VALIDATOR) + .defaultValue("445") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .displayName("Timeout") + .name("timeout") + .description("Timeout in seconds for read and write operations.") + .required(true) + .defaultValue("5") + .addValidator(INTEGER_VALIDATOR) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections + .unmodifiableList(asList( + HOSTNAME, + DOMAIN, + USERNAME, + PASSWORD, + PORT, Review 
Comment: Recommend moving `PORT` right after `HOSTNAME`: ```suggestion HOSTNAME, PORT, DOMAIN, USERNAME, PASSWORD, ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. 
") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(PORT_VALIDATOR) + .defaultValue("445") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .displayName("Timeout") + .name("timeout") + .description("Timeout in seconds for read and write operations.") + .required(true) + .defaultValue("5") + .addValidator(INTEGER_VALIDATOR) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections + .unmodifiableList(asList( + HOSTNAME, + DOMAIN, + USERNAME, + PASSWORD, + PORT, + TIMEOUT 
+ )); + private SMBClient smbClient; + private AuthenticationContext authenticationContext; + private ConfigurationContext context; + private String hostname; + private Integer port; + + @Override + public Session getSession() { + try { + return smbClient.connect(hostname, port).authenticate(authenticationContext); Review Comment: Although chaining these methods is concise, it would be helpful to declare the `Connection` variable and then call `authenticate()` to make it easier to distinguish between connection and authentication failures. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. 
") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(PORT_VALIDATOR) + .defaultValue("445") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .displayName("Timeout") + .name("timeout") + .description("Timeout in seconds for read and write operations.") + .required(true) + .defaultValue("5") + .addValidator(INTEGER_VALIDATOR) Review Comment: Timeout settings should use the `TIME_PERIOD_VALIDATOR`, which allows various expressions for milliseconds, seconds, or minutes. 
########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. ") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { Review Comment: Given the interface rename, recommend renaming this class to `StandardSmbSessionProviderService`, since the interface itself already ties the implementation to `SMBJ`. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. 
") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(PORT_VALIDATOR) + .defaultValue("445") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .displayName("Timeout") + .name("timeout") + .description("Timeout in seconds for read and write operations.") + .required(true) + .defaultValue("5") + .addValidator(INTEGER_VALIDATOR) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections + .unmodifiableList(asList( + HOSTNAME, + DOMAIN, + USERNAME, + PASSWORD, + PORT, + TIMEOUT 
+ )); + private SMBClient smbClient; + private AuthenticationContext authenticationContext; + private ConfigurationContext context; + private String hostname; + private Integer port; + + @Override + public Session getSession() { + try { + return smbClient.connect(hostname, port).authenticate(authenticationContext); + } catch (IOException e) { + throw new UncheckedIOException(e); Review Comment: Recommend including a message with this exception: ```suggestion throw new UncheckedIOException("SMB connect failed", e); ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. 
") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(PORT_VALIDATOR) + .defaultValue("445") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .displayName("Timeout") + .name("timeout") + .description("Timeout in seconds for read and write operations.") + .required(true) + .defaultValue("5") + .addValidator(INTEGER_VALIDATOR) + .build(); + private static final List<PropertyDescriptor> PROPERTIES = Collections + .unmodifiableList(asList( + HOSTNAME, + DOMAIN, + USERNAME, + PASSWORD, + PORT, + TIMEOUT 
+ )); + private SMBClient smbClient; + private AuthenticationContext authenticationContext; + private ConfigurationContext context; + private String hostname; + private Integer port; + + @Override + public Session getSession() { + try { + return smbClient.connect(hostname, port).authenticate(authenticationContext); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public URI getServiceLocation() { + return URI.create(String.format("smb://%s:%d", hostname, port)); + } + + @OnEnabled + public void onEnabled(ConfigurationContext context) throws IOException { + this.context = context; + this.hostname = context.getProperty(HOSTNAME).getValue(); + this.port = context.getProperty(PORT).asInteger(); + this.smbClient = new SMBClient(SmbConfig.builder() + .withTimeout(context.getProperty(TIMEOUT).asLong(), SECONDS) + .withTransportLayerFactory(new AsyncDirectTcpTransportFactory<>()) + .build()); + createAuthenticationContext(); + } + + @OnDisabled + public void onDisabled() { + smbClient.close(); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final String hostname = validationContext.getProperty(HOSTNAME).getValue(); + final String port = validationContext.getProperty(PORT).getValue(); + final String timeout = validationContext.getProperty(TIMEOUT).getValue(); + + return asList(HOSTNAME.validate(hostname, validationContext), + PORT.validate(port, validationContext), + TIMEOUT.validate(timeout, validationContext) + ); + } Review Comment: This method can be removed since the standard validators work for each property. 
########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/pom.xml: ########## @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-bundle</artifactId> + <version>1.17.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-smb-connection-pool</artifactId> Review Comment: Recommend renaming this module to `nifi-smb-session-service` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import 
org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) +@CapabilityDescription("Provides connection pool for ListSmb processor. ") Review Comment: ```suggestion @CapabilityDescription("Provides access to SMB Sessions with shared authentication credentials.") ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/pom.xml: ########## @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-bundle</artifactId> + <version>1.17.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-smb-connection-pool</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-connection-pool-api</artifactId> + <version>1.17.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> Review Comment: These dependencies do not appear to be used. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbSessionProviderService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. 
If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. 
This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. 
If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age in milliseconds") Review Comment: The time period can be specified using milliseconds, seconds, minutes, or hours, so this should be changed ```suggestion .displayName("Minimum File Age") ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import 
org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbSessionProviderService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. 
For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. 
It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age in milliseconds") + .name("min-file-age") + .description( + "Any file younger then the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5ms") Review Comment: ```suggestion .defaultValue("5 ms") ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool-api/pom.xml: ########## @@ -0,0 +1,39 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-bundle</artifactId> + <version>1.17.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-smb-connection-pool-api</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <version>1.17.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>com.hierynomus</groupId> + <artifactId>smbj</artifactId> + </dependency> + <dependency> + <groupId>net.engio</groupId> + <artifactId>mbassador</artifactId> + </dependency> Review Comment: This appears to be a transitive dependency of `smbj` and does not need to be declared directly. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import 
org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbSessionProviderService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. 
For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. 
It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age in milliseconds") + .name("min-file-age") + .description( + "Any file younger then the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5ms") + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size in bytes") Review Comment: To correspond with the change to the size validation, the display name can be changed: ```suggestion .displayName("Minimum File Size") ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import 
org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbSessionProviderService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. 
For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." 
+) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age in milliseconds") + .name("min-file-age") + .description( + "Any file younger then the given value will be omitted. 
Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5ms") + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size in bytes") + .name("min-file-size") + .description("Any file smaller then the given value will be omitted.") + .required(false) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Maximum File Size in bytes") + .name("max-file-size") + .description("Any file bigger then the given value will be omitted.") + .required(false) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CONNECTION_POOL_SERVICE = new Builder() + .name("smb-connection-pool-service") + .displayName("SMB Connection Pool Service") + .description("Specifies the SMB Connection Pool to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbSessionProviderService.class) + .build(); + + public static final PropertyDescriptor SKIP_FILES_WITH_SUFFIX = new Builder() + .name("file-name-suffix-filter") + .displayName("File Name Suffix Filter") + .description("Files ends with the given suffix will be omitted. This is handy when writing large data into " + + "temporary files and then moved to a final one. 
Please be advised that writing data into files " + + "first is highly recommended when using Entity Tracking or Timestamp based listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + public static final PropertyDescriptor SHARE = new PropertyDescriptor.Builder() + .displayName("Share") + .name("share") + .description("The network share to which files should be listed from. This is the \"first folder\"" + + "after the hostname: smb://hostname:port\\[share]\\dir1\\dir2") + .required(false) Review Comment: It seems like `Share` should be a required property. Is it possible to connect without a Share? ```suggestion .required(true) ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import 
org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbSessionProviderService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. 
For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. 
It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age in milliseconds") + .name("min-file-age") + .description( + "Any file younger then the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5ms") + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size in bytes") + .name("min-file-size") + .description("Any file smaller then the given value will be omitted.") + .required(false) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Maximum File Size in bytes") + .name("max-file-size") + .description("Any file bigger then the given value will be omitted.") + .required(false) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) Review Comment: ```suggestion .addValidator(DATA_SIZE_VALIDATOR) ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/NiFiSmbClientFactory.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.DiskShare; +import com.hierynomus.smbj.share.Share; + +public class NiFiSmbClientFactory { + + NiFiSmbClient create(Session session, String shareName) { + final Share share = session.connectShare(shareName); + if (share instanceof DiskShare) { + return new NiFiSmbClient(session, (DiskShare) share); + } else { + throw new IllegalArgumentException("NiFi supports only disk shares but " + Review Comment: Recommend adjusting the wording: ```suggestion throw new IllegalArgumentException("SMB DiskShare not found. Share " + ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/NiFiSmbClientFactory.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.DiskShare; +import com.hierynomus.smbj.share.Share; + +public class NiFiSmbClientFactory { Review Comment: Is there a reason for a separate class as opposed to just placing the method in the `ListSmb` processor? ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import 
org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbSessionProviderService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. 
For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. 
It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age in milliseconds") + .name("min-file-age") + .description( + "Any file younger then the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5ms") + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size in bytes") + .name("min-file-size") + .description("Any file smaller then the given value will be omitted.") + .required(false) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Maximum File Size in bytes") Review Comment: ```suggestion .displayName("Maximum File Size") ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/src/main/java/org/apache/nifi/services/smb/SmbjSessionProviderService.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.smb; + +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.PORT_VALIDATOR; + +import com.hierynomus.smbj.SMBClient; +import com.hierynomus.smbj.SmbConfig; +import com.hierynomus.smbj.auth.AuthenticationContext; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.transport.tcp.async.AsyncDirectTcpTransportFactory; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; + +@Tags({"microsoft", "samba"}) 
+@CapabilityDescription("Provides connection pool for ListSmb processor. ") +public class SmbjSessionProviderService extends AbstractControllerService implements SmbSessionProviderService { + + public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .displayName("Hostname") + .name("hostname") + .description("The network host of the SMB file server.") + .required(false) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + public static final PropertyDescriptor DOMAIN = new PropertyDescriptor.Builder() + .displayName("Domain") + .name("domain") + .description( + "The domain used for authentication. Optional, in most cases username and password is sufficient.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .displayName("Username") + .name("username") + .description( + "The username used for authentication.") + .required(false) + .defaultValue("Guest") + .addValidator(NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .displayName("Password") + .name("password") + .description("The password used for authentication.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .displayName("Port") + .name("port") + .description("Port to use for connection.") + .required(true) + .addValidator(PORT_VALIDATOR) + .defaultValue("445") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .displayName("Timeout") + .name("timeout") + .description("Timeout in seconds for read and write operations.") + .required(true) + .defaultValue("5") Review Comment: Changing to use the Time Period Validator will allow changing this to `5 secs`. 
########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool-api/pom.xml: ########## @@ -0,0 +1,39 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-bundle</artifactId> + <version>1.17.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-smb-connection-pool-api</artifactId> Review Comment: With the interface change to `SmbSessionProviderService`, recommend renaming the module to `nifi-smb-session-api`, and renaming the NAR to `nifi-smb-session-api-nar`. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-connection-pool/pom.xml: ########## @@ -0,0 +1,72 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-bundle</artifactId> + <version>1.17.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-smb-connection-pool</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-smb-connection-pool-api</artifactId> + <version>1.17.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>com.hierynomus</groupId> + <artifactId>smbj</artifactId> + </dependency> + <dependency> + <groupId>net.engio</groupId> + 
<artifactId>mbassador</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> Review Comment: These dependencies do not appear to be used. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import 
org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbSessionProviderService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. 
For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. 
It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age in milliseconds") + .name("min-file-age") + .description( + "Any file younger then the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5ms") + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size in bytes") + .name("min-file-size") + .description("Any file smaller then the given value will be omitted.") + .required(false) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Maximum File Size in bytes") + .name("max-file-size") + .description("Any file bigger then the given value will be omitted.") + .required(false) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CONNECTION_POOL_SERVICE = new Builder() + .name("smb-connection-pool-service") + .displayName("SMB Connection Pool Service") + .description("Specifies the SMB Connection Pool to use for creating SMB connections.") Review Comment: This should be adjusted 
based on the interface renaming: ```suggestion .name("smb-session-provider-service") .displayName("SMB Session Provider Service") .description("Specifies the SMB Session Provider to use for obtaining SMB connections.") ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/NiFiSmbClient.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.stream.StreamSupport.stream; + +import com.hierynomus.msdtyp.AccessMask; +import com.hierynomus.msfscc.FileAttributes; +import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation; +import com.hierynomus.mssmb2.SMB2CreateDisposition; +import com.hierynomus.mssmb2.SMB2CreateOptions; +import com.hierynomus.mssmb2.SMB2ShareAccess; +import com.hierynomus.mssmb2.SMBApiException; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.Directory; +import com.hierynomus.smbj.share.DiskShare; +import com.hierynomus.smbj.share.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Stream; + +public class NiFiSmbClient { Review Comment: Renaming this class would help communicate the purpose. Perhaps `SmbSessionClient` or `SmbShareClient`? Also seeing that this class is being mocked for testing, recommend defining it as an interface and then creating a standard implementation. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import 
org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbSessionProviderService; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"microsoft", "storage", "samba"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Retrieves a listing of files shared via SMB protocol. For each file that is listed, " + + "creates a FlowFile that represents the file. This Processor is designed to run on Primary Node only in " + + "a cluster. 
If the primary node changes, the new Primary Node will pick up where the previous node left " + + "off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "\"SHARE:\\DIRECTORY\\sub\\folder\\file\"."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. 
This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder to which files should be written. This is the remaining relative " + + "after the hostname: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + " to add subdirectories using this property. The given path on the remote file share must exists. " + + "The existence of the remote folder can be checked using verification. You may mix different " + + "directory separators in this property. 
If so NiFi will unify all of them and will use windows's" + + "directory separator: '\\' ") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age in milliseconds") + .name("min-file-age") + .description( + "Any file younger then the given value will be omitted. Ideally this value should be greater then" + + "the amount of time needed to perform a list.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5ms") + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size in bytes") + .name("min-file-size") + .description("Any file smaller then the given value will be omitted.") + .required(false) + .addValidator(NON_NEGATIVE_INTEGER_VALIDATOR) Review Comment: This should be changed to use the `DATA_SIZE_VALIDATOR`, which supports various expressions. ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/NiFiSmbClientFactory.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.DiskShare; +import com.hierynomus.smbj.share.Share; + +public class NiFiSmbClientFactory { + + NiFiSmbClient create(Session session, String shareName) { + final Share share = session.connectShare(shareName); + if (share instanceof DiskShare) { + return new NiFiSmbClient(session, (DiskShare) share); + } else { + throw new IllegalArgumentException("NiFi supports only disk shares but " + + share.getClass().getSimpleName() + " found on host " + session.getConnection().getRemoteHostname() + + "!"); Review Comment: ```suggestion share.getClass().getSimpleName() + " found on host " + session.getConnection().getRemoteHostname()); ``` ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/NiFiSmbClient.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.stream.StreamSupport.stream; + +import com.hierynomus.msdtyp.AccessMask; +import com.hierynomus.msfscc.FileAttributes; +import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation; +import com.hierynomus.mssmb2.SMB2CreateDisposition; +import com.hierynomus.mssmb2.SMB2CreateOptions; +import com.hierynomus.mssmb2.SMB2ShareAccess; +import com.hierynomus.mssmb2.SMBApiException; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.Directory; +import com.hierynomus.smbj.share.DiskShare; +import com.hierynomus.smbj.share.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Stream; + +public class NiFiSmbClient { + + private static final List<String> SPECIAL_DIRECTORIES = asList(".", ".."); Review Comment: Perhaps renaming this `HIDDEN_DIRECTORIES` would be better? ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/NiFiSmbClient.java: ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.util.Arrays.asList; +import static java.util.stream.StreamSupport.stream; + +import com.hierynomus.msdtyp.AccessMask; +import com.hierynomus.msfscc.FileAttributes; +import com.hierynomus.msfscc.fileinformation.FileIdBothDirectoryInformation; +import com.hierynomus.mssmb2.SMB2CreateDisposition; +import com.hierynomus.mssmb2.SMB2CreateOptions; +import com.hierynomus.mssmb2.SMB2ShareAccess; +import com.hierynomus.mssmb2.SMBApiException; +import com.hierynomus.smbj.session.Session; +import com.hierynomus.smbj.share.Directory; +import com.hierynomus.smbj.share.DiskShare; +import com.hierynomus.smbj.share.File; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Stream; + +public class NiFiSmbClient { + + private static final List<String> SPECIAL_DIRECTORIES = asList(".", ".."); + + private final Session session; + private final DiskShare share; + + NiFiSmbClient(Session session, DiskShare share) { + this.session = session; + this.share = share; + } + + static String unifyDirectorySeparators(String path) { + return path.replace('/', '\\'); + } + + public void close() { + try { + session.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); Review Comment: An exception message should be added: ```suggestion throw new UncheckedIOException("SMB Session close failed", e); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
