This is an automated email from the ASF dual-hosted git repository.
szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 3a3615d MINIFICPP-1631 Add ListAzureDataLakeStorage processor
3a3615d is described below
commit 3a3615debb9129e7b954827debccaecc68b66006
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Jan 26 11:50:58 2022 +0100
MINIFICPP-1631 Add ListAzureDataLakeStorage processor
and extract ListingStateManager
Closes #1242
Signed-off-by: Marton Szasz <[email protected]>
---
PROCESSORS.md | 96 ++++++----
README.md | 2 +-
extensions/aws/processors/ListS3.cpp | 78 +-------
extensions/aws/processors/ListS3.h | 18 +-
extensions/aws/s3/S3Wrapper.cpp | 4 +-
extensions/aws/s3/S3Wrapper.h | 14 +-
.../AzureDataLakeStorageFileProcessorBase.cpp | 49 +++++
.../AzureDataLakeStorageFileProcessorBase.h | 54 ++++++
.../AzureDataLakeStorageProcessorBase.cpp | 11 --
.../processors/AzureDataLakeStorageProcessorBase.h | 1 -
.../processors/DeleteAzureDataLakeStorage.cpp | 2 +-
.../azure/processors/DeleteAzureDataLakeStorage.h | 8 +-
.../azure/processors/FetchAzureDataLakeStorage.cpp | 4 +-
.../azure/processors/FetchAzureDataLakeStorage.h | 8 +-
.../azure/processors/ListAzureDataLakeStorage.cpp | 172 +++++++++++++++++
.../azure/processors/ListAzureDataLakeStorage.h | 70 +++++++
.../azure/processors/PutAzureDataLakeStorage.cpp | 4 +-
.../azure/processors/PutAzureDataLakeStorage.h | 6 +-
extensions/azure/storage/AzureDataLakeStorage.cpp | 65 +++++++
extensions/azure/storage/AzureDataLakeStorage.h | 23 ++-
.../azure/storage/AzureDataLakeStorageClient.cpp | 23 ++-
.../azure/storage/AzureDataLakeStorageClient.h | 11 +-
extensions/azure/storage/DataLakeStorageClient.h | 27 ++-
libminifi/include/utils/GeneralUtils.h | 17 +-
libminifi/include/utils/ListingStateManager.h | 72 +++++++
libminifi/src/utils/ListingStateManager.cpp | 100 ++++++++++
libminifi/test/aws-tests/ListS3Tests.cpp | 8 +-
.../azure-tests/ListAzureDataLakeStorageTests.cpp | 208 +++++++++++++++++++++
.../test/azure-tests/MockDataLakeStorageClient.h | 34 ++++
29 files changed, 1008 insertions(+), 181 deletions(-)
diff --git a/PROCESSORS.md b/PROCESSORS.md
index d941df6..5b7882f 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -34,10 +34,11 @@
- [GetUSBCamera](#getusbcamera)
- [HashContent](#hashcontent)
- [InvokeHTTP](#invokehttp)
-- [ListSFTP](#listsftp)
+- [ListAzureDataLakeStorage](#listazuredatalakestorage)
- [ListenHTTP](#listenhttp)
- [ListenSyslog](#listensyslog)
- [ListS3](#lists3)
+- [ListSFTP](#listsftp)
- [LogAttribute](#logattribute)
- [ManipulateArchive](#manipulatearchive)
- [MergeContent](#mergecontent)
@@ -881,48 +882,25 @@ In the list below, the names of required properties
appear in bold. Any other pr
|failure|The original FlowFile will be routed on any type of connection
failure, timeout or general exception. It will have new attributes detailing
the request.|
-## ListSFTP
+## ListAzureDataLakeStorage
### Description
-Performs a listing of the files residing on an SFTP server. For each file that
is found on the remote server, a new FlowFile will be created with the filename
attribute set to the name of the file on the remote server. This can then be
used in conjunction with FetchSFTP in order to fetch those files.
+Lists directory in an Azure Data Lake Storage Gen 2 filesystem
### Properties
In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
| Name | Default Value | Allowable Values | Description |
| - | - | - | - |
-|**Connection Timeout**|30 sec||Amount of time to wait before timing out while
creating a connection|
-|**Data Timeout**|30 sec||When transferring a file between the local and
remote system, this value specifies how long is allowed to elapse without any
data being transferred between systems|
-|Entity Tracking Initial Listing Target|All Available|All
Available<br>Tracking Time Window<br>|Specify how initial listing should be
handled. Used by 'Tracking Entities' strategy.|
-|Entity Tracking Time Window|||Specify how long this processor should track
already-listed entities. 'Tracking Entities' strategy can pick any entity whose
timestamp is inside the specified time window. For example, if set to '30
minutes', any entity having timestamp in recent 30 minutes will be the listing
target when this processor runs. A listed entity is considered 'new/updated'
and a FlowFile is emitted if one of the following conditions is met: 1. does not
exist in the already-listed ent [...]
-|File Filter Regex|||Provides a Java Regular Expression for filtering
Filenames; if a filter is supplied, only files whose names match that Regular
Expression will be fetched|
-|**Follow symlink**|false||If true, will pull even symbolic files and also
nested symbolic subdirectories; otherwise, will not read symbolic files and
will not traverse symbolic link subdirectories|
-|Host Key File|||If supplied, the given file will be used as the Host Key;
otherwise, no host key file will be used|
-|**Hostname**|||The fully qualified hostname or IP address of the remote
system<br/>**Supports Expression Language: true**|
-|Http Proxy Password|||Http Proxy Password<br/>**Supports Expression Language:
true**|
-|Http Proxy Username|||Http Proxy Username<br/>**Supports Expression Language:
true**|
-|**Ignore Dotted Files**|true||If true, files whose names begin with a dot
(".") will be ignored|
-|**Listing Strategy**|Tracking Timestamps|Tracking Entities<br>Tracking
Timestamps<br>|Specify how to determine new/updated entities. See each strategy
descriptions for detail.|
-|Maximum File Age|||The maximum age that a file must be in order to be pulled;
any file older than this amount of time (according to last modification date)
will be ignored|
-|Maximum File Size|||The maximum size that a file must be in order to be
pulled|
-|**Minimum File Age**|0 sec||The minimum age that a file must be in order to
be pulled; any file younger than this amount of time (according to last
modification date) will be ignored|
-|**Minimum File Size**|0 B||The minimum size that a file must be in order to
be pulled|
-|Password|||Password for the user account<br/>**Supports Expression Language:
true**|
-|Path Filter Regex|||When Search Recursively is true, then only subdirectories
whose path matches the given Regular Expression will be scanned|
-|**Port**|||The port that the remote system is listening on for file
transfers<br/>**Supports Expression Language: true**|
-|Private Key Passphrase|||Password for the private key<br/>**Supports
Expression Language: true**|
-|Private Key Path|||The fully qualified path to the Private Key
file<br/>**Supports Expression Language: true**|
-|Proxy Host|||The fully qualified hostname or IP address of the proxy
server<br/>**Supports Expression Language: true**|
-|Proxy Port|||The port of the proxy server<br/>**Supports Expression Language:
true**|
-|Proxy Type|DIRECT|DIRECT<br>HTTP<br>SOCKS<br>|Specifies the Proxy
Configuration Controller Service to proxy network requests. If set, it
supersedes proxy settings configured per component. Supported proxies: HTTP +
AuthN, SOCKS + AuthN|
-|Remote Path|||The fully qualified filename on the remote
system<br/>**Supports Expression Language: true**|
-|**Search Recursively**|false||If true, will pull files from arbitrarily
nested subdirectories; otherwise, will not traverse subdirectories|
-|**Send Keep Alive On Timeout**|true||Indicates whether or not to send a
single Keep Alive message when SSH socket times out|
-|**State File**|ListSFTP||Specifies the file that should be used for storing
state about what data has been ingested so that upon restart MiNiFi can resume
from where it left off|
-|**Strict Host Key Checking**|false||Indicates whether or not strict
enforcement of hosts keys should be applied|
-|**Target System Timestamp Precision**|Auto Detect|Auto
Detect<br>Milliseconds<br>Minutes<br>Seconds<br>|Specify timestamp precision at
the target system. Since this processor uses timestamp of entities to decide
which should be listed, it is crucial to use the right timestamp precision.|
-|**Username**|||Username<br/>**Supports Expression Language: true**|
+|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials
Service used to retrieve the connection string from.|
+|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to
be already existing.<br/>**Supports Expression Language: true**|
+|Directory Name|||Name of the Azure Storage Directory. The Directory Name
cannot contain a leading '/'. If left empty it designates the root directory.
The directory will be created if not already existing.<br/>**Supports
Expression Language: true**|
+|**Recurse Subdirectories**|true||Indicates whether to list files from
subdirectories of the directory|
+|File Filter|||Only files whose names match the given regular expression will
be listed|
+|Path Filter|||When 'Recurse Subdirectories' is true, then only subdirectories
whose paths match the given regular expression will be scanned|
+|Listing Strategy|timestamps|none<br/>timestamps|Specify how to determine
new/updated entities. If 'timestamps' is selected it tracks the latest
timestamp of listed entity to determine new/updated entities. If 'none' is
selected it lists an entity without any tracking, the same entity will be
listed each time on executing this processor.|
+
### Relationships
| Name | Description |
@@ -1016,6 +994,56 @@ In the list below, the names of required properties
appear in bold. Any other pr
|**Write User Metadata**|false||If set to 'true', the user defined metadata
associated with the S3 object will be added to FlowFile attributes/records.|
|**Requester Pays**|false||If true, indicates that the requester consents to
pay any charges associated with listing the S3 bucket. This sets the
'x-amz-request-payer' header to 'requester'. Note that this setting is only
used if Write User Metadata is true.|
+
+## ListSFTP
+
+### Description
+
+Performs a listing of the files residing on an SFTP server. For each file that
is found on the remote server, a new FlowFile will be created with the filename
attribute set to the name of the file on the remote server. This can then be
used in conjunction with FetchSFTP in order to fetch those files.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
properties (not in bold) are considered optional. The table also indicates any
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Connection Timeout**|30 sec||Amount of time to wait before timing out while
creating a connection|
+|**Data Timeout**|30 sec||When transferring a file between the local and
remote system, this value specifies how long is allowed to elapse without any
data being transferred between systems|
+|Entity Tracking Initial Listing Target|All Available|All
Available<br>Tracking Time Window<br>|Specify how initial listing should be
handled. Used by 'Tracking Entities' strategy.|
+|Entity Tracking Time Window|||Specify how long this processor should track
already-listed entities. 'Tracking Entities' strategy can pick any entity whose
timestamp is inside the specified time window. For example, if set to '30
minutes', any entity having timestamp in recent 30 minutes will be the listing
target when this processor runs. A listed entity is considered 'new/updated'
and a FlowFile is emitted if one of the following conditions is met: 1. does not
exist in the already-listed ent [...]
+|File Filter Regex|||Provides a Java Regular Expression for filtering
Filenames; if a filter is supplied, only files whose names match that Regular
Expression will be fetched|
+|**Follow symlink**|false||If true, will pull even symbolic files and also
nested symbolic subdirectories; otherwise, will not read symbolic files and
will not traverse symbolic link subdirectories|
+|Host Key File|||If supplied, the given file will be used as the Host Key;
otherwise, no host key file will be used|
+|**Hostname**|||The fully qualified hostname or IP address of the remote
system<br/>**Supports Expression Language: true**|
+|Http Proxy Password|||Http Proxy Password<br/>**Supports Expression Language:
true**|
+|Http Proxy Username|||Http Proxy Username<br/>**Supports Expression Language:
true**|
+|**Ignore Dotted Files**|true||If true, files whose names begin with a dot
(".") will be ignored|
+|**Listing Strategy**|Tracking Timestamps|Tracking Entities<br>Tracking
Timestamps<br>|Specify how to determine new/updated entities. See each strategy
descriptions for detail.|
+|Maximum File Age|||The maximum age that a file must be in order to be pulled;
any file older than this amount of time (according to last modification date)
will be ignored|
+|Maximum File Size|||The maximum size that a file must be in order to be
pulled|
+|**Minimum File Age**|0 sec||The minimum age that a file must be in order to
be pulled; any file younger than this amount of time (according to last
modification date) will be ignored|
+|**Minimum File Size**|0 B||The minimum size that a file must be in order to
be pulled|
+|Password|||Password for the user account<br/>**Supports Expression Language:
true**|
+|Path Filter Regex|||When Search Recursively is true, then only subdirectories
whose path matches the given Regular Expression will be scanned|
+|**Port**|||The port that the remote system is listening on for file
transfers<br/>**Supports Expression Language: true**|
+|Private Key Passphrase|||Password for the private key<br/>**Supports
Expression Language: true**|
+|Private Key Path|||The fully qualified path to the Private Key
file<br/>**Supports Expression Language: true**|
+|Proxy Host|||The fully qualified hostname or IP address of the proxy
server<br/>**Supports Expression Language: true**|
+|Proxy Port|||The port of the proxy server<br/>**Supports Expression Language:
true**|
+|Proxy Type|DIRECT|DIRECT<br>HTTP<br>SOCKS<br>|Specifies the Proxy
Configuration Controller Service to proxy network requests. If set, it
supersedes proxy settings configured per component. Supported proxies: HTTP +
AuthN, SOCKS + AuthN|
+|Remote Path|||The fully qualified filename on the remote
system<br/>**Supports Expression Language: true**|
+|**Search Recursively**|false||If true, will pull files from arbitrarily
nested subdirectories; otherwise, will not traverse subdirectories|
+|**Send Keep Alive On Timeout**|true||Indicates whether or not to send a
single Keep Alive message when SSH socket times out|
+|**State File**|ListSFTP||Specifies the file that should be used for storing
state about what data has been ingested so that upon restart MiNiFi can resume
from where it left off|
+|**Strict Host Key Checking**|false||Indicates whether or not strict
enforcement of hosts keys should be applied|
+|**Target System Timestamp Precision**|Auto Detect|Auto
Detect<br>Milliseconds<br>Minutes<br>Seconds<br>|Specify timestamp precision at
the target system. Since this processor uses timestamp of entities to decide
which should be listed, it is crucial to use the right timestamp precision.|
+|**Username**|||Username<br/>**Supports Expression Language: true**|
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|All FlowFiles that are received are routed to success|
+
+
### Relationships
| Name | Description |
diff --git a/README.md b/README.md
index c7d99f8..6011698 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using
NARs. The JNI extension
| ------------- |:-------------| :-----|
| Archive Extensions |
[ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)
| -DBUILD_LIBARCHIVE=ON |
| AWS |
[AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3)
| -DENABLE_AWS=ON |
-| Azure |
[AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](#deleteazureblobstorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
| -DENABLE_AZURE=ON |
+| Azure |
[AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](#deleteazureblobstorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchazuredatalakestorage)<br/>[ListAzureDataLakeStorage](#listazuredatalakestorage)
| -DENABLE_AZURE=ON |
| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DDISABLE_CIVET=ON |
| CURL | [InvokeHTTP](PROCESSORS.md#invokehttp) | -DDISABLE_CURL=ON |
| GPS | GetGPS | -DENABLE_GPS=ON |
diff --git a/extensions/aws/processors/ListS3.cpp
b/extensions/aws/processors/ListS3.cpp
index 0f958eb..9573aa4 100644
--- a/extensions/aws/processors/ListS3.cpp
+++ b/extensions/aws/processors/ListS3.cpp
@@ -36,9 +36,6 @@ namespace minifi {
namespace aws {
namespace processors {
-const std::string ListS3::LATEST_LISTED_KEY_PREFIX = "listed_key.";
-const std::string ListS3::LATEST_LISTED_KEY_TIMESTAMP = "listed_timestamp";
-
const core::Property ListS3::Delimiter(
core::PropertyBuilder::createProperty("Delimiter")
->withDescription("The string used to delimit directories within the
bucket. Please consult the AWS documentation for the correct use of this
field.")
@@ -93,10 +90,11 @@ void ListS3::initialize() {
void ListS3::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
S3Processor::onSchedule(context, sessionFactory);
- state_manager_ = context->getStateManager();
- if (state_manager_ == nullptr) {
+ auto state_manager = context->getStateManager();
+ if (state_manager == nullptr) {
throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
}
+ state_manager_ =
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
auto common_properties = getCommonELSupportedProperties(context, nullptr);
if (!common_properties) {
@@ -177,54 +175,6 @@ void ListS3::writeUserMetadata(
}
}
-std::vector<std::string> ListS3::getLatestListedKeys(const
std::unordered_map<std::string, std::string> &state) {
- std::vector<std::string> latest_listed_keys;
- for (const auto& kvp : state) {
- if (kvp.first.rfind(LATEST_LISTED_KEY_PREFIX, 0) == 0) {
- latest_listed_keys.push_back(kvp.second);
- }
- }
- return latest_listed_keys;
-}
-
-uint64_t ListS3::getLatestListedKeyTimestamp(const
std::unordered_map<std::string, std::string> &state) {
- std::string stored_listed_key_timestamp_str;
- auto it = state.find(LATEST_LISTED_KEY_TIMESTAMP);
- if (it != state.end()) {
- stored_listed_key_timestamp_str = it->second;
- }
-
- int64_t stored_listed_key_timestamp = 0;
- core::Property::StringToInt(stored_listed_key_timestamp_str,
stored_listed_key_timestamp);
-
- return stored_listed_key_timestamp;
-}
-
-ListS3::ListingState ListS3::getCurrentState(const
std::shared_ptr<core::ProcessContext>& /*context*/) {
- ListS3::ListingState current_listing_state;
- std::unordered_map<std::string, std::string> state;
- if (!state_manager_->get(state)) {
- logger_->log_info("No stored state for listed objects was found");
- return current_listing_state;
- }
-
- current_listing_state.listed_key_timestamp =
getLatestListedKeyTimestamp(state);
- logger_->log_debug("Restored previous listed timestamp %lld",
current_listing_state.listed_key_timestamp);
-
- current_listing_state.listed_keys = getLatestListedKeys(state);
- return current_listing_state;
-}
-
-void ListS3::storeState(const ListS3::ListingState &latest_listing_state) {
- std::unordered_map<std::string, std::string> state;
- state[LATEST_LISTED_KEY_TIMESTAMP] =
std::to_string(latest_listing_state.listed_key_timestamp);
- for (std::size_t i = 0; i < latest_listing_state.listed_keys.size(); ++i) {
- state[LATEST_LISTED_KEY_PREFIX + std::to_string(i)] =
latest_listing_state.listed_keys.at(i);
- }
- logger_->log_debug("Stored new listed timestamp %lld",
latest_listing_state.listed_key_timestamp);
- state_manager_->set(state);
-}
-
void ListS3::createNewFlowFile(
core::ProcessSession &session,
const aws::s3::ListedObjectAttributes &object_attributes) {
@@ -233,7 +183,7 @@ void ListS3::createNewFlowFile(
session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME,
object_attributes.filename);
session.putAttribute(flow_file, "s3.etag", object_attributes.etag);
session.putAttribute(flow_file, "s3.isLatest", object_attributes.is_latest ?
"true" : "false");
- session.putAttribute(flow_file, "s3.lastModified",
std::to_string(object_attributes.last_modified));
+ session.putAttribute(flow_file, "s3.lastModified",
std::to_string(object_attributes.last_modified.time_since_epoch() /
std::chrono::milliseconds(1)));
session.putAttribute(flow_file, "s3.length",
std::to_string(object_attributes.length));
session.putAttribute(flow_file, "s3.storeClass",
object_attributes.store_class);
if (!object_attributes.version.empty()) {
@@ -255,7 +205,7 @@ void ListS3::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, con
return;
}
- auto stored_listing_state = getCurrentState(context);
+ auto stored_listing_state = state_manager_->getCurrentState();
auto latest_listing_state = stored_listing_state;
std::size_t files_transferred = 0;
@@ -270,7 +220,7 @@ void ListS3::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, con
}
logger_->log_debug("ListS3 transferred %zu flow files", files_transferred);
- storeState(latest_listing_state);
+ state_manager_->storeState(latest_listing_state);
if (files_transferred == 0) {
logger_->log_debug("No new S3 objects were found in bucket %s to list",
list_request_params_->bucket);
@@ -279,22 +229,6 @@ void ListS3::onTrigger(const
std::shared_ptr<core::ProcessContext> &context, con
}
}
-bool ListS3::ListingState::wasObjectListedAlready(const
aws::s3::ListedObjectAttributes &object_attributes) const {
- return listed_key_timestamp > object_attributes.last_modified ||
- (listed_key_timestamp == object_attributes.last_modified &&
- std::find(listed_keys.begin(), listed_keys.end(),
object_attributes.filename) != listed_keys.end());
-}
-
-void ListS3::ListingState::updateState(const aws::s3::ListedObjectAttributes
&object_attributes) {
- if (listed_key_timestamp < object_attributes.last_modified) {
- listed_key_timestamp = object_attributes.last_modified;
- listed_keys.clear();
- listed_keys.push_back(object_attributes.filename);
- } else if (listed_key_timestamp == object_attributes.last_modified) {
- listed_keys.push_back(object_attributes.filename);
- }
-}
-
REGISTER_RESOURCE(ListS3, "This Processor retrieves a listing of objects from
an Amazon S3 bucket.");
} // namespace processors
diff --git a/extensions/aws/processors/ListS3.h
b/extensions/aws/processors/ListS3.h
index d8c2e3b..ff3a53c 100644
--- a/extensions/aws/processors/ListS3.h
+++ b/extensions/aws/processors/ListS3.h
@@ -39,8 +39,6 @@ namespace processors {
class ListS3 : public S3Processor {
public:
static constexpr char const* ProcessorName = "ListS3";
- static const std::string LATEST_LISTED_KEY_PREFIX;
- static const std::string LATEST_LISTED_KEY_TIMESTAMP;
// Supported Properties
static const core::Property Delimiter;
@@ -57,7 +55,6 @@ class ListS3 : public S3Processor {
explicit ListS3(const std::string& name, const minifi::utils::Identifier&
uuid = minifi::utils::Identifier())
: S3Processor(name, uuid,
core::logging::LoggerFactory<ListS3>::getLogger()) {
}
-
explicit ListS3(const std::string& name, minifi::utils::Identifier uuid,
std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
: S3Processor(name, uuid,
core::logging::LoggerFactory<ListS3>::getLogger(),
std::move(s3_request_sender)) {
}
@@ -73,17 +70,6 @@ class ListS3 : public S3Processor {
return core::annotation::Input::INPUT_FORBIDDEN;
}
- struct ListingState {
- int64_t listed_key_timestamp = 0;
- std::vector<std::string> listed_keys;
-
- bool wasObjectListedAlready(const aws::s3::ListedObjectAttributes
&object_attributes) const;
- void updateState(const aws::s3::ListedObjectAttributes &object_attributes);
- };
-
- static std::vector<std::string> getLatestListedKeys(const
std::unordered_map<std::string, std::string> &state);
- static uint64_t getLatestListedKeyTimestamp(const
std::unordered_map<std::string, std::string> &state);
-
void writeObjectTags(
const aws::s3::ListedObjectAttributes &object_attributes,
core::ProcessSession &session,
@@ -92,8 +78,6 @@ class ListS3 : public S3Processor {
const aws::s3::ListedObjectAttributes &object_attributes,
core::ProcessSession &session,
const std::shared_ptr<core::FlowFile> &flow_file);
- ListingState getCurrentState(const std::shared_ptr<core::ProcessContext>
&context);
- void storeState(const ListingState &latest_listing_state);
void createNewFlowFile(
core::ProcessSession &session,
const aws::s3::ListedObjectAttributes &object_attributes);
@@ -102,7 +86,7 @@ class ListS3 : public S3Processor {
bool write_object_tags_ = false;
bool write_user_metadata_ = false;
bool requester_pays_ = false;
- std::shared_ptr<core::CoreComponentStateManager> state_manager_ = nullptr;
+ std::unique_ptr<minifi::utils::ListingStateManager> state_manager_;
};
} // namespace processors
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index c283791..d78a2e4 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -161,7 +161,7 @@ void S3Wrapper::addListResults(const
Aws::Vector<Aws::S3::Model::ObjectVersion>&
attributes.etag =
minifi::utils::StringUtils::removeFramingCharacters(version.GetETag(), '"');
attributes.filename = version.GetKey();
attributes.is_latest = version.GetIsLatest();
- attributes.last_modified = version.GetLastModified().Millis();
+ attributes.last_modified = version.GetLastModified().UnderlyingTimestamp();
attributes.length = version.GetSize();
attributes.store_class =
VERSION_STORAGE_CLASS_MAP.at(version.GetStorageClass());
attributes.version = version.GetVersionId();
@@ -180,7 +180,7 @@ void S3Wrapper::addListResults(const
Aws::Vector<Aws::S3::Model::Object>& conten
attributes.etag =
minifi::utils::StringUtils::removeFramingCharacters(object.GetETag(), '"');
attributes.filename = object.GetKey();
attributes.is_latest = true;
- attributes.last_modified = object.GetLastModified().Millis();
+ attributes.last_modified = object.GetLastModified().UnderlyingTimestamp();
attributes.length = object.GetSize();
attributes.store_class =
OBJECT_STORAGE_CLASS_MAP.at(object.GetStorageClass());
listed_objects.push_back(attributes);
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h
index 8dd9ce1..3c547b1 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3Wrapper.h
@@ -38,6 +38,8 @@
#include "utils/AWSInitializer.h"
#include "utils/OptionalUtils.h"
#include "utils/StringUtils.h"
+#include "utils/ListingStateManager.h"
+#include "utils/gsl.h"
#include "io/BaseStream.h"
#include "S3RequestSender.h"
@@ -177,11 +179,19 @@ struct ListRequestParameters : public RequestParameters {
uint64_t min_object_age = 0;
};
-struct ListedObjectAttributes {
+struct ListedObjectAttributes : public minifi::utils::ListedObject {
+ std::chrono::time_point<std::chrono::system_clock> getLastModified() const
override {
+ return last_modified;
+ }
+
+ std::string getKey() const override {
+ return filename;
+ }
+
std::string filename;
std::string etag;
bool is_latest = false;
- int64_t last_modified = 0;
+ std::chrono::time_point<std::chrono::system_clock> last_modified;
int64_t length = 0;
std::string store_class;
std::string version;
diff --git
a/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.cpp
b/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.cpp
new file mode 100644
index 0000000..1ddee04
--- /dev/null
+++ b/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.cpp
@@ -0,0 +1,49 @@
+/**
+ * @file AzureDataLakeStorageFileProcessorBase.cpp
+ * AzureDataLakeStorageFileProcessorBase class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureDataLakeStorageFileProcessorBase.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property AzureDataLakeStorageFileProcessorBase::FileName(
+ core::PropertyBuilder::createProperty("File Name")
+ ->withDescription("The filename in Azure Storage. If left empty the
filename attribute will be used by default.")
+ ->supportsExpressionLanguage(true)
+ ->build());
+
+bool AzureDataLakeStorageFileProcessorBase::setFileOperationCommonParameters(
+ storage::AzureDataLakeStorageFileOperationParameters& params,
core::ProcessContext& context, const std::shared_ptr<core::FlowFile>&
flow_file) {
+ if (!setCommonParameters(params, context, flow_file)) {
+ return false;
+ }
+
+ context.getProperty(FileName, params.filename, flow_file);
+ if (params.filename.empty() && (!flow_file->getAttribute("filename",
params.filename) || params.filename.empty())) {
+ logger_->log_error("No File Name is set and default object key 'filename'
attribute could not be found!");
+ return false;
+ }
+
+ return true;
+}
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git
a/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.h
b/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.h
new file mode 100644
index 0000000..0f9a003
--- /dev/null
+++ b/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.h
@@ -0,0 +1,54 @@
+/**
+ * @file AzureDataLakeStorageFileProcessorBase.h
+ * AzureDataLakeStorageFileProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class AzureDataLakeStorageFileProcessorBase : public
AzureDataLakeStorageProcessorBase {
+ public:
+ // Supported Properties
+ EXTENSIONAPI static const core::Property FileName;
+
+ explicit AzureDataLakeStorageFileProcessorBase(const std::string& name,
const minifi::utils::Identifier& uuid, const
std::shared_ptr<core::logging::Logger> &logger)
+ : AzureDataLakeStorageProcessorBase(name, uuid, logger) {
+ }
+
+ ~AzureDataLakeStorageFileProcessorBase() override = default;
+
+ protected:
+ explicit AzureDataLakeStorageFileProcessorBase(const std::string& name,
const minifi::utils::Identifier& uuid, const
std::shared_ptr<core::logging::Logger> &logger,
+ std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+ : AzureDataLakeStorageProcessorBase(name, uuid, logger,
std::move(data_lake_storage_client)) {
+ }
+
+ bool setFileOperationCommonParameters(
+ storage::AzureDataLakeStorageFileOperationParameters& params,
+ core::ProcessContext& context,
+ const std::shared_ptr<core::FlowFile>& flow_file);
+};
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
index 04d7664..ed526f3 100644
--- a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
+++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
@@ -37,11 +37,6 @@ const core::Property
AzureDataLakeStorageProcessorBase::DirectoryName(
"If left empty it designates the root directory. The
directory will be created if not already existing.")
->supportsExpressionLanguage(true)
->build());
-const core::Property AzureDataLakeStorageProcessorBase::FileName(
- core::PropertyBuilder::createProperty("File Name")
- ->withDescription("The filename in Azure Storage. If left empty the
filename attribute will be used by default.")
- ->supportsExpressionLanguage(true)
- ->build());
void AzureDataLakeStorageProcessorBase::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
gsl_Expects(context);
@@ -69,12 +64,6 @@ bool AzureDataLakeStorageProcessorBase::setCommonParameters(
context.getProperty(DirectoryName, params.directory_name, flow_file);
- context.getProperty(FileName, params.filename, flow_file);
- if (params.filename.empty() && (!flow_file->getAttribute("filename",
params.filename) || params.filename.empty())) {
- logger_->log_error("No File Name is set and default object key 'filename'
attribute could not be found!");
- return false;
- }
-
return true;
}
diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
index 14f2a92..65c3232 100644
--- a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
+++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
@@ -38,7 +38,6 @@ class AzureDataLakeStorageProcessorBase : public
AzureStorageProcessorBase {
// Supported Properties
EXTENSIONAPI static const core::Property FilesystemName;
EXTENSIONAPI static const core::Property DirectoryName;
- EXTENSIONAPI static const core::Property FileName;
explicit AzureDataLakeStorageProcessorBase(const std::string& name, const
minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger>
&logger)
: AzureStorageProcessorBase(name, uuid, logger) {
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
index 6146a31..c43e327 100644
--- a/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
@@ -48,7 +48,7 @@ void DeleteAzureDataLakeStorage::initialize() {
std::optional<storage::DeleteAzureDataLakeStorageParameters>
DeleteAzureDataLakeStorage::buildDeleteParameters(
core::ProcessContext& context, const std::shared_ptr<core::FlowFile>&
flow_file) {
storage::DeleteAzureDataLakeStorageParameters params;
- if (!setCommonParameters(params, context, flow_file)) {
+ if (!setFileOperationCommonParameters(params, context, flow_file)) {
return std::nullopt;
}
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.h
b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
index cb26a2f..3813387 100644
--- a/extensions/azure/processors/DeleteAzureDataLakeStorage.h
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
@@ -24,21 +24,21 @@
#include <utility>
#include <memory>
-#include "AzureDataLakeStorageProcessorBase.h"
+#include "AzureDataLakeStorageFileProcessorBase.h"
template<typename AzureDataLakeStorageProcessorBase>
class AzureDataLakeStorageTestsFixture;
namespace org::apache::nifi::minifi::azure::processors {
-class DeleteAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase {
+class DeleteAzureDataLakeStorage final : public
AzureDataLakeStorageFileProcessorBase {
public:
// Supported Relationships
static const core::Relationship Failure;
static const core::Relationship Success;
explicit DeleteAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
- : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger()) {
+ : AzureDataLakeStorageFileProcessorBase(name, uuid,
core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger()) {
}
~DeleteAzureDataLakeStorage() override = default;
@@ -58,7 +58,7 @@ class DeleteAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBas
}
explicit DeleteAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid,
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
- : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
+ : AzureDataLakeStorageFileProcessorBase(name, uuid,
core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
}
std::optional<storage::DeleteAzureDataLakeStorageParameters>
buildDeleteParameters(core::ProcessContext& context, const
std::shared_ptr<core::FlowFile>& flow_file);
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
index 4885190..b41aede 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
@@ -68,7 +68,7 @@ void FetchAzureDataLakeStorage::initialize() {
std::optional<storage::FetchAzureDataLakeStorageParameters>
FetchAzureDataLakeStorage::buildFetchParameters(
core::ProcessContext& context, const std::shared_ptr<core::FlowFile>&
flow_file) {
storage::FetchAzureDataLakeStorageParameters params;
- if (!setCommonParameters(params, context, flow_file)) {
+ if (!setFileOperationCommonParameters(params, context, flow_file)) {
return std::nullopt;
}
@@ -93,7 +93,7 @@ std::optional<storage::FetchAzureDataLakeStorageParameters>
FetchAzureDataLakeSt
void FetchAzureDataLakeStorage::onTrigger(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSession>& session) {
gsl_Expects(context && session);
- logger_->log_debug("FetchAzureDataLakeStorage onTrigger");
+ logger_->log_trace("FetchAzureDataLakeStorage onTrigger");
std::shared_ptr<core::FlowFile> flow_file = session->get();
if (!flow_file) {
context->yield();
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h
b/extensions/azure/processors/FetchAzureDataLakeStorage.h
index 4768e49..74a2917 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.h
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -24,14 +24,14 @@
#include <utility>
#include <memory>
-#include "AzureDataLakeStorageProcessorBase.h"
+#include "AzureDataLakeStorageFileProcessorBase.h"
template<typename AzureDataLakeStorageProcessor>
class AzureDataLakeStorageTestsFixture;
namespace org::apache::nifi::minifi::azure::processors {
-class FetchAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase {
+class FetchAzureDataLakeStorage final : public
AzureDataLakeStorageFileProcessorBase {
public:
// Supported Properties
EXTENSIONAPI static const core::Property RangeStart;
@@ -43,7 +43,7 @@ class FetchAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase
static const core::Relationship Success;
explicit FetchAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
- : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger()) {
+ : AzureDataLakeStorageFileProcessorBase(name, uuid,
core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger()) {
}
~FetchAzureDataLakeStorage() override = default;
@@ -91,7 +91,7 @@ class FetchAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase
}
explicit FetchAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid,
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
- : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
+ : AzureDataLakeStorageFileProcessorBase(name, uuid,
core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
}
std::optional<storage::FetchAzureDataLakeStorageParameters>
buildFetchParameters(core::ProcessContext& context, const
std::shared_ptr<core::FlowFile>& flow_file);
diff --git a/extensions/azure/processors/ListAzureDataLakeStorage.cpp
b/extensions/azure/processors/ListAzureDataLakeStorage.cpp
new file mode 100644
index 0000000..4ee4e3d
--- /dev/null
+++ b/extensions/azure/processors/ListAzureDataLakeStorage.cpp
@@ -0,0 +1,172 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+ core::PropertyBuilder::createProperty("Recurse Subdirectories")
+ ->isRequired(true)
+ ->withDefaultValue<bool>(true)
+ ->withDescription("Indicates whether to list files from subdirectories
of the directory")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+ core::PropertyBuilder::createProperty("File Filter")
+ ->withDescription("Only files whose names match the given regular
expression will be listed")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+ core::PropertyBuilder::createProperty("Path Filter")
+ ->withDescription("When 'Recurse Subdirectories' is true, then only
subdirectories whose paths match the given regular expression will be scanned")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+ core::PropertyBuilder::createProperty("Listing Strategy")
+ ->withDescription("Specify how to determine new/updated entities. If
'timestamps' is selected it tracks the latest timestamp of listed entity to "
+ "determine new/updated entities. If 'none' is selected
it lists an entity without any tracking, the same entity will be listed each
time on executing this processor.")
+
->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+ ->withAllowableValues<std::string>(storage::EntityTracking::values())
+ ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All
FlowFiles that are received are routed to success");
+
+namespace {
+std::shared_ptr<core::FlowFile> createNewFlowFile(core::ProcessSession
&session, const storage::ListDataLakeStorageElement &element) {
+ auto flow_file = session.create();
+ session.putAttribute(flow_file, "azure.filesystem", element.filesystem);
+ session.putAttribute(flow_file, "azure.filePath", element.file_path);
+ session.putAttribute(flow_file, "azure.directory", element.directory);
+ session.putAttribute(flow_file, "azure.filename", element.filename);
+ session.putAttribute(flow_file, "azure.length",
std::to_string(element.length));
+ session.putAttribute(flow_file, "azure.lastModified",
std::to_string(element.last_modified.time_since_epoch() /
std::chrono::milliseconds(1)));
+ session.putAttribute(flow_file, "azure.etag", element.etag);
+ return flow_file;
+}
+} // namespace
+
+void ListAzureDataLakeStorage::initialize() {
+ setSupportedProperties({
+ AzureStorageCredentialsService,
+ FilesystemName,
+ DirectoryName,
+ RecurseSubdirectories,
+ FileFilter,
+ PathFilter,
+ ListingStrategy
+ });
+ setSupportedRelationships({
+ Success
+ });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ gsl_Expects(context && sessionFactory);
+ AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+ auto state_manager = context->getStateManager();
+ if (state_manager == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+ state_manager_ =
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+ auto params = buildListParameters(*context);
+ if (!params) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for
ListAzureDataLakeStorage processor are missing or invalid");
+ }
+
+ list_parameters_ = *std::move(params);
+ tracking_strategy_ =
utils::parseEnumProperty<storage::EntityTracking>(*context, ListingStrategy);
+}
+
+std::optional<storage::ListAzureDataLakeStorageParameters>
ListAzureDataLakeStorage::buildListParameters(core::ProcessContext& context) {
+ storage::ListAzureDataLakeStorageParameters params;
+ if (!setCommonParameters(params, context, nullptr)) {
+ return std::nullopt;
+ }
+
+ if (!context.getProperty(RecurseSubdirectories.getName(),
params.recurse_subdirectories)) {
+ logger_->log_error("Recurse Subdirectories property missing or invalid");
+ return std::nullopt;
+ }
+
+ auto createFilterRegex = [&context](const std::string& property_name) ->
std::optional<std::regex> {
+ try {
+ std::string filter_str;
+ context.getProperty(property_name, filter_str);
+ if (!filter_str.empty()) {
+ return std::regex(filter_str);
+ }
+
+ return std::nullopt;
+ } catch (const std::regex_error&) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " regex is
invalid");
+ }
+ };
+
+ params.file_regex = createFilterRegex(FileFilter.getName());
+ params.path_regex = createFilterRegex(PathFilter.getName());
+
+ return params;
+}
+
+void ListAzureDataLakeStorage::onTrigger(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSession>& session) {
+ gsl_Expects(context && session);
+ logger_->log_trace("ListAzureDataLakeStorage onTrigger");
+
+ auto list_result = azure_data_lake_storage_.listDirectory(list_parameters_);
+ if (!list_result || list_result->empty()) {
+ context->yield();
+ return;
+ }
+
+ auto stored_listing_state = state_manager_->getCurrentState();
+ auto latest_listing_state = stored_listing_state;
+ std::size_t files_transferred = 0;
+
+ for (const auto& element : *list_result) {
+ if (tracking_strategy_ == storage::EntityTracking::TIMESTAMPS &&
stored_listing_state.wasObjectListedAlready(element)) {
+ continue;
+ }
+
+ auto flow_file = createNewFlowFile(*session, element);
+ session->transfer(flow_file, Success);
+ ++files_transferred;
+ latest_listing_state.updateState(element);
+ }
+
+ state_manager_->storeState(latest_listing_state);
+
+ logger_->log_debug("ListAzureDataLakeStorage transferred %zu flow files",
files_transferred);
+
+ if (files_transferred == 0) {
+ logger_->log_debug("No new Azure Data Lake Storage files were found in
directory '%s' of filesystem '%s'", list_parameters_.directory_name,
list_parameters_.file_system_name);
+ context->yield();
+ return;
+ }
+}
+
+REGISTER_RESOURCE(ListAzureDataLakeStorage, "Lists directory in an Azure Data
Lake Storage Gen 2 filesystem");
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/ListAzureDataLakeStorage.h
b/extensions/azure/processors/ListAzureDataLakeStorage.h
new file mode 100644
index 0000000..c08bf95
--- /dev/null
+++ b/extensions/azure/processors/ListAzureDataLakeStorage.h
@@ -0,0 +1,70 @@
+/**
+ * @file ListAzureDataLakeStorage.h
+ * ListAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+class ListAzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class ListAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase {
+ public:
+ EXTENSIONAPI static const core::Property RecurseSubdirectories;
+ EXTENSIONAPI static const core::Property FileFilter;
+ EXTENSIONAPI static const core::Property PathFilter;
+ EXTENSIONAPI static const core::Property ListingStrategy;
+
+ static const core::Relationship Success;
+
+ explicit ListAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+ : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger()) {
+ }
+
+ ~ListAzureDataLakeStorage() override = default;
+
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+ friend class ::ListAzureDataLakeStorageTestsFixture;
+
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_FORBIDDEN;
+ }
+
+ explicit ListAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid,
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+ : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
+ }
+
+ std::optional<storage::ListAzureDataLakeStorageParameters>
buildListParameters(core::ProcessContext& context);
+
+ storage::EntityTracking tracking_strategy_ =
storage::EntityTracking::TIMESTAMPS;
+ storage::ListAzureDataLakeStorageParameters list_parameters_;
+ std::unique_ptr<minifi::utils::ListingStateManager> state_manager_;
+};
+
+} // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index e1c0a4b..06d3912 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -57,7 +57,7 @@ void PutAzureDataLakeStorage::initialize() {
void PutAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
gsl_Expects(context && sessionFactory);
- AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+ AzureDataLakeStorageFileProcessorBase::onSchedule(context, sessionFactory);
std::optional<storage::AzureStorageCredentials> credentials;
std::tie(std::ignore, credentials) =
getCredentialsFromControllerService(*context);
if (!credentials) {
@@ -75,7 +75,7 @@ void PutAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessCont
std::optional<storage::PutAzureDataLakeStorageParameters>
PutAzureDataLakeStorage::buildUploadParameters(
core::ProcessContext& context, const std::shared_ptr<core::FlowFile>&
flow_file) {
storage::PutAzureDataLakeStorageParameters params;
- if (!setCommonParameters(params, context, flow_file)) {
+ if (!setFileOperationCommonParameters(params, context, flow_file)) {
return std::nullopt;
}
params.replace_file = conflict_resolution_strategy_ ==
FileExistsResolutionStrategy::REPLACE_FILE;
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h
b/extensions/azure/processors/PutAzureDataLakeStorage.h
index a268711..79bff78 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -24,7 +24,7 @@
#include <utility>
#include <memory>
-#include "AzureDataLakeStorageProcessorBase.h"
+#include "AzureDataLakeStorageFileProcessorBase.h"
#include "utils/Enum.h"
#include "utils/Export.h"
@@ -34,7 +34,7 @@ class AzureDataLakeStorageTestsFixture;
namespace org::apache::nifi::minifi::azure::processors {
-class PutAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase
{
+class PutAzureDataLakeStorage final : public
AzureDataLakeStorageFileProcessorBase {
public:
// Supported Properties
EXTENSIONAPI static const core::Property ConflictResolutionStrategy;
@@ -86,7 +86,7 @@ class PutAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase {
}
explicit PutAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid,
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
- : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
+ : AzureDataLakeStorageFileProcessorBase(name, uuid,
core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
}
std::optional<storage::PutAzureDataLakeStorageParameters>
buildUploadParameters(core::ProcessContext& context, const
std::shared_ptr<core::FlowFile>& flow_file);
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp
b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 34e2d3b..9bcb393 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -20,11 +20,41 @@
#include "AzureDataLakeStorage.h"
+#include <regex>
+#include <string_view>
+
#include "AzureDataLakeStorageClient.h"
#include "io/StreamPipe.h"
+#include "utils/file/FileUtils.h"
+#include "utils/StringUtils.h"
+#include "utils/gsl.h"
+#include "utils/GeneralUtils.h"
namespace org::apache::nifi::minifi::azure::storage {
+namespace {
+bool matchesPathFilter(std::string_view base_directory, const
std::optional<std::regex>& path_regex, std::string path) {
+ gsl_Expects(utils::implies(!base_directory.empty(),
minifi::utils::StringUtils::startsWith(path, base_directory)));
+ if (!path_regex) {
+ return true;
+ }
+
+ if (!base_directory.empty()) {
+ path = path.size() == base_directory.size() ? "" :
path.substr(base_directory.size() + 1);
+ }
+
+ return std::regex_match(path, *path_regex);
+}
+
+bool matchesFileFilter(const std::optional<std::regex>& file_regex, const
std::string& filename) {
+ if (!file_regex) {
+ return true;
+ }
+
+ return std::regex_match(filename, *file_regex);
+}
+} // namespace
+
AzureDataLakeStorage::AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient>
data_lake_storage_client)
: data_lake_storage_client_(data_lake_storage_client ?
std::move(data_lake_storage_client) :
std::make_unique<AzureDataLakeStorageClient>()) {
}
@@ -72,4 +102,39 @@ std::optional<uint64_t>
AzureDataLakeStorage::fetchFile(const FetchAzureDataLake
}
}
+std::optional<ListDataLakeStorageResult>
AzureDataLakeStorage::listDirectory(const ListAzureDataLakeStorageParameters&
params) {
+ try {
+ auto list_res = data_lake_storage_client_->listDirectory(params);
+
+ ListDataLakeStorageResult result;
+ for (const auto& azure_element : list_res) {
+ if (azure_element.IsDirectory) {
+ continue;
+ }
+ ListDataLakeStorageElement element;
+ auto [directory, filename] =
minifi::utils::file::FileUtils::split_path(azure_element.Name, true
/*force_posix*/);
+ if (!directory.empty()) {
+ directory = directory.substr(0, directory.size() - 1); // Remove
ending '/' character
+ }
+
+ if (!matchesPathFilter(params.directory_name, params.path_regex,
directory) || !matchesFileFilter(params.file_regex, filename)) {
+ continue;
+ }
+
+ element.filename = filename;
+ element.last_modified =
static_cast<std::chrono::system_clock::time_point>(azure_element.LastModified);
+ element.etag = azure_element.ETag;
+ element.length = azure_element.FileSize;
+ element.filesystem = params.file_system_name;
+ element.file_path = azure_element.Name;
+ element.directory = directory;
+ result.push_back(element);
+ }
+ return result;
+ } catch (const std::exception& ex) {
+ logger_->log_error("An exception occurred while listing directory '%s' of
filesystem '%s': %s", params.directory_name, params.file_system_name,
ex.what());
+ return std::nullopt;
+ }
+}
+
} // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorage.h
b/extensions/azure/storage/AzureDataLakeStorage.h
index d1291a2..7839d68 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.h
+++ b/extensions/azure/storage/AzureDataLakeStorage.h
@@ -28,7 +28,7 @@
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "DataLakeStorageClient.h"
-#include "azure/core/io/body_stream.hpp"
+#include "utils/ListingStateManager.h"
namespace org::apache::nifi::minifi::azure::storage {
@@ -43,6 +43,26 @@ struct UploadDataLakeStorageResult {
std::string primary_uri;
};
+struct ListDataLakeStorageElement : public minifi::utils::ListedObject {
+ std::string filesystem;
+ std::string file_path;
+ std::string directory;
+ std::string filename;
+ uint64_t length = 0;
+ std::chrono::time_point<std::chrono::system_clock> last_modified;
+ std::string etag;
+
+ std::chrono::time_point<std::chrono::system_clock> getLastModified() const
override {
+ return last_modified;
+ }
+
+ std::string getKey() const override {
+ return file_path;
+ }
+};
+
+using ListDataLakeStorageResult = std::vector<ListDataLakeStorageElement>;
+
class AzureDataLakeStorage {
public:
explicit AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient>
data_lake_storage_client = nullptr);
@@ -50,6 +70,7 @@ class AzureDataLakeStorage {
storage::UploadDataLakeStorageResult uploadFile(const
storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t>
buffer);
bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params);
std::optional<uint64_t> fetchFile(const FetchAzureDataLakeStorageParameters&
params, io::BaseStream& stream);
+ std::optional<ListDataLakeStorageResult> listDirectory(const
ListAzureDataLakeStorageParameters& params);
private:
std::shared_ptr<core::logging::Logger>
logger_{core::logging::LoggerFactory<AzureDataLakeStorage>::getLogger()};
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index e542432..cfb9adf 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -61,10 +61,13 @@ void AzureDataLakeStorageClient::resetClientIfNeeded(const
AzureStorageCredentia
number_of_retries_ = number_of_retries;
}
-Azure::Storage::Files::DataLake::DataLakeFileClient
AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageParameters&
params) {
+Azure::Storage::Files::DataLake::DataLakeDirectoryClient
AzureDataLakeStorageClient::getDirectoryClient(const
AzureDataLakeStorageParameters& params) {
resetClientIfNeeded(params.credentials, params.file_system_name,
params.number_of_retries);
+ return client_->GetDirectoryClient(params.directory_name);
+}
- auto directory_client = client_->GetDirectoryClient(params.directory_name);
+Azure::Storage::Files::DataLake::DataLakeFileClient
AzureDataLakeStorageClient::getFileClient(const
AzureDataLakeStorageFileOperationParameters& params) {
+ auto directory_client = getDirectoryClient(params);
if (!params.directory_name.empty()) {
directory_client.CreateIfNotExists();
}
@@ -107,4 +110,20 @@ std::unique_ptr<io::InputStream>
AzureDataLakeStorageClient::fetchFile(const Fet
return
std::make_unique<AzureDataLakeStorageInputStream>(std::move(result.Value));
}
+std::vector<Azure::Storage::Files::DataLake::Models::PathItem>
AzureDataLakeStorageClient::listDirectory(const
ListAzureDataLakeStorageParameters& params) {
+ std::vector<Azure::Storage::Files::DataLake::Models::PathItem> result;
+ if (params.directory_name.empty()) {
+ resetClientIfNeeded(params.credentials, params.file_system_name,
params.number_of_retries);
+ for (auto page_result = client_->ListPaths(params.recurse_subdirectories);
page_result.HasPage(); page_result.MoveToNextPage()) {
+ result.insert(result.end(), page_result.Paths.begin(),
page_result.Paths.end());
+ }
+ } else {
+ auto directory_client = getDirectoryClient(params);
+ for (auto page_result =
directory_client.ListPaths(params.recurse_subdirectories);
page_result.HasPage(); page_result.MoveToNextPage()) {
+ result.insert(result.end(), page_result.Paths.begin(),
page_result.Paths.end());
+ }
+ }
+ return result;
+}
+
} // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h
b/extensions/azure/storage/AzureDataLakeStorageClient.h
index cd8d791..9dfcd9f 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -22,6 +22,7 @@
#include <string>
#include <memory>
#include <utility>
+#include <vector>
#include <azure/storage/files/datalake.hpp>
@@ -64,6 +65,13 @@ class AzureDataLakeStorageClient : public
DataLakeStorageClient {
*/
std::unique_ptr<io::InputStream> fetchFile(const
FetchAzureDataLakeStorageParameters& params) override;
+ /**
+ * Lists a directory in Azure Data Lake Storage
+ * @param params Parameters required for connecting and directory access on
Azure
+ * @return The list of paths present in the directory
+ */
+ std::vector<Azure::Storage::Files::DataLake::Models::PathItem>
listDirectory(const ListAzureDataLakeStorageParameters& params) override;
+
private:
class AzureDataLakeStorageInputStream : public io::InputStream {
public:
@@ -84,7 +92,8 @@ class AzureDataLakeStorageClient : public
DataLakeStorageClient {
};
void resetClientIfNeeded(const AzureStorageCredentials& credentials, const
std::string& file_system_name, std::optional<uint64_t> number_of_retries);
- Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const
AzureDataLakeStorageParameters& params);
+ Azure::Storage::Files::DataLake::DataLakeDirectoryClient
getDirectoryClient(const AzureDataLakeStorageParameters& params);
+ Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const
AzureDataLakeStorageFileOperationParameters& params);
AzureStorageCredentials credentials_;
std::string file_system_name_;
diff --git a/extensions/azure/storage/DataLakeStorageClient.h
b/extensions/azure/storage/DataLakeStorageClient.h
index eb57d62..f804168 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/DataLakeStorageClient.h
@@ -22,39 +22,58 @@
#include <string>
#include <optional>
#include <memory>
+#include <vector>
+#include <regex>
#include "AzureStorageCredentials.h"
#include "utils/gsl.h"
#include "io/InputStream.h"
+#include "azure/storage/files/datalake/protocol/datalake_rest_client.hpp"
+#include "utils/Enum.h"
namespace org::apache::nifi::minifi::azure::storage {
+SMART_ENUM(EntityTracking,
+ (NONE, "none"),
+ (TIMESTAMPS, "timestamps")
+)
+
struct AzureDataLakeStorageParameters {
AzureStorageCredentials credentials;
std::string file_system_name;
std::string directory_name;
- std::string filename;
std::optional<uint64_t> number_of_retries;
};
-struct PutAzureDataLakeStorageParameters : public
AzureDataLakeStorageParameters {
+struct AzureDataLakeStorageFileOperationParameters : public
AzureDataLakeStorageParameters {
+ std::string filename;
+};
+
+struct PutAzureDataLakeStorageParameters : public
AzureDataLakeStorageFileOperationParameters {
bool replace_file = false;
};
-using DeleteAzureDataLakeStorageParameters = AzureDataLakeStorageParameters;
+using DeleteAzureDataLakeStorageParameters =
AzureDataLakeStorageFileOperationParameters;
-struct FetchAzureDataLakeStorageParameters : public
AzureDataLakeStorageParameters {
+struct FetchAzureDataLakeStorageParameters : public
AzureDataLakeStorageFileOperationParameters {
std::optional<uint64_t> range_start;
std::optional<uint64_t> range_length;
};
+struct ListAzureDataLakeStorageParameters : public
AzureDataLakeStorageParameters {
+ bool recurse_subdirectories = true;
+ std::optional<std::regex> path_regex;
+ std::optional<std::regex> file_regex;
+};
+
class DataLakeStorageClient {
public:
virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0;
virtual std::string uploadFile(const PutAzureDataLakeStorageParameters&
params, gsl::span<const uint8_t> buffer) = 0;
virtual bool deleteFile(const DeleteAzureDataLakeStorageParameters& params)
= 0;
virtual std::unique_ptr<io::InputStream> fetchFile(const
FetchAzureDataLakeStorageParameters& params) = 0;
+ virtual std::vector<Azure::Storage::Files::DataLake::Models::PathItem>
listDirectory(const ListAzureDataLakeStorageParameters& params) = 0;
virtual ~DataLakeStorageClient() = default;
};
diff --git a/libminifi/include/utils/GeneralUtils.h
b/libminifi/include/utils/GeneralUtils.h
index 5d8def7..002a7c9 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -16,8 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#ifndef LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_
-#define LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_
+#pragma once
#include <memory>
#include <type_traits>
@@ -25,11 +24,7 @@
#include "gsl.h"
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
template<typename T, typename = typename
std::enable_if<std::is_integral<T>::value>::type>
constexpr T intdiv_ceil(T numerator, T denominator) {
@@ -111,10 +106,6 @@ template<typename T>
using remove_cvref_t = typename std::remove_cv<typename
std::remove_reference<T>::type>::type;
#endif
-} // namespace utils
-} // namespace minifi
-} // namespace nifi
-} // namespace apache
-} // namespace org
+inline constexpr bool implies(bool a, bool b) noexcept { return !a || b; }
-#endif // LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/ListingStateManager.h
b/libminifi/include/utils/ListingStateManager.h
new file mode 100644
index 0000000..280155a
--- /dev/null
+++ b/libminifi/include/utils/ListingStateManager.h
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <memory>
+#include <chrono>
+#include <utility>
+
+#include "core/CoreComponentState.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+class ListedObject {
+ public:
+ [[nodiscard]] virtual std::chrono::time_point<std::chrono::system_clock>
getLastModified() const = 0;
+ [[nodiscard]] virtual std::string getKey() const = 0;
+ virtual ~ListedObject() = default;
+};
+
+struct ListingState {
+ [[nodiscard]] bool wasObjectListedAlready(const ListedObject
&object_attributes) const;
+ void updateState(const ListedObject &object_attributes);
+ uint64_t getListedKeyTimeStampInMilliseconds() const;
+
+ std::chrono::time_point<std::chrono::system_clock> listed_key_timestamp;
+ std::unordered_set<std::string> listed_keys;
+};
+
+class ListingStateManager {
+ public:
+ explicit
ListingStateManager(std::shared_ptr<core::CoreComponentStateManager>
state_manager)
+ : state_manager_(std::move(state_manager)) {
+ }
+
+ [[nodiscard]] ListingState getCurrentState() const;
+ void storeState(const ListingState &latest_listing_state);
+
+ private:
+ static const std::string LATEST_LISTED_OBJECT_PREFIX;
+ static const std::string LATEST_LISTED_OBJECT_TIMESTAMP;
+
+ [[nodiscard]] static uint64_t
getLatestListedKeyTimestampInMilliseconds(const std::unordered_map<std::string,
std::string> &state);
+ [[nodiscard]] static std::unordered_set<std::string>
getLatestListedKeys(const std::unordered_map<std::string, std::string> &state);
+
+ std::shared_ptr<core::CoreComponentStateManager> state_manager_;
+ const std::string timestamp_key_;
+ const std::string listed_object_prefix_;
+ std::shared_ptr<core::logging::Logger>
logger_{core::logging::LoggerFactory<ListingStateManager>::getLogger()};
+};
+
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/utils/ListingStateManager.cpp
b/libminifi/src/utils/ListingStateManager.cpp
new file mode 100644
index 0000000..385709b
--- /dev/null
+++ b/libminifi/src/utils/ListingStateManager.cpp
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ListingStateManager.h"
+
+#include "core/Property.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_PREFIX =
"listed_key.";
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_TIMESTAMP =
"listed_timestamp";
+
+bool ListingState::wasObjectListedAlready(const ListedObject &object) const {
+ return listed_key_timestamp > object.getLastModified() ||
+ (listed_key_timestamp == object.getLastModified() &&
listed_keys.find(object.getKey()) != listed_keys.end());
+}
+
+void ListingState::updateState(const ListedObject &object) {
+ if (listed_key_timestamp < object.getLastModified()) {
+ listed_key_timestamp = object.getLastModified();
+ listed_keys.clear();
+ listed_keys.insert(object.getKey());
+ } else if (listed_key_timestamp == object.getLastModified()) {
+ listed_keys.insert(object.getKey());
+ }
+}
+
+uint64_t ListingState::getListedKeyTimeStampInMilliseconds() const {
+ return listed_key_timestamp.time_since_epoch() /
std::chrono::milliseconds(1);
+}
+
+uint64_t ListingStateManager::getLatestListedKeyTimestampInMilliseconds(const
std::unordered_map<std::string, std::string> &state) {
+ std::string stored_listed_key_timestamp_str;
+ auto it = state.find(LATEST_LISTED_OBJECT_TIMESTAMP);
+ if (it != state.end()) {
+ stored_listed_key_timestamp_str = it->second;
+ }
+
+ int64_t stored_listed_key_timestamp = 0;
+ core::Property::StringToInt(stored_listed_key_timestamp_str,
stored_listed_key_timestamp);
+
+ return stored_listed_key_timestamp;
+}
+
+std::unordered_set<std::string> ListingStateManager::getLatestListedKeys(const
std::unordered_map<std::string, std::string> &state) {
+ std::unordered_set<std::string> latest_listed_keys;
+ for (const auto& kvp : state) {
+ if (kvp.first.rfind(LATEST_LISTED_OBJECT_PREFIX, 0) == 0) {
+ latest_listed_keys.insert(kvp.second);
+ }
+ }
+ return latest_listed_keys;
+}
+
+ListingState ListingStateManager::getCurrentState() const {
+ ListingState current_listing_state;
+ std::unordered_map<std::string, std::string> state;
+ if (!state_manager_->get(state)) {
+ logger_->log_info("No stored state for listed objects was found");
+ return current_listing_state;
+ }
+
+ auto milliseconds = getLatestListedKeyTimestampInMilliseconds(state);
+ current_listing_state.listed_key_timestamp =
std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(milliseconds));
+ logger_->log_debug("Restored previous listed timestamp %lld", milliseconds);
+
+ current_listing_state.listed_keys = getLatestListedKeys(state);
+ return current_listing_state;
+}
+
+void ListingStateManager::storeState(const ListingState &latest_listing_state)
{
+ std::unordered_map<std::string, std::string> state;
+ state[LATEST_LISTED_OBJECT_TIMESTAMP] =
std::to_string(latest_listing_state.getListedKeyTimeStampInMilliseconds());
+
+ uint64_t id = 0;
+ for (const auto& key : latest_listing_state.listed_keys) {
+ state[LATEST_LISTED_OBJECT_PREFIX + std::to_string(id)] = key;
+ ++id;
+ }
+
+ logger_->log_debug("Stored new listed timestamp %s",
state[LATEST_LISTED_OBJECT_TIMESTAMP]);
+ state_manager_->set(state);
+}
+
+} // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/aws-tests/ListS3Tests.cpp
b/libminifi/test/aws-tests/ListS3Tests.cpp
index 8ddec26..1792f23 100644
--- a/libminifi/test/aws-tests/ListS3Tests.cpp
+++ b/libminifi/test/aws-tests/ListS3Tests.cpp
@@ -94,7 +94,7 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test listing without
versioning", "[awsS3L
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket
value:" + S3_BUCKET) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest
value:true") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:") == S3_OBJECT_COUNT);
-
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT /
2);
+
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") ==
S3_OBJECT_COUNT / 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length
value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version")
== 0);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass
value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
@@ -122,7 +122,7 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test listing with
versioning", "[awsS3List
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest
value:true") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest
value:false") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:") == S3_OBJECT_COUNT * 2);
-
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT);
+
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") ==
S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length
value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass
value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") ==
0);
@@ -215,7 +215,7 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated
listing without versioning"
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket
value:" + S3_BUCKET) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest
value:true") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:") == S3_OBJECT_COUNT);
-
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT /
2);
+
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") ==
S3_OBJECT_COUNT / 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length
value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version")
== 0);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass
value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
@@ -241,7 +241,7 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated
listing with versioning", "
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest
value:true") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest
value:false") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:") == S3_OBJECT_COUNT * 2);
-
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT);
+
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") ==
S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length
value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass
value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") ==
0);
diff --git a/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
new file mode 100644
index 0000000..cb96d08
--- /dev/null
+++ b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
@@ -0,0 +1,208 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "MockDataLakeStorageClient.h"
+#include "utils/IntegrationTestUtils.h"
+#include "processors/LogAttribute.h"
+#include "processors/ListAzureDataLakeStorage.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+const std::string FILESYSTEM_NAME = "testfilesystem";
+const std::string DIRECTORY_NAME = "testdir";
+const std::string CONNECTION_STRING = "test-connectionstring";
+
+class ListAzureDataLakeStorageTestsFixture {
+ public:
+ ListAzureDataLakeStorageTestsFixture() {
+ LogTestController::getInstance().setDebug<TestPlan>();
+ LogTestController::getInstance().setDebug<minifi::core::Processor>();
+ LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
LogTestController::getInstance().setDebug<minifi::utils::ListingStateManager>();
+
LogTestController::getInstance().setTrace<minifi::azure::processors::ListAzureDataLakeStorage>();
+
+ // Build MiNiFi processing graph
+ plan_ = test_controller_.createPlan();
+ auto mock_data_lake_storage_client =
std::make_unique<MockDataLakeStorageClient>();
+ mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get();
+ list_azure_data_lake_storage_ =
std::shared_ptr<minifi::azure::processors::ListAzureDataLakeStorage>(
+ new
minifi::azure::processors::ListAzureDataLakeStorage("ListAzureDataLakeStorage",
utils::Identifier(), std::move(mock_data_lake_storage_client)));
+
+ plan_->addProcessor(list_azure_data_lake_storage_,
"ListAzureDataLakeStorage", { {"success", "d"} });
+ auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", {
{"success", "d"} }, true);
+ plan_->setProperty(logattribute,
minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+ azure_storage_cred_service_ =
plan_->addController("AzureStorageCredentialsService",
"AzureStorageCredentialsService");
+ setDefaultProperties();
+ }
+
+ void setDefaultProperties() {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::AzureStorageCredentialsService.getName(),
"AzureStorageCredentialsService");
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::FilesystemName.getName(),
FILESYSTEM_NAME);
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::DirectoryName.getName(),
DIRECTORY_NAME);
+ plan_->setProperty(azure_storage_cred_service_,
minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(),
CONNECTION_STRING);
+ }
+
+ virtual ~ListAzureDataLakeStorageTestsFixture() {
+ LogTestController::getInstance().reset();
+ }
+
+ protected:
+ TestController test_controller_;
+ std::shared_ptr<TestPlan> plan_;
+ MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_;
+ std::shared_ptr<core::Processor> list_azure_data_lake_storage_;
+ std::shared_ptr<core::controller::ControllerServiceNode>
azure_storage_cred_service_;
+};
+
+namespace {
+
+using namespace std::chrono_literals;
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Azure storage
credentials service is empty", "[azureDataLakeStorageParameters]") {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::AzureStorageCredentialsService.getName(),
"");
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true),
minifi::Exception);
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Filesystem name is not
set", "[azureDataLakeStorageParameters]") {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::FilesystemName.getName(),
"");
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true),
minifi::Exception);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ REQUIRE(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid
or empty!"));
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Connection String is
empty", "[azureDataLakeStorageParameters]") {
+ plan_->setProperty(azure_storage_cred_service_,
minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(),
"");
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true),
minifi::Exception);
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "List all files every
time", "[listAzureDataLakeStorage]") {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::ListingStrategy.getName(),
toString(minifi::azure::storage::EntityTracking::NONE));
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::RecurseSubdirectories.getName(),
"false");
+ test_controller_.runSession(plan_, true);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ auto run_assertions = [this]() {
+ auto passed_params =
mock_data_lake_storage_client_ptr_->getPassedListParams();
+ CHECK(passed_params.credentials.buildConnectionString() ==
CONNECTION_STRING);
+ CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+ CHECK(passed_params.directory_name == DIRECTORY_NAME);
+ CHECK(passed_params.recurse_subdirectories == false);
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" +
DIRECTORY_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" +
FILESYSTEM_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath
value:testdir/item1.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath
value:testdir/sub/item2.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename
value:item1.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename
value:item2.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:128"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:256"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag1"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag2"));
+ };
+ run_assertions();
+ plan_->reset();
+
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+ test_controller_.runSession(plan_, true);
+ run_assertions();
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do not list same files
the second time when timestamps are tracked", "[listAzureDataLakeStorage]") {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::ListingStrategy.getName(),
toString(minifi::azure::storage::EntityTracking::TIMESTAMPS));
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::RecurseSubdirectories.getName(),
"false");
+ test_controller_.runSession(plan_, true);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ auto passed_params =
mock_data_lake_storage_client_ptr_->getPassedListParams();
+ CHECK(passed_params.credentials.buildConnectionString() ==
CONNECTION_STRING);
+ CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+ CHECK(passed_params.directory_name == DIRECTORY_NAME);
+ CHECK(passed_params.recurse_subdirectories == false);
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" +
DIRECTORY_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" +
FILESYSTEM_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath
value:testdir/item1.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath
value:testdir/sub/item2.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename
value:item1.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename
value:item2.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:128"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:256"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag1"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag2"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" +
mock_data_lake_storage_client_ptr_->ITEM1_LAST_MODIFIED + "\n"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" +
mock_data_lake_storage_client_ptr_->ITEM2_LAST_MODIFIED + "\n"));
+ plan_->reset();
+
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+ test_controller_.runSession(plan_, true);
+ REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s,
0ms));
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do not list filtered
files", "[listAzureDataLakeStorage]") {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::FileFilter.getName(),
"item1.*g");
+ test_controller_.runSession(plan_, true);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ auto passed_params =
mock_data_lake_storage_client_ptr_->getPassedListParams();
+ CHECK(passed_params.credentials.buildConnectionString() ==
CONNECTION_STRING);
+ CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+ CHECK(passed_params.directory_name == DIRECTORY_NAME);
+ CHECK(passed_params.recurse_subdirectories == true);
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" +
DIRECTORY_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" +
FILESYSTEM_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath
value:testdir/item1.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename
value:item1.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:128"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag1"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" +
mock_data_lake_storage_client_ptr_->ITEM1_LAST_MODIFIED + "\n"));
+ CHECK_FALSE(LogTestController::getInstance().contains("key:azure.filePath
value:testdir/sub/item2.log", 0s, 0ms));
+ CHECK_FALSE(LogTestController::getInstance().contains("key:azure.filename
value:item2.log", 0s, 0ms));
+ CHECK_FALSE(LogTestController::getInstance().contains("key:azure.length
value:256", 0s, 0ms));
+ CHECK_FALSE(LogTestController::getInstance().contains("key:azure.etag
value:etag2", 0s, 0ms));
+
CHECK_FALSE(LogTestController::getInstance().contains("key:azure.lastModified
value:" + mock_data_lake_storage_client_ptr_->ITEM2_LAST_MODIFIED, 0s, 0ms));
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do not list filtered
paths", "[listAzureDataLakeStorage]") {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::PathFilter.getName(),
"su.*");
+ test_controller_.runSession(plan_, true);
+ using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+ auto passed_params =
mock_data_lake_storage_client_ptr_->getPassedListParams();
+ CHECK(passed_params.credentials.buildConnectionString() ==
CONNECTION_STRING);
+ CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+ CHECK(passed_params.directory_name == DIRECTORY_NAME);
+ CHECK(passed_params.recurse_subdirectories == true);
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" +
DIRECTORY_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" +
FILESYSTEM_NAME));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath
value:testdir/sub/item2.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename
value:item2.log"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:256"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag2"));
+ CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" +
mock_data_lake_storage_client_ptr_->ITEM2_LAST_MODIFIED + "\n"));
+ CHECK_FALSE(LogTestController::getInstance().contains("key:azure.filePath
value:testdir/item1.log", 0s, 0ms));
+ CHECK_FALSE(LogTestController::getInstance().contains("key:azure.filename
value:item1.log", 0s, 0ms));
+ CHECK_FALSE(LogTestController::getInstance().contains("key:azure.length
value:128", 0s, 0ms));
+ CHECK_FALSE(LogTestController::getInstance().contains("key:azure.etag
value:etag1", 0s, 0ms));
+
CHECK_FALSE(LogTestController::getInstance().contains("key:azure.lastModified
value:" + mock_data_lake_storage_client_ptr_->ITEM1_LAST_MODIFIED, 0s, 0ms));
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Throw on invalid file
filter", "[listAzureDataLakeStorage]") {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::FileFilter.getName(),
"(item1][].*g");
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true),
minifi::Exception);
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Throw on invalid path
filter", "[listAzureDataLakeStorage]") {
+ plan_->setProperty(list_azure_data_lake_storage_,
minifi::azure::processors::ListAzureDataLakeStorage::PathFilter.getName(),
"su.([[*");
+ REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true),
minifi::Exception);
+}
+
+} // namespace
diff --git a/libminifi/test/azure-tests/MockDataLakeStorageClient.h
b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
index 8969e1e..b100697 100644
--- a/libminifi/test/azure-tests/MockDataLakeStorageClient.h
+++ b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
@@ -31,6 +31,8 @@ class MockDataLakeStorageClient : public
org::apache::nifi::minifi::azure::stora
public:
const std::string PRIMARY_URI = "http://test-uri/file";
const std::string FETCHED_DATA = "test azure data for stream";
+ const std::string ITEM1_LAST_MODIFIED = "1631292120000";
+ const std::string ITEM2_LAST_MODIFIED = "1634127120000";
bool createFile(const
org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters&
/*params*/) override {
if (file_creation_error_) {
@@ -81,6 +83,33 @@ class MockDataLakeStorageClient : public
org::apache::nifi::minifi::azure::stora
return
std::make_unique<org::apache::nifi::minifi::io::BufferStream>(buffer_.data(),
buffer_.size());
}
+ std::vector<Azure::Storage::Files::DataLake::Models::PathItem>
listDirectory(const
org::apache::nifi::minifi::azure::storage::ListAzureDataLakeStorageParameters&
params) override {
+ list_params_ = params;
+ std::vector<Azure::Storage::Files::DataLake::Models::PathItem> result;
+ Azure::Storage::Files::DataLake::Models::PathItem diritem;
+ diritem.IsDirectory = true;
+ diritem.Name = "testdir/";
+
+ Azure::Storage::Files::DataLake::Models::PathItem item1;
+ item1.IsDirectory = false;
+ item1.Name = "testdir/item1.log";
+ item1.LastModified = Azure::DateTime(2021, 9, 10, 16, 42, 0);
+ item1.ETag = "etag1";
+ item1.FileSize = 128;
+
+ Azure::Storage::Files::DataLake::Models::PathItem item2;
+ item2.IsDirectory = false;
+ item2.Name = "testdir/sub/item2.log";
+ item2.LastModified = Azure::DateTime(2021, 10, 13, 12, 12, 0);
+ item2.ETag = "etag2";
+ item2.FileSize = 256;
+
+ result.push_back(diritem);
+ result.push_back(item1);
+ result.push_back(item2);
+ return result;
+ }
+
void setFileCreation(bool create_file) {
create_file_ = create_file;
}
@@ -117,6 +146,10 @@ class MockDataLakeStorageClient : public
org::apache::nifi::minifi::azure::stora
return fetch_params_;
}
+
org::apache::nifi::minifi::azure::storage::ListAzureDataLakeStorageParameters
getPassedListParams() const {
+ return list_params_;
+ }
+
private:
const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas";
bool create_file_ = true;
@@ -130,4 +163,5 @@ class MockDataLakeStorageClient : public
org::apache::nifi::minifi::azure::stora
org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters
put_params_;
org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters
delete_params_;
org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters
fetch_params_;
+
org::apache::nifi::minifi::azure::storage::ListAzureDataLakeStorageParameters
list_params_;
};