arpadboda commented on a change in pull request #558: MINIFICPP-542 - Add 
PutSFTP processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/558#discussion_r285053455
 
 

 ##########
 File path: extensions/sftp/processors/PutSFTP.cpp
 ##########
 @@ -0,0 +1,846 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutSFTP.h"
+
+#include <memory>
+#include <algorithm>
+#include <cctype>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <iterator>
+#include <limits>
+#include <map>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "utils/ByteArrayCallback.h"
+#include "core/FlowFile.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessContext.h"
+#include "core/Relationship.h"
+#include "io/DataStream.h"
+#include "io/StreamFactory.h"
+#include "ResourceClaim.h"
+#include "utils/StringUtils.h"
+#include "utils/ScopeGuard.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Property PutSFTP::Hostname(
+    core::PropertyBuilder::createProperty("Hostname")->withDescription("The 
fully qualified hostname or IP address of the remote system")
+        ->supportsExpressionLanguage(true)->build());
+core::Property PutSFTP::Port(
+    core::PropertyBuilder::createProperty("Port")->withDescription("The port 
that the remote system is listening on for file transfers")
+        ->supportsExpressionLanguage(true)->build());
+core::Property PutSFTP::Username(
+    
core::PropertyBuilder::createProperty("Username")->withDescription("Username")
+        ->supportsExpressionLanguage(true)->build());
+core::Property PutSFTP::Password(
+    
core::PropertyBuilder::createProperty("Password")->withDescription("Password 
for the user account")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::PrivateKeyPath(
+    core::PropertyBuilder::createProperty("Private Key 
Path")->withDescription("The fully qualified path to the Private Key file")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::PrivateKeyPassphrase(
+    core::PropertyBuilder::createProperty("Private Key 
Passphrase")->withDescription("Password for the private key")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::RemotePath(
+    core::PropertyBuilder::createProperty("Remote Path")->withDescription("The 
path on the remote system from which to pull or push files")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::CreateDirectory(
+    core::PropertyBuilder::createProperty("Create 
Directory")->withDescription("Specifies whether or not the remote directory 
should be created if it does not exist.")
+        ->withDefaultValue<bool>(false)->build());
+core::Property PutSFTP::DisableDirectoryListing(
+    core::PropertyBuilder::createProperty("Disable Directory 
Listing")->withDescription("If set to 'true', directory listing is not 
performed prior to create missing directories. "
+                                                                               
         "By default, this processor executes a directory listing command to 
see target directory existence before creating missing directories. "
+                                                                               
         "However, there are situations that you might need to disable the 
directory listing such as the following. "
+                                                                               
         "Directory listing might fail with some permission setups (e.g. chmod 
100) on a directory. "
+                                                                               
         "Also, if any other SFTP client created the directory after this 
processor performed a listing and before a directory creation request by this 
processor is finished, "
+                                                                               
         "then an error is returned because the directory already exists.")
+                                                                               
         ->isRequired(false)->withDefaultValue<bool>(false)->build());
+core::Property PutSFTP::BatchSize(
+    core::PropertyBuilder::createProperty("Batch Size")->withDescription("The 
maximum number of FlowFiles to send in a single connection")
+        ->withDefaultValue<int>(500)->build());
+core::Property PutSFTP::ConnectionTimeout(
+    core::PropertyBuilder::createProperty("Connection 
Timeout")->withDescription("Amount of time to wait before timing out while 
creating a connection")
+        ->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property PutSFTP::DataTimeout(
+    core::PropertyBuilder::createProperty("Data 
Timeout")->withDescription("When transferring a file between the local and 
remote system, this value specifies how long is allowed to elapse without any 
data being transferred between systems")
+        ->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
+core::Property PutSFTP::ConflictResolution(
+    core::PropertyBuilder::createProperty("Conflict 
Resolution")->withDescription("Determines how to handle the problem of filename 
collisions")
+        ->withAllowableValues<std::string>({CONFLICT_RESOLUTION_REPLACE,
+                                            CONFLICT_RESOLUTION_IGNORE,
+                                            CONFLICT_RESOLUTION_RENAME,
+                                            CONFLICT_RESOLUTION_REJECT,
+                                            CONFLICT_RESOLUTION_FAIL,
+                                            CONFLICT_RESOLUTION_NONE})
+        ->withDefaultValue(CONFLICT_RESOLUTION_NONE)->build());
+core::Property PutSFTP::RejectZeroByte(
+    core::PropertyBuilder::createProperty("Reject Zero-Byte 
Files")->withDescription("Determines whether or not Zero-byte files should be 
rejected without attempting to transfer")
+        ->isRequired(false)->withDefaultValue<bool>(true)->build());
+core::Property PutSFTP::DotRename(
+    core::PropertyBuilder::createProperty("Dot Rename")->withDescription("If 
true, then the filename of the sent file is prepended with a \".\" and then 
renamed back to the original once the file is completely sent. "
+                                                                         
"Otherwise, there is no rename. This property is ignored if the Temporary 
Filename property is set.")
+        ->isRequired(false)->withDefaultValue<bool>(true)->build());
+core::Property PutSFTP::TempFilename(
+    core::PropertyBuilder::createProperty("Temporary 
Filename")->withDescription("If set, the filename of the sent file will be 
equal to the value specified during the transfer and after successful 
completion will be renamed to the original filename. "
+                                                                               
  "If this value is set, the Dot Rename property is ignored.")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::HostKeyFile(
+    core::PropertyBuilder::createProperty("Host Key 
File")->withDescription("If supplied, the given file will be used as the Host 
Key; otherwise, no use host key file will be used")
+        ->isRequired(false)->build());
+core::Property PutSFTP::LastModifiedTime(
+    core::PropertyBuilder::createProperty("Last Modified 
Time")->withDescription("The lastModifiedTime to assign to the file after 
transferring it. "
+                                                                               
   "If not set, the lastModifiedTime will not be changed. "
+                                                                               
   "Format must be yyyy-MM-dd'T'HH:mm:ssZ. "
+                                                                               
   "You may also use expression language such as ${file.lastModifiedTime}. "
+                                                                               
   "If the value is invalid, the processor will not be invalid but will fail to 
change lastModifiedTime of the file.")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::Permissions(
+    core::PropertyBuilder::createProperty("Permissions")->withDescription("The 
permissions to assign to the file after transferring it. "
+                                                                          
"Format must be either UNIX rwxrwxrwx with a - in place of denied permissions 
(e.g. rw-r--r--) or an octal number (e.g. 644). "
+                                                                          "If 
not set, the permissions will not be changed. "
+                                                                          "You 
may also use expression language such as ${file.permissions}. "
+                                                                          "If 
the value is invalid, the processor will not be invalid but will fail to change 
permissions of the file.")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::RemoteOwner(
+    core::PropertyBuilder::createProperty("Remote 
Owner")->withDescription("Integer value representing the User ID to set on the 
file after transferring it. "
+                                                                           "If 
not set, the owner will not be set. You may also use expression language such 
as ${file.owner}. "
+                                                                           "If 
the value is invalid, the processor will not be invalid but will fail to change 
the owner of the file.")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::RemoteGroup(
+    core::PropertyBuilder::createProperty("Remote 
Group")->withDescription("Integer value representing the Group ID to set on the 
file after transferring it. "
+                                                                           "If 
not set, the group will not be set. You may also use expression language such 
as ${file.group}. "
+                                                                           "If 
the value is invalid, the processor will not be invalid but will fail to change 
the group of the file.")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::StrictHostKeyChecking(
+    core::PropertyBuilder::createProperty("Strict Host Key 
Checking")->withDescription("Indicates whether or not strict enforcement of 
hosts keys should be applied")
+        ->withDefaultValue<bool>(false)->build());
+core::Property PutSFTP::UseKeepaliveOnTimeout(
+    core::PropertyBuilder::createProperty("Send Keep Alive On 
Timeout")->withDescription("Indicates whether or not to send a single Keep 
Alive message when SSH socket times out")
+        ->withDefaultValue<bool>(true)->build());
+core::Property PutSFTP::UseCompression(
+    core::PropertyBuilder::createProperty("Use 
Compression")->withDescription("Indicates whether or not ZLIB compression 
should be used when transferring files")
+        ->withDefaultValue<bool>(false)->build());
+core::Property PutSFTP::ProxyType(
+    core::PropertyBuilder::createProperty("Proxy 
Type")->withDescription("Specifies the Proxy Configuration Controller Service 
to proxy network requests. If set, it supersedes proxy settings configured per 
component. "
+                                                                         
"Supported proxies: HTTP + AuthN, SOCKS + AuthN")
+        ->isRequired(false)
+        ->withAllowableValues<std::string>({PROXY_TYPE_DIRECT,
+                                            PROXY_TYPE_HTTP,
+                                            PROXY_TYPE_SOCKS})
+        ->withDefaultValue(PROXY_TYPE_DIRECT)->build());
+core::Property PutSFTP::ProxyHost(
+    core::PropertyBuilder::createProperty("Proxy Host")->withDescription("The 
fully qualified hostname or IP address of the proxy server")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::ProxyPort(
+    core::PropertyBuilder::createProperty("Proxy Port")->withDescription("The 
port of the proxy server")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::HttpProxyUsername(
+    core::PropertyBuilder::createProperty("Http Proxy 
Username")->withDescription("Http Proxy Username")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+core::Property PutSFTP::HttpProxyPassword(
+    core::PropertyBuilder::createProperty("Http Proxy 
Password")->withDescription("Http Proxy Password")
+        ->supportsExpressionLanguage(true)->isRequired(false)->build());
+
+core::Relationship PutSFTP::Success("success", "FlowFiles that are 
successfully sent will be routed to success");
+core::Relationship PutSFTP::Reject("reject", "FlowFiles that were rejected by 
the destination system");
+core::Relationship PutSFTP::Failure("failure", "FlowFiles that failed to send 
to the remote system; failure is usually looped back to this processor");
+
+void PutSFTP::initialize() {
+  logger_->log_trace("Initializing PutSFTP");
+
+  // Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(Hostname);
+  properties.insert(Port);
+  properties.insert(Username);
+  properties.insert(Password);
+  properties.insert(PrivateKeyPath);
+  properties.insert(PrivateKeyPassphrase);
+  properties.insert(RemotePath);
+  properties.insert(CreateDirectory);
+  properties.insert(DisableDirectoryListing);
+  properties.insert(BatchSize);
+  properties.insert(ConnectionTimeout);
+  properties.insert(DataTimeout);
+  properties.insert(ConflictResolution);
+  properties.insert(RejectZeroByte);
+  properties.insert(DotRename);
+  properties.insert(TempFilename);
+  properties.insert(HostKeyFile);
+  properties.insert(LastModifiedTime);
+  properties.insert(Permissions);
+  properties.insert(RemoteOwner);
+  properties.insert(RemoteGroup);
+  properties.insert(StrictHostKeyChecking);
+  properties.insert(UseKeepaliveOnTimeout);
+  properties.insert(UseCompression);
+  properties.insert(ProxyType);
+  properties.insert(ProxyHost);
+  properties.insert(ProxyPort);
+  properties.insert(HttpProxyUsername);
+  properties.insert(HttpProxyPassword);
+  setSupportedProperties(properties);
+  
+  // Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  relationships.insert(Reject);
+  relationships.insert(Failure);
+  setSupportedRelationships(relationships);
+}
+
+constexpr size_t PutSFTP::CONNECTION_CACHE_MAX_SIZE;
+
+PutSFTP::PutSFTP(std::string name, utils::Identifier uuid /*= 
utils::Identifier()*/)
+  : Processor(name, uuid),
+    logger_(logging::LoggerFactory<PutSFTP>::getLogger()),
+    create_directory_(false),
+    batch_size_(0),
+    connection_timeout_(0),
+    data_timeout_(0),
+    reject_zero_byte_(false),
+    dot_rename_(false),
+    strict_host_checking_(false),
+    use_keepalive_on_timeout_(false),
+    use_compression_(false),
+    running_(true) {
+  utils::SFTPClientInitializer::getInstance()->initialize();
+}
+
+PutSFTP::~PutSFTP() {
+  if (keepalive_thread_.joinable()) {
+    {
+      std::lock_guard<std::mutex> lock(connections_mutex_);
+      running_ = false;
+      keepalive_cv_.notify_one();
+    }
+    keepalive_thread_.join();
+  }
+}
+
+bool PutSFTP::ConnectionCacheKey::operator<(const PutSFTP::ConnectionCacheKey& 
other) const {
+  return std::tie(hostname, port, username, proxy_type, proxy_host, 
proxy_port) <
+          std::tie(other.hostname, other.port, other.username, 
other.proxy_type, other.proxy_host, other.proxy_port);
+}
+
+bool PutSFTP::ConnectionCacheKey::operator==(const 
PutSFTP::ConnectionCacheKey& other) const {
+  return std::tie(hostname, port, username, proxy_type, proxy_host, 
proxy_port) ==
+         std::tie(other.hostname, other.port, other.username, 
other.proxy_type, other.proxy_host, other.proxy_port);
+}
+
+std::shared_ptr<utils::SFTPClient> PutSFTP::getConnectionFromCache(const 
PutSFTP::ConnectionCacheKey& key) {
 
 Review comment:
   This should work with uniques, too. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to