http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java index 0000000,2152d2e..2a2504c mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFile.java @@@ -1,0 -1,456 +1,456 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.File; + import java.io.FileFilter; + import java.io.IOException; + import java.nio.file.FileStore; + import java.nio.file.Files; + import java.nio.file.Path; + import java.nio.file.Paths; + import java.nio.file.attribute.BasicFileAttributeView; + import java.nio.file.attribute.BasicFileAttributes; + import java.nio.file.attribute.FileOwnerAttributeView; + import java.nio.file.attribute.PosixFileAttributeView; + import java.nio.file.attribute.PosixFilePermissions; + import java.text.DateFormat; + import java.text.SimpleDateFormat; + import java.util.ArrayList; + import java.util.Collections; + import java.util.Date; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.ListIterator; + import java.util.Locale; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantLock; + import java.util.regex.Pattern; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.DataUnit; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.Tags; -import 
org.apache.nifi.processor.annotation.TriggerWhenEmpty; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.documentation.Tags; ++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + +
/**
 * Processor that lists a local directory (optionally recursively), filters the
 * files it finds and imports each accepted file as a FlowFile routed to
 * {@link #REL_SUCCESS}.
 *
 * <p>Files are listed into an in-memory queue; each trigger drains up to
 * "Batch Size" files from that queue. The sets {@code inProcess} and
 * {@code recentlyProcessed} are guarded by {@code queueLock}; directory listing
 * is serialized by {@code listingLock}.</p>
 */
