This is an automated email from the ASF dual-hosted git repository.
phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new b821df8 MINIFICPP-786 - GetFile processor doesn't seem to work with
default configuration on Win, logged details are insufficient, too
b821df8 is described below
commit b821df856e334f488dfb6f0c8ca4c267262f8dcb
Author: Arpad Boda <[email protected]>
AuthorDate: Sat Mar 23 19:49:17 2019 +0100
MINIFICPP-786 - GetFile processor doesn't seem to work with default
configuration on Win, logged details are insufficient, too
This closes #522.
Signed-off-by: Marc Parisi <[email protected]>
---
libminifi/include/utils/file/FileUtils.h | 85 ++++++++++++++++++++++++++++++++
libminifi/src/processors/GetFile.cpp | 77 +++++------------------------
2 files changed, 97 insertions(+), 65 deletions(-)
diff --git a/libminifi/include/utils/file/FileUtils.h
b/libminifi/include/utils/file/FileUtils.h
index 89aa803..53bb054 100644
--- a/libminifi/include/utils/file/FileUtils.h
+++ b/libminifi/include/utils/file/FileUtils.h
@@ -80,6 +80,16 @@ class FileUtils {
* @param path current path to delete
* @param delete_files_recursively deletes recursively
*/
+
+ static char get_separator()  // Returns the path separator character selected at compile time by the WIN32 macro.
+ {
+#ifdef WIN32
+ return '\\';  // WIN32 is defined: backslash
+#else
+ return '/';  // WIN32 is not defined: forward slash
+#endif
+ }
+
static int64_t delete_dir(const std::string &path, bool
delete_files_recursively = true) {
#ifdef BOOST_VERSION
try {
@@ -301,6 +311,81 @@ class FileUtils {
#endif
}
+  /*
+   * Provides a platform-independent function to list a directory.
+   * The callback is called for every non-directory entry found: the first argument is the path of
+   * the directory containing the entry, the second is the entry's filename.
+   * Returning true from the callback continues the listing; returning false stops the whole listing,
+   * including every enclosing level of a recursive walk.
+   * A directory that cannot be opened, or an entry that cannot be stat'ed, is logged as a warning and
+   * skipped; the rest of the listing still takes place.
+   * @param dir directory to list
+   * @param callback invoked as callback(directory, filename) for each non-directory entry
+   * @param logger receives the debug and warning messages
+   * @param recursive when true, subdirectories (other than "." and "..") are listed as well
+   */
+  static void list_dir(const std::string& dir, std::function<bool (const std::string&, const std::string&)> callback,
+                       const std::shared_ptr<logging::Logger> &logger, bool recursive = true) {
+    list_dir_impl(dir, callback, logger, recursive);
+  }
+
+  /*
+   * Worker behind list_dir; internal helper, call list_dir instead.
+   * Takes the callback by const reference so recursion does not copy the std::function.
+   * @return true if the walk may continue, false once the callback has asked to stop
+   */
+  static bool list_dir_impl(const std::string& dir, const std::function<bool (const std::string&, const std::string&)> &callback,
+                            const std::shared_ptr<logging::Logger> &logger, bool recursive) {
+    logger->log_debug("Performing file listing against %s", dir);
+
+    // Cleared as soon as the callback (directly or in a nested directory) returns false.
+    bool keep_going = true;
+#ifndef WIN32
+    DIR *d = opendir(dir.c_str());
+    if (!d) {
+      logger->log_warn("Failed to open directory: %s", dir.c_str());
+      // An unreadable directory is skipped; it does not abort the listing of its siblings.
+      return true;
+    }
+
+    struct dirent *entry;
+    while (keep_going && (entry = readdir(d)) != nullptr) {
+      std::string d_name = entry->d_name;
+      std::string path = dir + get_separator() + d_name;
+
+      struct stat statbuf;
+      if (stat(path.c_str(), &statbuf) != 0) {
+        logger->log_warn("Failed to stat %s", path);
+        continue;
+      }
+
+      if (S_ISDIR(statbuf.st_mode)) {
+        // "." and ".." are directories too; descending into them would never terminate.
+        if (recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0) {
+          keep_going = list_dir_impl(path, callback, logger, recursive);
+        }
+      } else {
+        keep_going = callback(dir, d_name);
+      }
+    }
+    closedir(d);
+#else
+    // The ANSI struct must be named explicitly: the generic WIN32_FIND_DATA becomes the wide-character
+    // variant when UNICODE is defined, which matches neither FindFirstFileA/FindNextFileA nor strcmp.
+    WIN32_FIND_DATAA FindFileData;
+
+    std::string pathToSearch = dir + "\\*.*";
+    HANDLE hFind = FindFirstFileA(pathToSearch.c_str(), &FindFileData);
+
+    if (hFind == INVALID_HANDLE_VALUE) {
+      logger->log_warn("Failed to open directory: %s", dir.c_str());
+      // An unreadable directory is skipped; it does not abort the listing of its siblings.
+      return true;
+    }
+
+    do {
+      if (strcmp(FindFileData.cFileName, ".") == 0 || strcmp(FindFileData.cFileName, "..") == 0) {
+        continue;
+      }
+      std::string path = dir + get_separator() + FindFileData.cFileName;
+      struct stat statbuf {};
+      if (stat(path.c_str(), &statbuf) != 0) {
+        logger->log_warn("Failed to stat %s", path);
+        continue;
+      }
+      if (S_ISDIR(statbuf.st_mode)) {
+        if (recursive) {
+          keep_going = list_dir_impl(path, callback, logger, recursive);
+        }
+      } else {
+        keep_going = callback(dir, FindFileData.cFileName);
+      }
+    } while (keep_going && FindNextFileA(hFind, &FindFileData));
+    FindClose(hFind);
+#endif
+    return keep_going;
+  }
};
} /* namespace file */
diff --git a/libminifi/src/processors/GetFile.cpp
b/libminifi/src/processors/GetFile.cpp
index 3fbd41f..8d8feeb 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -41,9 +41,6 @@
#include "core/ProcessSession.h"
#include "core/TypedValues.h"
-#ifndef S_ISDIR
-#define S_ISDIR(mode) (((mode) & S_IFMT) == S_IFDIR)
-#endif
#define R_OK 4 /* Test for read permission. */
#define W_OK 2 /* Test for write permission. */
#define F_OK 0 /* Test for existence. */
@@ -211,6 +208,8 @@ bool GetFile::isListingEmpty() {
}
void GetFile::putListing(std::string fileName) {
+ logger_->log_trace("Adding file to queue: %s", fileName);
+
std::lock_guard<std::mutex> lock(mutex_);
_dirList.push(fileName);
@@ -220,15 +219,14 @@ void GetFile::pollListing(std::queue<std::string> &list,
const GetFileRequest &r
std::lock_guard<std::mutex> lock(mutex_);
while (!_dirList.empty() && (request.batchSize == 0 || list.size() <
request.batchSize)) {
- std::string fileName = _dirList.front();
+ list.push(_dirList.front());
_dirList.pop();
- list.push(fileName);
}
-
- return;
}
bool GetFile::acceptFile(std::string fullName, std::string name, const
GetFileRequest &request) {
+ logger_->log_trace("Checking file: %s", fullName);
+
struct stat statbuf;
if (stat(fullName.c_str(), &statbuf) == 0) {
@@ -277,65 +275,14 @@ bool GetFile::acceptFile(std::string fullName,
std::string name, const GetFileRe
}
void GetFile::performListing(std::string dir, const GetFileRequest &request) {
-#ifndef WIN32
- DIR *d;
- d = opendir(dir.c_str());
- if (!d)
- return;
- // only perform a listing while we are not empty
- logger_->log_debug("Performing file listing against %s", dir);
- while (isRunning()) {
- struct dirent *entry;
- entry = readdir(d);
- if (!entry)
- break;
- std::string d_name = entry->d_name;
- std::string path = dir + "/" + d_name;
- struct stat statbuf { };
- if (stat(path.c_str(), &statbuf) != 0) {
- logger_->log_warn("Failed to stat %s", path);
- break;
- }
- if (S_ISDIR(statbuf.st_mode)) {
- // if this is a directory
- if (request.recursive && strcmp(d_name.c_str(), "..") != 0 &&
strcmp(d_name.c_str(), ".") != 0) {
- performListing(path, request);
- }
- } else {
- if (acceptFile(path, d_name, request)) {
- // check whether we can take this file
- putListing(path);
- }
+ // Lists dir through FileUtils::list_dir (recursing when request.recursive is set) and queues, via
+ // putListing, the full path of every file for which acceptFile returns true.
+ // request is captured by reference: list_dir only invokes the callback during the call below, so the
+ // reference cannot dangle, and the request is not copied each time the std::function is copied.
+ auto callback = [this, &request](const std::string& parent_dir, const std::string& filename) -> bool {
+ std::string fullpath = parent_dir + utils::file::FileUtils::get_separator() + filename;
+ if (acceptFile(fullpath, filename, request)) {
+ putListing(fullpath);
 }
- }
- closedir(d);
-#else
- HANDLE hFind;
- WIN32_FIND_DATA FindFileData;
-
- if ((hFind = FindFirstFile(dir.c_str(), &FindFileData)) !=
INVALID_HANDLE_VALUE) {
- do {
- struct stat statbuf {};
- if (stat(FindFileData.cFileName, &statbuf) != 0) {
- logger_->log_warn("Failed to stat %s", FindFileData.cFileName);
- break;
- }
-
- std::string path = dir + "/" + FindFileData.cFileName;
- if (S_ISDIR(statbuf.st_mode)) {
- if (request.recursive && strcmp(FindFileData.cFileName, "..") != 0 &&
strcmp(FindFileData.cFileName, ".") != 0) {
- performListing(path, request);
- }
- } else {
- if (acceptFile(path, FindFileData.cFileName, request)) {
- // check whether we can take this file
- putListing(path);
- }
- }
- }while (FindNextFile(hFind, &FindFileData));
- FindClose(hFind);
- }
-#endif
+ // Returning false asks list_dir to stop once the processor is no longer running.
+ return isRunning();
+ };
+ utils::file::FileUtils::list_dir(dir, callback, logger_, request.recursive);
}
int16_t
GetFile::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
&metric_vector) {