turcsanyip commented on code in PR #6192: URL: https://github.com/apache/nifi/pull/6192#discussion_r928320217
########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbClientProviderService; +import org.apache.nifi.services.smb.SmbClientService; +import org.apache.nifi.services.smb.SmbListableEntity; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"samba, smb, cifs, files", "list"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Lists concrete files shared via SMB protocol. " + + "Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " + + "Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " + + "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " + + "previous node left off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:\\DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "SHARE:\\DIRECTORY\\sub\\folder\\file."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder from which to list files. This is the remaining relative path " + + "after the share: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + "to add subdirectories. The given path on the remote file share must exist. " + + "This can be checked using verification. You may mix Windows and Linux-style " + + "directory separators.") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age") + .name("min-file-age") + .description("The minimum age that a file must be in order to be listed; any file younger than this " + + "amount of time will be ignored.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5 secs") + .build(); + + public static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Maximum File Age") + .name("max-file-age") + .description("Any file older than the given value will be omitted. ") + .required(false) + .addValidator(TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size") + .name("min-file-size") + .description("Any file smaller than the given value will be omitted.") + .required(false) + .addValidator(DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Maximum File Size") + .name("max-file-size") + .description("Any file larger than the given value will be omitted.") + .required(false) + .addValidator(DATA_SIZE_VALIDATOR) + .build(); + + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder() + .name("smb-client-provider-service") + .displayName("SMB Client Provider Service") + .description("Specifies the SMB client provider to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbClientProviderService.class) + .build(); + + public static final PropertyDescriptor FILE_NAME_SUFFIX_FILTER = new Builder() + .name("file-name-suffix-filter") + .displayName("File Name Suffix Filter") + .description("Files ending with the given suffix will be omitted. Can be used to make sure that files " + + "that are still uploading are not listed multiple times, by having those files have a suffix " + + "and remove the suffix once the upload finishes. This is highly recommended when using " + + "'Tracking Entities' or 'Tracking Timestamps' listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList( + SMB_LISTING_STRATEGY, + SMB_CLIENT_PROVIDER_SERVICE, + DIRECTORY, + AbstractListProcessor.RECORD_WRITER, + FILE_NAME_SUFFIX_FILTER, + MINIMUM_AGE, + MAXIMUM_AGE, + MINIMUM_SIZE, + MAXIMUM_SIZE, + AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, + ListedEntityTracker.TRACKING_STATE_CACHE, + ListedEntityTracker.TRACKING_TIME_WINDOW, + ListedEntityTracker.INITIAL_LISTING_TARGET + )); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) { + final Map<String, String> attributes = new TreeMap<>(); + attributes.put("filename", entity.getName()); + attributes.put("shortname", entity.getShortName()); + attributes.put("path", entity.getPath()); + attributes.put("absolute.path", getPath(context) + entity.getPathWithName()); + attributes.put("identifier", entity.getIdentifier()); + attributes.put("timestamp", formatTimeStamp(entity.getTimestamp())); + attributes.put("creationTime", formatTimeStamp(entity.getCreationTime())); + attributes.put("lastAccessTime", formatTimeStamp(entity.getLastAccessTime())); + attributes.put("changeTime", formatTimeStamp(entity.getChangeTime())); + attributes.put("size", String.valueOf(entity.getSize())); + attributes.put("allocationSize", String.valueOf(entity.getAllocationSize())); + return unmodifiableMap(attributes); + } + + @Override + protected String getPath(ProcessContext context) { + final SmbClientProviderService clientProviderService = + context.getProperty(SMB_CLIENT_PROVIDER_SERVICE).asControllerService(SmbClientProviderService.class); + final URI serviceLocation = clientProviderService.getServiceLocation(); + final String directory = getDirectory(context); + return String.format("%s:\\%s", serviceLocation.toString(), directory.isEmpty() ? "" : directory + "\\"); Review Comment: The `:` after the share name seems to be a typo. There should not be a colon there. I would suggest using forward slashes (`/`) in SMB URIs instead of backslash (`\`). Mixing them looks to me a bit strange and forward slash seems to be the standard (though it is a draft only): https://datatracker.ietf.org/doc/html/draft-crhertel-smb-url-12#section-7 ########## nifi-nar-bundles/nifi-smb-bundle/nifi-smb-processors/src/main/java/org/apache/nifi/processors/smb/ListSmb.java: ########## @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.smb; + +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; +import static org.apache.nifi.components.state.Scope.CLUSTER; +import static org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.TIME_PERIOD_VALIDATOR; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.list.AbstractListProcessor; +import org.apache.nifi.processor.util.list.ListedEntityTracker; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.services.smb.SmbClientProviderService; +import org.apache.nifi.services.smb.SmbClientService; +import org.apache.nifi.services.smb.SmbListableEntity; + +@PrimaryNodeOnly +@TriggerSerially +@Tags({"samba, smb, cifs, files", "list"}) +@SeeAlso({PutSmbFile.class, GetSmbFile.class}) +@CapabilityDescription("Lists concrete files shared via SMB protocol. " + + "Each listed file may result in one flowfile, the metadata being written as flowfile attributes. " + + "Or - in case the 'Record Writer' property is set - the entire result is written as records to a single flowfile. " + + "This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the " + + "previous node left off without duplicating all of the data.") +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "The name of the file that was read from filesystem."), + @WritesAttribute(attribute = "shortname", description = "The short name of the file that was read from filesystem."), + @WritesAttribute(attribute = "path", description = + "The path is set to the relative path of the file's directory " + + "on filesystem compared to the Share and Input Directory properties and the configured host " + + "and port inherited from the configured connection pool controller service. For example, for " + + "a given remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listed from " + + "smb://HOSTNAME:PORT/SHARE:\\DIRECTORY\\sub\\folder\\file then the path attribute will be set to \"sub\\folder\\file\"."), + @WritesAttribute(attribute = "absolute.path", description = + "The absolute.path is set to the absolute path of the file's directory on the remote location. For example, " + + "given a remote location smb://HOSTNAME:PORT/SHARE:\\DIRECTORY, and a file is being listen from " + + "SHARE:\\DIRECTORY\\sub\\folder\\file then the absolute.path attribute will be set to " + + "SHARE:\\DIRECTORY\\sub\\folder\\file."), + @WritesAttribute(attribute = "identifier", description = + "The identifier of the file. This equals to the path attribute so two files with the same relative path " + + "coming from different file shares considered to be identical."), + @WritesAttribute(attribute = "timestamp", description = + "The timestamp of when the file's content changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "createTime", description = + "The timestamp of when the file was created in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "lastAccessTime", description = + "The timestamp of when the file was accessed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "changeTime", description = + "The timestamp of when the file's attributes was changed in the filesystem as 'yyyy-MM-dd'T'HH:mm:ssZ'"), + @WritesAttribute(attribute = "size", description = "The number of bytes in the source file"), + @WritesAttribute(attribute = "allocationSize", description = "The number of bytes allocated for the file on the server"), +}) +@Stateful(scopes = {Scope.CLUSTER}, description = + "After performing a listing of files, the state of the previous listing can be stored in order to list files " + + "continuously without duplication." +) +public class ListSmb extends AbstractListProcessor<SmbListableEntity> { + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .displayName("Input Directory") + .name("directory") + .description("The network folder from which to list files. This is the remaining relative path " + + "after the share: smb://HOSTNAME:PORT/SHARE/[DIRECTORY]\\sub\\directories. It is also possible " + + "to add subdirectories. The given path on the remote file share must exist. " + + "This can be checked using verification. You may mix Windows and Linux-style " + + "directory separators.") + .required(false) + .addValidator(NON_BLANK_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Minimum File Age") + .name("min-file-age") + .description("The minimum age that a file must be in order to be listed; any file younger than this " + + "amount of time will be ignored.") + .required(true) + .addValidator(TIME_PERIOD_VALIDATOR) + .defaultValue("5 secs") + .build(); + + public static final PropertyDescriptor MAXIMUM_AGE = new PropertyDescriptor.Builder() + .displayName("Maximum File Age") + .name("max-file-age") + .description("Any file older than the given value will be omitted. ") + .required(false) + .addValidator(TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor MINIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Minimum File Size") + .name("min-file-size") + .description("Any file smaller than the given value will be omitted.") + .required(false) + .addValidator(DATA_SIZE_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXIMUM_SIZE = new PropertyDescriptor.Builder() + .displayName("Maximum File Size") + .name("max-file-size") + .description("Any file larger than the given value will be omitted.") + .required(false) + .addValidator(DATA_SIZE_VALIDATOR) + .build(); + + + public static final PropertyDescriptor SMB_LISTING_STRATEGY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(LISTING_STRATEGY) + .allowableValues(BY_ENTITIES, NO_TRACKING, BY_TIMESTAMPS) + .build(); + + public static final PropertyDescriptor SMB_CLIENT_PROVIDER_SERVICE = new Builder() + .name("smb-client-provider-service") + .displayName("SMB Client Provider Service") + .description("Specifies the SMB client provider to use for creating SMB connections.") + .required(true) + .identifiesControllerService(SmbClientProviderService.class) + .build(); + + public static final PropertyDescriptor FILE_NAME_SUFFIX_FILTER = new Builder() + .name("file-name-suffix-filter") + .displayName("File Name Suffix Filter") + .description("Files ending with the given suffix will be omitted. Can be used to make sure that files " + + "that are still uploading are not listed multiple times, by having those files have a suffix " + + "and remove the suffix once the upload finishes. This is highly recommended when using " + + "'Tracking Entities' or 'Tracking Timestamps' listing strategies.") + .required(false) + .addValidator(NON_EMPTY_VALIDATOR) + .addValidator(new MustNotContainDirectorySeparatorsValidator()) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = unmodifiableList(asList( + SMB_LISTING_STRATEGY, + SMB_CLIENT_PROVIDER_SERVICE, + DIRECTORY, + AbstractListProcessor.RECORD_WRITER, + FILE_NAME_SUFFIX_FILTER, + MINIMUM_AGE, + MAXIMUM_AGE, + MINIMUM_SIZE, + MAXIMUM_SIZE, + AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION, + ListedEntityTracker.TRACKING_STATE_CACHE, + ListedEntityTracker.TRACKING_TIME_WINDOW, + ListedEntityTracker.INITIAL_LISTING_TARGET + )); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected Map<String, String> createAttributes(SmbListableEntity entity, ProcessContext context) { + final Map<String, String> attributes = new TreeMap<>(); + attributes.put("filename", entity.getName()); + attributes.put("shortname", entity.getShortName()); + attributes.put("path", entity.getPath()); + attributes.put("absolute.path", getPath(context) + entity.getPathWithName()); Review Comment: If I specify a directory in the `Input Directory` property, then the directory appears twice in the SMB URL: `smb://localhost:445/public:\dir1\dir1\dir2\file3` Suggesting forward slashes here too. Also in `identifier` and `path`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