@TriggerWhenEmpty
@Tags({"local", "files", "filesystem", "ingest", "ingress", "get", "source", "input"})
@CapabilityDescription("Creates FlowFiles from files in a directory. NiFi will ignore files it doesn't have at least read permissions for.")
public class GetFile extends AbstractProcessor {

    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
            .name("Input Directory")
            .description("The input directory from which to pull files")
            .required(true)
            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
            .expressionLanguageSupported(true)
            .build();
    public static final PropertyDescriptor RECURSE = new PropertyDescriptor.Builder()
            .name("Recurse Subdirectories")
            .description("Indicates whether or not to pull files from subdirectories")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("true")
            .build();
    public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder()
            .name("Keep Source File")
            .description("If true, the file is not deleted after it has been copied to the Content Repository; "
                    + "this causes the file to be picked up continually and is useful for testing purposes. "
                    + "If not keeping original NiFi will need write permissions on the directory it is pulling "
                    + "from otherwise it will ignore the file.")
            .required(true)
            .allowableValues("true", "false")
            .defaultValue("false")
            .build();
    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder()
            .name("File Filter")
            .description("Only files whose names match the given regular expression will be picked up")
            .required(true)
            .defaultValue("[^\\.].*")
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .build();
    public static final PropertyDescriptor PATH_FILTER = new PropertyDescriptor.Builder()
            .name("Path Filter")
            .description("When " + RECURSE.getName() + " is true, then only subdirectories whose path matches the given regular expression will be scanned")
            .required(false)
            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
            .build();
    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder()
            .name("Minimum File Age")
            .description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("0 sec")
            .build();
    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder()
            .name("Maximum File Age")
            .description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored")
            .required(false)
            .addValidator(StandardValidators.createTimePeriodValidator(100, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
            .build();
    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
            .name("Minimum File Size")
            .description("The minimum size that a file must be in order to be pulled")
            .required(true)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("0 B")
            .build();
    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
            .name("Maximum File Size")
            .description("The maximum size that a file can be in order to be pulled")
            .required(false)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();
    public static final PropertyDescriptor IGNORE_HIDDEN_FILES = new PropertyDescriptor.Builder()
            .name("Ignore Hidden Files")
            .description("Indicates whether or not hidden files should be ignored")
            .allowableValues("true", "false")
            .defaultValue("true")
            .required(true)
            .build();
    public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder()
            .name("Polling Interval")
            .description("Indicates how long to wait before performing a directory listing")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("0 sec")
            .build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Batch Size")
            .description("The maximum number of files to pull in each iteration")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("10")
            .build();

    // Names of the FlowFile attributes populated by getAttributesFromFile.
    public static final String FILE_CREATION_TIME_ATTRIBUTE = "file.creationTime";
    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    public static final String FILE_LAST_ACCESS_TIME_ATTRIBUTE = "file.lastAccessTime";
    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    // Pattern used to format all three timestamp attributes.
    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files are routed to success").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>();

    private final BlockingQueue<File> fileQueue = new LinkedBlockingQueue<>();
    private final Set<File> inProcess = new HashSet<>();    // guarded by queueLock
    private final Set<File> recentlyProcessed = new HashSet<>();    // guarded by queueLock
    private final Lock queueLock = new ReentrantLock();

    private final Lock listingLock = new ReentrantLock();

    private final AtomicLong queueLastUpdated = new AtomicLong(0L);

    /**
     * Builds the immutable property and relationship collections exposed by
     * this processor.
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(DIRECTORY);
        properties.add(FILE_FILTER);
        properties.add(PATH_FILTER);
        properties.add(BATCH_SIZE);
        properties.add(KEEP_SOURCE_FILE);
        properties.add(RECURSE);
        properties.add(POLLING_INTERVAL);
        properties.add(IGNORE_HIDDEN_FILES);
        properties.add(MIN_AGE);
        properties.add(MAX_AGE);
        properties.add(MIN_SIZE);
        properties.add(MAX_SIZE);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Rebuilds the file filter from the current property values and discards
     * any files queued under the previous configuration.
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        fileFilterRef.set(createFileFilter(context));
        fileQueue.clear();
    }

    /**
     * Creates a filter that accepts a file only when it satisfies the
     * configured size, age, hidden-file, path-pattern, permission and
     * file-name-pattern constraints. All property values are captured once,
     * when the filter is created.
     *
     * @param context source of the property values
     * @return the filter to apply to each non-directory entry of a listing
     */
    private FileFilter createFileFilter(final ProcessContext context) {
        final long minSize = context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue();
        final Double maxSize = context.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
        final long minAge = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        final Long maxAge = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        final boolean ignoreHidden = context.getProperty(IGNORE_HIDDEN_FILES).asBoolean();
        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
        final String indir = context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue();
        final boolean recurseDirs = context.getProperty(RECURSE).asBoolean();
        final String pathPatternStr = context.getProperty(PATH_FILTER).getValue();
        // The path filter is only meaningful when subdirectories are scanned.
        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr);
        final boolean keepOriginal = context.getProperty(KEEP_SOURCE_FILE).asBoolean();

        return new FileFilter() {
            @Override
            public boolean accept(final File file) {
                if (minSize > file.length()) {
                    return false;
                }
                if (maxSize != null && maxSize < file.length()) {
                    return false;
                }
                final long fileAge = System.currentTimeMillis() - file.lastModified();
                if (minAge > fileAge) {
                    return false;
                }
                if (maxAge != null && maxAge < fileAge) {
                    return false;
                }
                if (ignoreHidden && file.isHidden()) {
                    return false;
                }
                if (pathPattern != null) {
                    // Match the pattern against the file's directory relative to the input directory.
                    Path reldir = Paths.get(indir).relativize(file.toPath()).getParent();
                    if (reldir != null && !reldir.toString().isEmpty()) {
                        if (!pathPattern.matcher(reldir.toString()).matches()) {
                            return false;
                        }
                    }
                }
                //Verify that we have at least read permissions on the file we're considering grabbing
                if (!Files.isReadable(file.toPath())) {
                    return false;
                }

                //Verify that if we're not keeping original that we have write permissions on the directory the file is in
                if (!keepOriginal && !Files.isWritable(file.toPath().getParent())) {
                    return false;
                }
                return filePattern.matcher(file.getName()).matches();
            }
        };
    }

    /**
     * Lists the files under {@code directory} that are accepted by
     * {@code filter}.
     *
     * @param directory directory to list
     * @param filter filter applied to every non-directory child
     * @param recurseSubdirectories whether child directories are listed too
     * @return the accepted files; empty when the directory does not exist or
     *         cannot be listed
     */
    private Set<File> performListing(final File directory, final FileFilter filter, final boolean recurseSubdirectories) {
        final Set<File> queue = new HashSet<>();
        if (!directory.exists()) {
            return queue;
        }
        // this check doesn't work on Windows
        if (!directory.canRead()) {
            getLogger().warn("No read permission on directory {}", new Object[]{directory.toString()});
        }

        final File[] children = directory.listFiles();
        if (children == null) {
            return queue;
        }

        for (final File child : children) {
            if (child.isDirectory()) {
                if (recurseSubdirectories) {
                    queue.addAll(performListing(child, filter, recurseSubdirectories));
                }
            } else if (filter.accept(child)) {
                queue.add(child);
            }
        }

        return queue;
    }

    /**
     * Collects timestamp, owner, group and permission attributes for a file on
     * a best-effort basis: each attribute view the file store does not support,
     * or that fails to read, is skipped without affecting the others.
     *
     * @param file the file to inspect
     * @return the attributes that could be read; possibly empty, never null
     */
    protected Map<String, String> getAttributesFromFile(final Path file) {
        Map<String, String> attributes = new HashMap<>();
        try {
            FileStore store = Files.getFileStore(file);
            if (store.supportsFileAttributeView("basic")) {
                try {
                    final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
                    BasicFileAttributeView view = Files.getFileAttributeView(file, BasicFileAttributeView.class);
                    BasicFileAttributes attrs = view.readAttributes();
                    attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastModifiedTime().toMillis())));
                    attributes.put(FILE_CREATION_TIME_ATTRIBUTE, formatter.format(new Date(attrs.creationTime().toMillis())));
                    attributes.put(FILE_LAST_ACCESS_TIME_ATTRIBUTE, formatter.format(new Date(attrs.lastAccessTime().toMillis())));
                } catch (Exception ignore) {
                } // allow other attributes if these fail
            }
            if (store.supportsFileAttributeView("owner")) {
                try {
                    FileOwnerAttributeView view = Files.getFileAttributeView(file, FileOwnerAttributeView.class);
                    attributes.put(FILE_OWNER_ATTRIBUTE, view.getOwner().getName());
                } catch (Exception ignore) {
                } // allow other attributes if these fail
            }
            if (store.supportsFileAttributeView("posix")) {
                try {
                    PosixFileAttributeView view = Files.getFileAttributeView(file, PosixFileAttributeView.class);
                    // Read once so permissions and group come from the same snapshot.
                    final java.nio.file.attribute.PosixFileAttributes posixAttrs = view.readAttributes();
                    attributes.put(FILE_PERMISSIONS_ATTRIBUTE, PosixFilePermissions.toString(posixAttrs.permissions()));
                    attributes.put(FILE_GROUP_ATTRIBUTE, posixAttrs.group().getName());
                } catch (Exception ignore) {
                } // allow other attributes if these fail
            }
        } catch (IOException ioe) {
            // well then this FlowFile gets none of these attributes
        }

        return attributes;
    }

    /**
     * Refreshes the file queue when it is running low and the polling interval
     * has elapsed, then imports up to "Batch Size" queued files as FlowFiles.
     *
     * @param context provides the property values
     * @param session session used to create, populate and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final File directory = new File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
        final boolean keepingSourceFile = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
        final ProcessorLog logger = getLogger();

        // Only re-list when fewer than 100 files remain queued.
        if (fileQueue.size() < 100) {
            final long pollingMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
            // tryLock: if another thread is already listing, skip the listing rather than wait.
            if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingMillis) && listingLock.tryLock()) {
                try {
                    final Set<File> listing = performListing(directory, fileFilterRef.get(),
                            context.getProperty(RECURSE).asBoolean().booleanValue());

                    queueLock.lock();
                    try {
                        listing.removeAll(inProcess);
                        if (!keepingSourceFile) {
                            listing.removeAll(recentlyProcessed);
                        }

                        fileQueue.clear();
                        fileQueue.addAll(listing);

                        queueLastUpdated.set(System.currentTimeMillis());
                        recentlyProcessed.clear();

                        if (listing.isEmpty()) {
                            context.yield();
                        }
                    } finally {
                        queueLock.unlock();
                    }
                } finally {
                    listingLock.unlock();
                }
            }
        }

        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        final List<File> files = new ArrayList<>(batchSize);
        queueLock.lock();
        try {
            fileQueue.drainTo(files, batchSize);
            if (files.isEmpty()) {
                return;
            } else {
                inProcess.addAll(files);
            }
        } finally {
            queueLock.unlock();
        }

        final ListIterator<File> itr = files.listIterator();
        FlowFile flowFile = null;
        try {
            final Path directoryPath = directory.toPath();
            while (itr.hasNext()) {
                final File file = itr.next();
                final Path filePath = file.toPath();
                final Path relativePath = directoryPath.relativize(filePath.getParent());
                // Test for emptiness BEFORE appending the separator; otherwise the "./"
                // fallback can never apply and files directly in the input directory get "/".
                final String relativeDir = relativePath.toString();
                final String relativePathString = relativeDir.isEmpty() ? "./" : relativeDir + "/";
                final Path absPath = filePath.toAbsolutePath();
                final String absPathString = absPath.getParent().toString() + "/";

                flowFile = session.create();
                final long importStart = System.nanoTime();
                flowFile = session.importFrom(filePath, keepingSourceFile, flowFile);
                final long importNanos = System.nanoTime() - importStart;
                final long importMillis = TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);

                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), file.getName());
                flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePathString);
                flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                Map<String, String> attributes = getAttributesFromFile(filePath);
                if (attributes.size() > 0) {
                    flowFile = session.putAllAttributes(flowFile, attributes);
                }

                session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), importMillis);
                session.transfer(flowFile, REL_SUCCESS);
                logger.info("added {} to flow", new Object[]{flowFile});

                if (!isScheduled()) {  // if processor stopped, put the rest of the files back on the queue.
                    queueLock.lock();
                    try {
                        while (itr.hasNext()) {
                            final File nextFile = itr.next();
                            fileQueue.add(nextFile);
                            inProcess.remove(nextFile);
                            // Drop the file from this batch so the finally block below does not
                            // record a never-imported file as recently processed.
                            itr.remove();
                        }
                    } finally {
                        queueLock.unlock();
                    }
                }
            }
            session.commit();
        } catch (final Exception e) {
            logger.error("Failed to retrieve files due to {}", e);

            // anything that we've not already processed needs to be put back on the queue
            if (flowFile != null) {
                session.remove(flowFile);
            }
        } finally {
            queueLock.lock();
            try {
                inProcess.removeAll(files);
                recentlyProcessed.addAll(files);
            } finally {
                queueLock.unlock();
            }
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java index 0000000,3edebe8..1b2be26 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFileTransfer.java @@@ -1,0 -1,301 +1,301 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.File; + import java.io.IOException; + import java.io.InputStream; + import java.nio.file.Path; + import java.text.DateFormat; + import java.text.SimpleDateFormat; + import java.util.Collections; + import java.util.Date; + import java.util.HashMap; + import java.util.HashSet; + import java.util.List; + import java.util.Locale; + import java.util.Map; + import java.util.Set; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.PriorityBlockingQueue; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReadWriteLock; + import java.util.concurrent.locks.ReentrantLock; + import java.util.concurrent.locks.ReentrantReadWriteLock; + + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.OnScheduled; ++import org.apache.nifi.annotation.lifecycle.OnScheduled; + import org.apache.nifi.processor.exception.FlowFileAccessException; + import org.apache.nifi.processors.standard.util.FileInfo; + import org.apache.nifi.processors.standard.util.FileTransfer; + import org.apache.nifi.util.StopWatch; + +
/**
 * Base class for GetSFTP and GetFTP.
 *
 * <p>Keeps a queue of remote files obtained from {@link FileTransfer#getListing()}
 * and, on each trigger, pulls up to {@code FileTransfer.MAX_SELECTS} of them into
 * FlowFiles routed to {@link #REL_SUCCESS}. Remote listing is serialized by
 * {@code listingLock}.</p>
 */
public abstract class GetFileTransfer extends AbstractProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
            .description("All FlowFiles that are received are routed to success").build();
    private final Set<Relationship> relationships;

    // Names of the FlowFile attributes populated by getAttributesFromFile.
    public static final String FILE_LAST_MODIFY_TIME_ATTRIBUTE = "file.lastModifiedTime";
    public static final String FILE_OWNER_ATTRIBUTE = "file.owner";
    public static final String FILE_GROUP_ATTRIBUTE = "file.group";
    public static final String FILE_PERMISSIONS_ATTRIBUTE = "file.permissions";
    public static final String FILE_MODIFY_DATE_ATTR_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";

    private final AtomicLong lastPollTime = new AtomicLong(-1L);
    private final Lock listingLock = new ReentrantLock();
    private final AtomicReference<BlockingQueue<FileInfo>> fileQueueRef = new AtomicReference<>();
    private final Set<FileInfo> processing = Collections.synchronizedSet(new HashSet<FileInfo>());

    // Used when transferring filenames from the File Queue to the processing queue; multiple threads can do this
    // simultaneously using the sharableTransferLock; however, in order to check if either has a given file, the
    // mutually exclusive lock is required.
    private final ReadWriteLock transferLock = new ReentrantReadWriteLock();
    private final Lock sharableTransferLock = transferLock.readLock();
    private final Lock mutuallyExclusiveTransferLock = transferLock.writeLock();

    public GetFileTransfer() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Supplies the protocol-specific transfer implementation.
     *
     * @param context provides the property values
     * @return the transfer to use for listing and fetching remote files
     */
    protected abstract FileTransfer getFileTransfer(final ProcessContext context);

    /**
     * Clears and discards the current file queue so the next listing creates a
     * fresh one.
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        listingLock.lock();
        try {
            final BlockingQueue<FileInfo> fileQueue = fileQueueRef.get();
            if (fileQueue != null) {
                fileQueue.clear();
            }
            fileQueueRef.set(null); // create new queue on next listing, in case queue type needs to change
        } finally {
            listingLock.unlock();
        }
    }

    /**
     * Refreshes the remote listing when it is due, then fetches queued remote
     * files into FlowFiles, committing the session after each file.
     *
     * @param context provides the property values
     * @param session session used to create, populate and transfer FlowFiles
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final long pollingIntervalMillis = context.getProperty(FileTransfer.POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
        final long nextPollTime = lastPollTime.get() + pollingIntervalMillis;
        final BlockingQueue<FileInfo> queueBeforeListing = fileQueueRef.get();
        final ProcessorLog logger = getLogger();

        // dont do the listing if there are already 100 or more items in our queue
        // 100 is really just a magic number that seems to work out well in practice
        FileTransfer transfer = null;
        if (System.currentTimeMillis() >= nextPollTime && (queueBeforeListing == null || queueBeforeListing.size() < 100) && listingLock.tryLock()) {
            try {
                transfer = getFileTransfer(context);
                try {
                    fetchListing(context, session, transfer);
                    lastPollTime.set(System.currentTimeMillis());
                } catch (final IOException e) {
                    context.yield();

                    try {
                        transfer.close();
                    } catch (final IOException e1) {
                        logger.warn("Unable to close connection due to {}", new Object[]{e1});
                    }

                    logger.error("Unable to fetch listing from remote server due to {}", new Object[]{e});
                    return;
                }
            } finally {
                listingLock.unlock();
            }
        }

        // Re-read the reference: fetchListing creates the queue when none existed, so the
        // value captured before the listing may be a stale null even though files are queued.
        final BlockingQueue<FileInfo> fileQueue = fileQueueRef.get();
        if (fileQueue == null || fileQueue.isEmpty()) {
            // nothing to do!
            context.yield();
            if (transfer != null) {
                try {
                    transfer.close();
                } catch (final IOException e1) {
                    logger.warn("Unable to close connection due to {}", new Object[]{e1});
                }
            }
            return;
        }

        final String hostname = context.getProperty(FileTransfer.HOSTNAME).evaluateAttributeExpressions().getValue();
        final boolean deleteOriginal = context.getProperty(FileTransfer.DELETE_ORIGINAL).asBoolean();
        final int maxSelects = context.getProperty(FileTransfer.MAX_SELECTS).asInteger();

        // Reuse the connection opened for the listing, if there was one.
        if (transfer == null) {
            transfer = getFileTransfer(context);
        }

        try {
            for (int i = 0; i < maxSelects && isScheduled(); i++) {
                final FileInfo file;
                sharableTransferLock.lock();
                try {
                    file = fileQueue.poll();
                    if (file == null) {
                        return;
                    }
                    processing.add(file);
                } finally {
                    sharableTransferLock.unlock();
                }

                File relativeFile = new File(file.getFullPathFileName());
                final String parentRelativePath = (null == relativeFile.getParent()) ? "" : relativeFile.getParent();
                final String parentRelativePathString = parentRelativePath + "/";

                final Path absPath = relativeFile.toPath().toAbsolutePath();
                final String absPathString = absPath.getParent().toString() + "/";

                try {
                    FlowFile flowFile = session.create();
                    final StopWatch stopWatch = new StopWatch(false);
                    try (final InputStream in = transfer.getInputStream(file.getFullPathFileName())) {
                        stopWatch.start();
                        flowFile = session.importFrom(in, flowFile);
                        stopWatch.stop();
                    }
                    transfer.flush();
                    final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                    final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
                    flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", hostname);
                    flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), parentRelativePathString);
                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), relativeFile.getName());
                    flowFile = session.putAttribute(flowFile, CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                    Map<String, String> attributes = getAttributesFromFile(file);
                    if (attributes.size() > 0) {
                        flowFile = session.putAllAttributes(flowFile, attributes);
                    }

                    if (deleteOriginal) {
                        try {
                            transfer.deleteFile(null, file.getFullPathFileName());
                        } catch (final IOException e) {
                            logger.error("Failed to remove remote file {} due to {}; deleting local copy", new Object[]{file.getFullPathFileName(), e});
                            session.remove(flowFile);
                            return;
                        }
                    }

                    session.getProvenanceReporter().receive(flowFile, transfer.getProtocolName() + "://" + hostname + "/" + file.getFullPathFileName(), millis);
                    session.transfer(flowFile, REL_SUCCESS);
                    logger.info("Successfully retrieved {} from {} in {} milliseconds at a rate of {} and transferred to success",
                            new Object[]{flowFile, hostname, millis, dataRate});

                    session.commit();
                } catch (final IOException e) {
                    context.yield();
                    logger.error("Unable to retrieve file {} due to {}", new Object[]{file.getFullPathFileName(), e});
                    try {
                        transfer.close();
                    } catch (IOException e1) {
                        logger.warn("Unable to close connection to remote host due to {}", new Object[]{e1});
                    }

                    session.rollback();
                    return;
                } catch (final FlowFileAccessException e) {
                    context.yield();
                    logger.error("Unable to retrieve file {} due to {}", new Object[]{file.getFullPathFileName(), e.getCause()}, e);

                    try {
                        transfer.close();
                    } catch (IOException e1) {
                        // Pass the exception as a message argument, as every other close-failure log in this class does.
                        logger.warn("Unable to close connection to remote host due to {}", new Object[]{e1});
                    }

                    session.rollback();
                    return;
                } finally {
                    processing.remove(file);
                }
            }
        } finally {
            try {
                transfer.close();
            } catch (final IOException e) {
                logger.warn("Failed to close connection to {} due to {}", new Object[]{hostname, e});
            }
        }
    }

    /**
     * Builds the last-modified, permissions, owner and group attributes for a
     * remote file.
     *
     * @param info remote file description; may be null
     * @return the attributes, or an empty map when {@code info} is null
     */
    protected Map<String, String> getAttributesFromFile(FileInfo info) {
        Map<String, String> attributes = new HashMap<>();
        if (info != null) {
            final DateFormat formatter = new SimpleDateFormat(FILE_MODIFY_DATE_ATTR_FORMAT, Locale.US);
            attributes.put(FILE_LAST_MODIFY_TIME_ATTRIBUTE, formatter.format(new Date(info.getLastModifiedTime())));
            attributes.put(FILE_PERMISSIONS_ATTRIBUTE, info.getPermissions());
            attributes.put(FILE_OWNER_ATTRIBUTE, info.getOwner());
            attributes.put(FILE_GROUP_ATTRIBUTE, info.getGroup());
        }
        return attributes;
    }

    /**
     * Obtains the remote listing and offers every file that is neither queued
     * nor currently being processed to the file queue, creating the queue
     * first when none exists. Stops adding as soon as the queue refuses an
     * element.
     */
    // must be called while holding the listingLock
    private void fetchListing(final ProcessContext context, final ProcessSession session, final FileTransfer transfer) throws IOException {
        BlockingQueue<FileInfo> queue = fileQueueRef.get();
        if (queue == null) {
            final boolean useNaturalOrdering = context.getProperty(FileTransfer.USE_NATURAL_ORDERING).asBoolean();
            queue = useNaturalOrdering ? new PriorityBlockingQueue<FileInfo>(25000) : new LinkedBlockingQueue<FileInfo>(25000);
            fileQueueRef.set(queue);
        }

        final StopWatch stopWatch = new StopWatch(true);
        final List<FileInfo> listing = transfer.getListing();
        final long millis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);

        int newItems = 0;
        mutuallyExclusiveTransferLock.lock();
        try {
            for (final FileInfo file : listing) {
                if (!queue.contains(file) && !processing.contains(file)) {
                    if (!queue.offer(file)) {
                        break;
                    }
                    newItems++;
                }
            }
        } finally {
            mutuallyExclusiveTransferLock.unlock();
        }

        getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new",
                new Object[]{millis, listing.size(), newItems});
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java index 0000000,35873b1..fd70024 mode
000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetHTTP.java @@@ -1,0 -1,480 +1,480 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.nifi.processors.standard; + + import java.io.File; + import java.io.FileInputStream; + import java.io.FileOutputStream; + import java.io.IOException; + import java.io.InputStream; + import java.net.URI; + import java.net.URISyntaxException; + import java.security.KeyManagementException; + import java.security.KeyStore; + import java.security.KeyStoreException; + import java.security.NoSuchAlgorithmException; + import java.security.UnrecoverableKeyException; + import java.security.cert.CertificateException; + import java.text.SimpleDateFormat; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.Date; + import java.util.HashSet; + import java.util.List; + import java.util.Locale; + import java.util.Properties; + import java.util.Set; + import java.util.TimeZone; + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicReference; + import java.util.concurrent.locks.ReentrantReadWriteLock; + import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; + import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + import java.util.regex.Pattern; + + import javax.net.ssl.SSLContext; + + import org.apache.http.Header; + import org.apache.http.HttpResponse; + import org.apache.http.auth.AuthScope; + import org.apache.http.auth.UsernamePasswordCredentials; + import org.apache.http.client.CredentialsProvider; + import org.apache.http.client.HttpClient; + import org.apache.http.client.config.RequestConfig; + import org.apache.http.client.methods.HttpGet; + import org.apache.http.config.Registry; + import org.apache.http.config.RegistryBuilder; + import org.apache.http.conn.HttpClientConnectionManager; + import org.apache.http.conn.socket.ConnectionSocketFactory; + import org.apache.http.conn.ssl.SSLConnectionSocketFactory; + import org.apache.http.conn.ssl.SSLContexts; + import org.apache.http.conn.ssl.TrustSelfSignedStrategy; + import 
org.apache.http.impl.client.BasicCredentialsProvider; + import org.apache.http.impl.client.HttpClientBuilder; + import org.apache.http.impl.conn.BasicHttpClientConnectionManager; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.flowfile.FlowFile; + import org.apache.nifi.flowfile.attributes.CoreAttributes; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.AbstractSessionFactoryProcessor; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; + import org.apache.nifi.processor.ProcessSessionFactory; + import org.apache.nifi.processor.ProcessorInitializationContext; + import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnShutdown; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.lifecycle.OnShutdown; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processor.util.StandardValidators; + import org.apache.nifi.ssl.SSLContextService; + import org.apache.nifi.ssl.SSLContextService.ClientAuth; + import org.apache.nifi.util.StopWatch; + + @Tags({"get", "fetch", "poll", "http", "https", "ingest", "source", "input"}) + @CapabilityDescription("Fetches a file via HTTP") + public class GetHTTP extends AbstractSessionFactoryProcessor { + + static final int PERSISTENCE_INTERVAL_MSEC = 10000; + + public static final String HEADER_IF_NONE_MATCH = "If-None-Match"; + public static final String HEADER_IF_MODIFIED_SINCE = "If-Modified-Since"; + public static final String HEADER_ACCEPT = "Accept"; + public static final String HEADER_LAST_MODIFIED = 
"Last-Modified"; + public static final String HEADER_ETAG = "ETag"; + public static final int NOT_MODIFIED = 304; + + public static final PropertyDescriptor URL = new PropertyDescriptor.Builder() + .name("URL") + .description("The URL to pull from") + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("https?\\://.*"))) + .build(); + public static final PropertyDescriptor FOLLOW_REDIRECTS = new PropertyDescriptor.Builder() + .name("Follow Redirects") + .description( + "If we receive a 3xx HTTP Status Code from the server, indicates whether or not we should follow the redirect that the server specifies") + .defaultValue("false") + .allowableValues("true", "false") + .build(); + public static final PropertyDescriptor CONNECTION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Connection Timeout") + .description("How long to wait when attempting to connect to the remote server before giving up") + .required(true) + .defaultValue("30 sec") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor ACCEPT_CONTENT_TYPE = new PropertyDescriptor.Builder() + .name("Accept Content-Type") + .description("If specified, requests will only accept the provided Content-Type") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor DATA_TIMEOUT = new PropertyDescriptor.Builder() + .name("Data Timeout") + .description( + "How long to wait between receiving segments of data from the remote server before giving up and discarding the partial file") + .required(true) + .defaultValue("30 sec") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + public static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder() + .name("Filename") + .description("The filename to assign to the file when pulled") + 
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(true) + .build(); + public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() + .name("Username") + .description("Username required to access the URL") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() + .name("Password") + .description("Password required to access the URL") + .required(false) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USER_AGENT = new PropertyDescriptor.Builder() + .name("User Agent") + .description("What to report as the User Agent when we connect to the remote server") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("All files are transferred to the success relationship").build(); + + public static final String LAST_MODIFIED_DATE_PATTERN_RFC1123 = "EEE, dd MMM yyyy HH:mm:ss zzz"; + + // package access to enable unit testing + static final String UNINITIALIZED_LAST_MODIFIED_VALUE; + + private static final String HTTP_CACHE_FILE_PREFIX = "conf/.httpCache-"; + + static final String ETAG = "ETag"; + + static final String LAST_MODIFIED = "LastModified"; + + static { + SimpleDateFormat sdf = new SimpleDateFormat(LAST_MODIFIED_DATE_PATTERN_RFC1123, Locale.US); + sdf.setTimeZone(TimeZone.getTimeZone("GMT")); + UNINITIALIZED_LAST_MODIFIED_VALUE = sdf.format(new Date(1L)); + } + final AtomicReference<String> lastModifiedRef = 
new AtomicReference<>(UNINITIALIZED_LAST_MODIFIED_VALUE); + final AtomicReference<String> entityTagRef = new AtomicReference<>(""); + // end + + private Set<Relationship> relationships; + private List<PropertyDescriptor> properties; + + private volatile long timeToPersist = 0; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadLock readLock = lock.readLock(); + private final WriteLock writeLock = lock.writeLock(); + + @Override + protected void init(final ProcessorInitializationContext context) { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(URL); + properties.add(FILENAME); + properties.add(SSL_CONTEXT_SERVICE); + properties.add(USERNAME); + properties.add(PASSWORD); + properties.add(CONNECTION_TIMEOUT); + properties.add(DATA_TIMEOUT); + properties.add(USER_AGENT); + properties.add(ACCEPT_CONTENT_TYPE); + properties.add(FOLLOW_REDIRECTS); + this.properties = Collections.unmodifiableList(properties); + + // load etag and lastModified from file; a key missing from the file falls back to the initial value so neither reference ever holds null + File httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); + try (FileInputStream fis = new FileInputStream(httpCache)) { + Properties props = new Properties(); + props.load(fis); + entityTagRef.set(props.getProperty(ETAG, "")); + lastModifiedRef.set(props.getProperty(LAST_MODIFIED, UNINITIALIZED_LAST_MODIFIED_VALUE)); + } catch (IOException swallow) { + // the cache file could not be opened or read, so the initial values stay in place + } + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + entityTagRef.set(""); + lastModifiedRef.set(UNINITIALIZED_LAST_MODIFIED_VALUE); + } + + @OnShutdown + public void onShutdown() { + File
httpCache = new File(HTTP_CACHE_FILE_PREFIX + getIdentifier()); + try (FileOutputStream fos = new FileOutputStream(httpCache)) { + Properties props = new Properties(); + props.setProperty(ETAG, entityTagRef.get()); + props.setProperty(LAST_MODIFIED, lastModifiedRef.get()); + props.store(fos, "GetHTTP file modification values"); + } catch (IOException swallow) { + } + + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final Collection<ValidationResult> results = new ArrayList<>(); + + if (context.getProperty(URL).getValue().startsWith("https") && context.getProperty(SSL_CONTEXT_SERVICE).getValue() == null) { + results.add(new ValidationResult.Builder() + .explanation("URL is set to HTTPS protocol but no SSLContext has been specified") + .valid(false) + .subject("SSL Context") + .build()); + } + + return results; + } + + + private SSLContext createSSLContext(final SSLContextService service) throws KeyStoreException, IOException, NoSuchAlgorithmException, + CertificateException, KeyManagementException, UnrecoverableKeyException + { + final KeyStore truststore = KeyStore.getInstance(service.getTrustStoreType()); + try (final InputStream in = new FileInputStream(new File(service.getTrustStoreFile()))) { + truststore.load(in, service.getTrustStorePassword().toCharArray()); + } + + final KeyStore keystore = KeyStore.getInstance(service.getKeyStoreType()); + try (final InputStream in = new FileInputStream(new File(service.getKeyStoreFile()))) { + keystore.load(in, service.getKeyStorePassword().toCharArray()); + } + + SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(truststore, new TrustSelfSignedStrategy()) + .loadKeyMaterial(keystore, service.getKeyStorePassword().toCharArray()) + .build(); + + return sslContext; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + final ProcessorLog logger = getLogger(); + 
+ final ProcessSession session = sessionFactory.createSession(); + final FlowFile incomingFlowFile = session.get(); + if (incomingFlowFile != null) { + session.transfer(incomingFlowFile, REL_SUCCESS); + logger.warn("found FlowFile {} in input queue; transferring to success", new Object[]{incomingFlowFile}); + } + + // get the URL + final String url = context.getProperty(URL).getValue(); + final URI uri; + String source = url; + try { + uri = new URI(url); + source = uri.getHost(); + } catch (URISyntaxException swallow) { + // this won't happen as the url has already been validated + } + + // get the ssl context service + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + // create the connection manager + final HttpClientConnectionManager conMan; + if ( sslContextService == null ) { + conMan = new BasicHttpClientConnectionManager(); + } else { + final SSLContext sslContext; + try { + sslContext = createSSLContext(sslContextService); + } catch (final Exception e) { + throw new ProcessException(e); + } + + final SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new String[] { "TLSv1" }, null, + SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER); + + final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create() + .register("https", sslsf).build(); + + conMan = new BasicHttpClientConnectionManager(socketFactoryRegistry); + } + + try { + // build the request configuration + final RequestConfig.Builder requestConfigBuilder = RequestConfig.custom(); + requestConfigBuilder.setConnectionRequestTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setConnectTimeout(context.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setRedirectsEnabled(false); + 
requestConfigBuilder.setSocketTimeout(context.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + requestConfigBuilder.setRedirectsEnabled(context.getProperty(FOLLOW_REDIRECTS).asBoolean()); + + // build the http client + final HttpClientBuilder clientBuilder = HttpClientBuilder.create(); + clientBuilder.setConnectionManager(conMan); + + // include the user agent + final String userAgent = context.getProperty(USER_AGENT).getValue(); + if (userAgent != null) { + clientBuilder.setUserAgent(userAgent); + } + + // set the ssl context if necessary + if (sslContextService != null) { + clientBuilder.setSslcontext(sslContextService.createSSLContext(ClientAuth.REQUIRED)); + } + + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + + // set the credentials if appropriate + if (username != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + if (password == null) { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username)); + } else { + credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); + } + clientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + + // create the http client + final HttpClient client = clientBuilder.build(); + + // create request + final HttpGet get = new HttpGet(url); + get.setConfig(requestConfigBuilder.build()); + + get.addHeader(HEADER_IF_MODIFIED_SINCE, lastModifiedRef.get()); + get.addHeader(HEADER_IF_NONE_MATCH, entityTagRef.get()); + + final String accept = context.getProperty(ACCEPT_CONTENT_TYPE).getValue(); + if (accept != null) { + get.addHeader(HEADER_ACCEPT, accept); + } + + try { + final StopWatch stopWatch = new StopWatch(true); + final HttpResponse response = client.execute(get); + final int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == NOT_MODIFIED) { + 
logger.info("content not retrieved because server returned HTTP Status Code {}: Not Modified", new Object[]{NOT_MODIFIED}); + context.yield(); + // doing a commit in case there were flow files in the input queue + session.commit(); + return; + } + final String statusExplanation = response.getStatusLine().getReasonPhrase(); + + if (statusCode >= 300) { + logger.error("received status code {}:{} from {}", new Object[]{statusCode, statusExplanation, url}); + // doing a commit in case there were flow files in the input queue + session.commit(); + return; + } + + FlowFile flowFile = session.create(); + flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), context.getProperty(FILENAME).getValue()); + flowFile = session.putAttribute(flowFile, this.getClass().getSimpleName().toLowerCase() + ".remote.source", source); + flowFile = session.importFrom(response.getEntity().getContent(), flowFile); + final long flowFileSize = flowFile.getSize(); + stopWatch.stop(); + final String dataRate = stopWatch.calculateDataRate(flowFileSize); + session.getProvenanceReporter().receive(flowFile, url, stopWatch.getDuration(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); + logger.info("Successfully received {} from {} at a rate of {}; transferred to success", new Object[]{flowFile, url, dataRate}); + session.commit(); + final Header lastModified = response.getFirstHeader(HEADER_LAST_MODIFIED); + if (lastModified != null) { + lastModifiedRef.set(lastModified.getValue()); + } + + final Header etag = response.getFirstHeader(HEADER_ETAG); + if (etag != null) { + entityTagRef.set(etag.getValue()); + } + if ((etag != null || lastModified != null) && readLock.tryLock()) { + try { + if (timeToPersist < System.currentTimeMillis()) { + readLock.unlock(); + writeLock.lock(); + if (timeToPersist < System.currentTimeMillis()) { + try { + timeToPersist = System.currentTimeMillis() + PERSISTENCE_INTERVAL_MSEC; + File httpCache = new File(HTTP_CACHE_FILE_PREFIX + 
getIdentifier()); + try (FileOutputStream fos = new FileOutputStream(httpCache)) { + Properties props = new Properties(); + props.setProperty(ETAG, entityTagRef.get()); + props.setProperty(LAST_MODIFIED, lastModifiedRef.get()); + props.store(fos, "GetHTTP file modification values"); + } catch (IOException e) { + getLogger().error("Failed to persist ETag and LastMod due to " + e, e); + } + } finally { + readLock.lock(); + writeLock.unlock(); + } + } + } + } finally { + readLock.unlock(); + } + } + } catch (final IOException e) { + context.yield(); + session.rollback(); + logger.error("Failed to retrieve file from {} due to {}; rolling back session", new Object[]{url, e.getMessage()}, e); + throw new ProcessException(e); + } catch (final Throwable t) { + context.yield(); + session.rollback(); + logger.error("Failed to process due to {}; rolling back session", new Object[]{t.getMessage()}, t); + throw t; + } + + } finally { + conMan.shutdown(); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java index 0000000,35e2292..9676d93 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSQueue.java @@@ -1,0 -1,75 +1,75 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.util.Queue; + import java.util.concurrent.LinkedBlockingQueue; + + import javax.jms.JMSException; + ++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.documentation.Tags; ++import org.apache.nifi.annotation.lifecycle.OnStopped; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnStopped; -import org.apache.nifi.processor.annotation.Tags; -import org.apache.nifi.processor.annotation.TriggerWhenEmpty; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processors.standard.util.JmsFactory; + import org.apache.nifi.processors.standard.util.WrappedMessageConsumer; + + @TriggerWhenEmpty + @Tags({"jms", "queue", "listen", "get", "pull", "source", "consume", "consumer"}) + @CapabilityDescription("Pulls messages from a JMS Queue, creating a FlowFile for each JMS Message or bundle of messages, as configured") + public class GetJMSQueue extends JmsConsumer { + + private final Queue<WrappedMessageConsumer> consumerQueue = new LinkedBlockingQueue<>(); + + @OnStopped + 
public void cleanupResources() { + WrappedMessageConsumer wrappedConsumer = consumerQueue.poll(); + while (wrappedConsumer != null) { + wrappedConsumer.close(getLogger()); + wrappedConsumer = consumerQueue.poll(); + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + + WrappedMessageConsumer wrappedConsumer = consumerQueue.poll(); + if (wrappedConsumer == null) { + try { + wrappedConsumer = JmsFactory.createQueueMessageConsumer(context); + } catch (JMSException e) { + logger.error("Failed to connect to JMS Server due to {}", e); + context.yield(); + return; + } + } + + try { + super.consume(context, session, wrappedConsumer); + } finally { + if (!wrappedConsumer.isClosed()) { + consumerQueue.offer(wrappedConsumer); + } + } + } + + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java index 0000000,185ed61..8e22376 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetJMSTopic.java @@@ -1,0 -1,359 +1,359 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import static org.apache.nifi.processors.standard.util.JmsProperties.CLIENT_ID_PREFIX; + import static org.apache.nifi.processors.standard.util.JmsProperties.DURABLE_SUBSCRIPTION; + import static org.apache.nifi.processors.standard.util.JmsProperties.JMS_PROVIDER; + import static org.apache.nifi.processors.standard.util.JmsProperties.PASSWORD; + import static org.apache.nifi.processors.standard.util.JmsProperties.URL; + import static org.apache.nifi.processors.standard.util.JmsProperties.USERNAME; + + import java.io.IOException; + import java.io.InputStream; + import java.io.OutputStream; + import java.nio.file.Files; + import java.nio.file.Path; + import java.nio.file.Paths; + import java.util.ArrayList; + import java.util.Collections; + import java.util.List; + import java.util.Map; + import java.util.Properties; + import java.util.concurrent.TimeUnit; + + import javax.jms.Connection; + import javax.jms.InvalidDestinationException; + import javax.jms.JMSException; + import javax.jms.Session; + ++import org.apache.nifi.annotation.behavior.TriggerSerially; ++import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.documentation.Tags; ++import org.apache.nifi.annotation.lifecycle.OnRemoved; ++import 
org.apache.nifi.annotation.lifecycle.OnScheduled; ++import org.apache.nifi.annotation.lifecycle.OnStopped; + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.logging.ProcessorLog; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.OnRemoved; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.OnStopped; -import org.apache.nifi.processor.annotation.Tags; -import org.apache.nifi.processor.annotation.TriggerSerially; -import org.apache.nifi.processor.annotation.TriggerWhenEmpty; + import org.apache.nifi.processor.exception.ProcessException; + import org.apache.nifi.processors.standard.util.JmsFactory; + import org.apache.nifi.processors.standard.util.JmsProperties; + import org.apache.nifi.processors.standard.util.WrappedMessageConsumer; + + @TriggerSerially + @TriggerWhenEmpty + @Tags({"jms", "topic", "subscription", "durable", "non-durable", "listen", "get", "pull", "source", "consume", "consumer"}) + @CapabilityDescription("Pulls messages from a JMS Topic, creating a FlowFile for each JMS Message or bundle of messages, as configured") + public class GetJMSTopic extends JmsConsumer { + + public static final String SUBSCRIPTION_NAME_PROPERTY = "subscription.name"; + private volatile WrappedMessageConsumer wrappedConsumer = null; + + private final List<PropertyDescriptor> properties; + + public GetJMSTopic() { + super(); + + final List<PropertyDescriptor> props = new ArrayList<>(super.getSupportedPropertyDescriptors()); + props.add(JmsProperties.DURABLE_SUBSCRIPTION); + properties = Collections.unmodifiableList(props); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @OnStopped + public void cleanupResources() { + final WrappedMessageConsumer consumer = 
this.wrappedConsumer; + if (consumer != null) { + try { + consumer.close(getLogger()); + } finally { + this.wrappedConsumer = null; + } + } + } + + private Path getSubscriptionPath() { + return Paths.get("conf").resolve("jms-subscription-" + getIdentifier()); + } + + @OnScheduled + public void handleSubscriptions(final ProcessContext context) throws IOException, JMSException { + boolean usingDurableSubscription = context.getProperty(DURABLE_SUBSCRIPTION).asBoolean(); + final Properties persistedProps = getSubscriptionPropertiesFromFile(); + final Properties currentProps = getSubscriptionPropertiesFromContext(context); + if (persistedProps == null) { + if (usingDurableSubscription) { + persistSubscriptionInfo(context); // properties have not yet been persisted. + } + + return; + } + + // decrypt the passwords so the persisted and current properties can be compared... + // we can modify this properties instance since the unsubscribe method will reload + // the properties from disk + decryptPassword(persistedProps, context); + decryptPassword(currentProps, context); + + // check if current values are the same as the persisted values. + boolean same = true; + for (final Map.Entry<Object, Object> entry : persistedProps.entrySet()) { + final Object key = entry.getKey(); + + final Object value = entry.getValue(); + final Object curVal = currentProps.get(key); + if (value == null && curVal == null) { + continue; + } + if (value == null || curVal == null) { + same = false; + break; + } + if (SUBSCRIPTION_NAME_PROPERTY.equals(key)) { + // ignore the random UUID part of the subscription name + if (!JmsFactory.clientIdPrefixEquals(value.toString(), curVal.toString())) { + same = false; + break; + } + } else if (!value.equals(curVal)) { + same = false; + break; + } + } + + if (same && usingDurableSubscription) { + return; // properties are the same. + } + + // unsubscribe from the old subscription. 
+ try { + unsubscribe(context); + } catch (final InvalidDestinationException e) { + getLogger().warn("Failed to unsubscribe from subscription due to {}; subscription does not appear to be active, so ignoring it", new Object[]{e}); + } + + // we've now got a new subscription, so we must persist that new info before we create the subscription. + if (usingDurableSubscription) { + persistSubscriptionInfo(context); + } else { + // remove old subscription info if it was persisted + try { + Files.delete(getSubscriptionPath()); + } catch (Exception ignore) { + } + } + } + + /** + * Attempts to locate the password in the specified properties. If found, + * decrypts it using the specified context. + * + * @param properties the properties to search; the password entry, if present, is replaced in place with its decrypted value + * @param context the context used to decrypt the password + */ + public void decryptPassword(final Properties properties, final ProcessContext context) { + final String encryptedPassword = properties.getProperty(PASSWORD.getName()); + + // if the password is in the properties, decrypt it + if (encryptedPassword != null) { + properties.put(PASSWORD.getName(), context.decrypt(encryptedPassword)); + } + } + + @OnRemoved + public void onRemoved(final ProcessContext context) throws IOException, JMSException { + // unsubscribe from the old subscription. + unsubscribe(context); + } + + /** + * Persists the subscription details for future use. + * + * @param context the context from which the subscription properties are read + * @throws IOException if the subscription file cannot be opened or written + */ + private void persistSubscriptionInfo(final ProcessContext context) throws IOException { + final Properties props = getSubscriptionPropertiesFromContext(context); + try (final OutputStream out = Files.newOutputStream(getSubscriptionPath())) { + props.store(out, null); + } + } + + /** + * Returns the subscription details from the specified context. Note: if a + * password is set, the resulting entry will be encrypted.
+ * + * @param context + * @return + */ + private Properties getSubscriptionPropertiesFromContext(final ProcessContext context) { + final String unencryptedPassword = context.getProperty(PASSWORD).getValue(); + final String encryptedPassword = (unencryptedPassword == null) ? null : context.encrypt(unencryptedPassword); + + final Properties props = new Properties(); + props.setProperty(URL.getName(), context.getProperty(URL).getValue()); + + if (context.getProperty(USERNAME).isSet()) { + props.setProperty(USERNAME.getName(), context.getProperty(USERNAME).getValue()); + } + + if (encryptedPassword != null) { + props.setProperty(PASSWORD.getName(), encryptedPassword); + } + + props.setProperty(SUBSCRIPTION_NAME_PROPERTY, JmsFactory.createClientId(context)); + props.setProperty(JMS_PROVIDER.getName(), context.getProperty(JMS_PROVIDER).getValue()); + + if (context.getProperty(CLIENT_ID_PREFIX).isSet()) { + props.setProperty(CLIENT_ID_PREFIX.getName(), context.getProperty(CLIENT_ID_PREFIX).getValue()); + } + + return props; + } + + /** + * Loads the subscription details from disk. Since the details are coming + * from disk, if a password is set, the resulting entry will be encrypted. 
+ * + * @return + * @throws IOException + */ + private Properties getSubscriptionPropertiesFromFile() throws IOException { + final Path subscriptionPath = getSubscriptionPath(); + final boolean exists = Files.exists(subscriptionPath); + if (!exists) { + return null; + } + + final Properties props = new Properties(); + try (final InputStream in = Files.newInputStream(subscriptionPath)) { + props.load(in); + } + + return props; + } + + /** + * Loads subscription info from the Subscription File and unsubscribes from + * the subscription, if the file exists; otherwise, does nothing + * + * @throws IOException + * @throws JMSException + */ + private void unsubscribe(final ProcessContext context) throws IOException, JMSException { + final Properties props = getSubscriptionPropertiesFromFile(); + if (props == null) { + return; + } + + final String serverUrl = props.getProperty(URL.getName()); + final String username = props.getProperty(USERNAME.getName()); + final String encryptedPassword = props.getProperty(PASSWORD.getName()); + final String subscriptionName = props.getProperty(SUBSCRIPTION_NAME_PROPERTY); + final String jmsProvider = props.getProperty(JMS_PROVIDER.getName()); + + final String password = encryptedPassword == null ? 
null : context.decrypt(encryptedPassword); + + final int timeoutMillis = context.getProperty(JmsProperties.TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + unsubscribe(serverUrl, username, password, subscriptionName, jmsProvider, timeoutMillis); + } + + /** + * Unsubscribes from a subscription using the supplied parameters + * + * @param url + * @param username + * @param password + * @param subscriptionId + * @throws JMSException + */ + private void unsubscribe(final String url, final String username, final String password, final String subscriptionId, final String jmsProvider, final int timeoutMillis) throws JMSException { + final Connection connection; + if (username == null && password == null) { + connection = JmsFactory.createConnectionFactory(url, timeoutMillis, jmsProvider).createConnection(); + } else { + connection = JmsFactory.createConnectionFactory(url, timeoutMillis, jmsProvider).createConnection(username, password); + } + + Session session = null; + try { + connection.setClientID(subscriptionId); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.unsubscribe(subscriptionId); + + getLogger().info("Successfully unsubscribed from {}, Subscription Identifier {}", new Object[]{url, subscriptionId}); + } finally { + if (session != null) { + try { + session.close(); + } catch (final Exception e1) { + getLogger().warn("Unable to close session with JMS Server due to {}; resources may not be cleaned up appropriately", new Object[]{e1}); + } + } + + try { + connection.close(); + } catch (final Exception e1) { + getLogger().warn("Unable to close connection to JMS Server due to {}; resources may not be cleaned up appropriately", new Object[]{e1}); + } + } + } + + @OnStopped + public void onStopped() { + final WrappedMessageConsumer consumer = this.wrappedConsumer; + if (consumer != null) { + consumer.close(getLogger()); + this.wrappedConsumer = null; + } + } + + @Override + public void onTrigger(final ProcessContext 
context, final ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + + WrappedMessageConsumer consumer = this.wrappedConsumer; + if (consumer == null || consumer.isClosed()) { + try { + Properties props = null; + try { + props = getSubscriptionPropertiesFromFile(); + } catch (IOException ignore) { + } + if (props == null) { + props = getSubscriptionPropertiesFromContext(context); + } + String subscriptionName = props.getProperty(SUBSCRIPTION_NAME_PROPERTY); + consumer = JmsFactory.createTopicMessageConsumer(context, subscriptionName); + this.wrappedConsumer = consumer; + } catch (final JMSException e) { + logger.error("Failed to connect to JMS Server due to {}", new Object[]{e}); + context.yield(); + return; + } + } + + super.consume(context, session, consumer); + } + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java index 0000000,077b32f..dd9f519 mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetSFTP.java @@@ -1,0 -1,92 +1,92 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.processors.standard; + + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.List; + + import org.apache.nifi.components.PropertyDescriptor; + import org.apache.nifi.components.ValidationContext; + import org.apache.nifi.components.ValidationResult; + import org.apache.nifi.processor.ProcessContext; + import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.Tags; ++import org.apache.nifi.annotation.documentation.CapabilityDescription; ++import org.apache.nifi.annotation.behavior.SideEffectFree; ++import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.processors.standard.util.FileTransfer; + import org.apache.nifi.processors.standard.util.SFTPTransfer; + + @SideEffectFree + @Tags({"sftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"}) + @CapabilityDescription("Fetches files from an SFTP Server and creates FlowFiles from them") + public class GetSFTP extends GetFileTransfer { + + private List<PropertyDescriptor> properties; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(SFTPTransfer.HOSTNAME); + properties.add(SFTPTransfer.PORT); + properties.add(SFTPTransfer.USERNAME); + properties.add(SFTPTransfer.PASSWORD); + 
properties.add(SFTPTransfer.PRIVATE_KEY_PATH); + properties.add(SFTPTransfer.PRIVATE_KEY_PASSPHRASE); + properties.add(SFTPTransfer.REMOTE_PATH); + properties.add(SFTPTransfer.FILE_FILTER_REGEX); + properties.add(SFTPTransfer.PATH_FILTER_REGEX); + properties.add(SFTPTransfer.POLLING_INTERVAL); + properties.add(SFTPTransfer.RECURSIVE_SEARCH); + properties.add(SFTPTransfer.IGNORE_DOTTED_FILES); + properties.add(SFTPTransfer.DELETE_ORIGINAL); + properties.add(SFTPTransfer.CONNECTION_TIMEOUT); + properties.add(SFTPTransfer.DATA_TIMEOUT); + properties.add(SFTPTransfer.HOST_KEY_FILE); + properties.add(SFTPTransfer.MAX_SELECTS); + properties.add(SFTPTransfer.REMOTE_POLL_BATCH_SIZE); + properties.add(SFTPTransfer.STRICT_HOST_KEY_CHECKING); + properties.add(SFTPTransfer.USE_KEEPALIVE_ON_TIMEOUT); + properties.add(SFTPTransfer.USE_COMPRESSION); + properties.add(SFTPTransfer.USE_NATURAL_ORDERING); + this.properties = Collections.unmodifiableList(properties); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext context) { + final List<ValidationResult> results = new ArrayList<>(super.customValidate(context)); + final boolean passwordSpecified = context.getProperty(SFTPTransfer.PASSWORD).getValue() != null; + final boolean privateKeySpecified = context.getProperty(SFTPTransfer.PRIVATE_KEY_PATH).getValue() != null; + + if (!passwordSpecified && !privateKeySpecified) { + results.add(new ValidationResult.Builder().subject("Password").explanation("Either the Private Key Passphrase or the Password must be supplied").valid(false).build()); + } + + return results; + } + + @Override + protected FileTransfer getFileTransfer(final ProcessContext context) { + return new SFTPTransfer(context, getLogger()); + } + }
