This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a3615d  MINIFICPP-1631 Add ListAzureDataLakeStorage processor
3a3615d is described below

commit 3a3615debb9129e7b954827debccaecc68b66006
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Jan 26 11:50:58 2022 +0100

    MINIFICPP-1631 Add ListAzureDataLakeStorage processor
    
    and extract ListingStateManager
    
    Closes #1242
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |  96 ++++++----
 README.md                                          |   2 +-
 extensions/aws/processors/ListS3.cpp               |  78 +-------
 extensions/aws/processors/ListS3.h                 |  18 +-
 extensions/aws/s3/S3Wrapper.cpp                    |   4 +-
 extensions/aws/s3/S3Wrapper.h                      |  14 +-
 .../AzureDataLakeStorageFileProcessorBase.cpp      |  49 +++++
 .../AzureDataLakeStorageFileProcessorBase.h        |  54 ++++++
 .../AzureDataLakeStorageProcessorBase.cpp          |  11 --
 .../processors/AzureDataLakeStorageProcessorBase.h |   1 -
 .../processors/DeleteAzureDataLakeStorage.cpp      |   2 +-
 .../azure/processors/DeleteAzureDataLakeStorage.h  |   8 +-
 .../azure/processors/FetchAzureDataLakeStorage.cpp |   4 +-
 .../azure/processors/FetchAzureDataLakeStorage.h   |   8 +-
 .../azure/processors/ListAzureDataLakeStorage.cpp  | 172 +++++++++++++++++
 .../azure/processors/ListAzureDataLakeStorage.h    |  70 +++++++
 .../azure/processors/PutAzureDataLakeStorage.cpp   |   4 +-
 .../azure/processors/PutAzureDataLakeStorage.h     |   6 +-
 extensions/azure/storage/AzureDataLakeStorage.cpp  |  65 +++++++
 extensions/azure/storage/AzureDataLakeStorage.h    |  23 ++-
 .../azure/storage/AzureDataLakeStorageClient.cpp   |  23 ++-
 .../azure/storage/AzureDataLakeStorageClient.h     |  11 +-
 extensions/azure/storage/DataLakeStorageClient.h   |  27 ++-
 libminifi/include/utils/GeneralUtils.h             |  17 +-
 libminifi/include/utils/ListingStateManager.h      |  72 +++++++
 libminifi/src/utils/ListingStateManager.cpp        | 100 ++++++++++
 libminifi/test/aws-tests/ListS3Tests.cpp           |   8 +-
 .../azure-tests/ListAzureDataLakeStorageTests.cpp  | 208 +++++++++++++++++++++
 .../test/azure-tests/MockDataLakeStorageClient.h   |  34 ++++
 29 files changed, 1008 insertions(+), 181 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index d941df6..5b7882f 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -34,10 +34,11 @@
 - [GetUSBCamera](#getusbcamera)
 - [HashContent](#hashcontent)
 - [InvokeHTTP](#invokehttp)
-- [ListSFTP](#listsftp)
+- [ListAzureDataLakeStorage](#listazuredatalakestorage)
 - [ListenHTTP](#listenhttp)
 - [ListenSyslog](#listensyslog)
 - [ListS3](#lists3)
+- [ListSFTP](#listsftp)
 - [LogAttribute](#logattribute)
 - [ManipulateArchive](#manipulatearchive)
 - [MergeContent](#mergecontent)
@@ -881,48 +882,25 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 |failure|The original FlowFile will be routed on any type of connection 
failure, timeout or general exception. It will have new attributes detailing 
the request.|
 
 
-## ListSFTP
+## ListAzureDataLakeStorage
 
 ### Description
 
-Performs a listing of the files residing on an SFTP server. For each file that 
is found on the remote server, a new FlowFile will be created with the filename 
attribute set to the name of the file on the remote server. This can then be 
used in conjunction with FetchSFTP in order to fetch those files.
+Lists directory in an Azure Data Lake Storage Gen 2 filesystem
 ### Properties
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
 
 | Name | Default Value | Allowable Values | Description |
 | - | - | - | - |
-|**Connection Timeout**|30 sec||Amount of time to wait before timing out while 
creating a connection|
-|**Data Timeout**|30 sec||When transferring a file between the local and 
remote system, this value specifies how long is allowed to elapse without any 
data being transferred between systems|
-|Entity Tracking Initial Listing Target|All Available|All 
Available<br>Tracking Time Window<br>|Specify how initial listing should be 
handled. Used by 'Tracking Entities' strategy.|
-|Entity Tracking Time Window|||Specify how long this processor should track 
already-listed entities. 'Tracking Entities' strategy can pick any entity whose 
timestamp is inside the specified time window. For example, if set to '30 
minutes', any entity having timestamp in recent 30 minutes will be the listing 
target when this processor runs. A listed entity is considered 'new/updated' 
and a FlowFile is emitted if one of following condition meets: 1. does not 
exist in the already-listed ent [...]
-|File Filter Regex|||Provides a Java Regular Expression for filtering 
Filenames; if a filter is supplied, only files whose names match that Regular 
Expression will be fetched|
-|**Follow symlink**|false||If true, will pull even symbolic files and also 
nested symbolic subdirectories; otherwise, will not read symbolic files and 
will not traverse symbolic link subdirectories|
-|Host Key File|||If supplied, the given file will be used as the Host Key; 
otherwise, no use host key file will be used|
-|**Hostname**|||The fully qualified hostname or IP address of the remote 
system<br/>**Supports Expression Language: true**|
-|Http Proxy Password|||Http Proxy Password<br/>**Supports Expression Language: 
true**|
-|Http Proxy Username|||Http Proxy Username<br/>**Supports Expression Language: 
true**|
-|**Ignore Dotted Files**|true||If true, files whose names begin with a dot 
(".") will be ignored|
-|**Listing Strategy**|Tracking Timestamps|Tracking Entities<br>Tracking 
Timestamps<br>|Specify how to determine new/updated entities. See each strategy 
descriptions for detail.|
-|Maximum File Age|||The maximum age that a file must be in order to be pulled; 
any file older than this amount of time (according to last modification date) 
will be ignored|
-|Maximum File Size|||The maximum size that a file must be in order to be 
pulled|
-|**Minimum File Age**|0 sec||The minimum age that a file must be in order to 
be pulled; any file younger than this amount of time (according to last 
modification date) will be ignored|
-|**Minimum File Size**|0 B||The minimum size that a file must be in order to 
be pulled|
-|Password|||Password for the user account<br/>**Supports Expression Language: 
true**|
-|Path Filter Regex|||When Search Recursively is true, then only subdirectories 
whose path matches the given Regular Expression will be scanned|
-|**Port**|||The port that the remote system is listening on for file 
transfers<br/>**Supports Expression Language: true**|
-|Private Key Passphrase|||Password for the private key<br/>**Supports 
Expression Language: true**|
-|Private Key Path|||The fully qualified path to the Private Key 
file<br/>**Supports Expression Language: true**|
-|Proxy Host|||The fully qualified hostname or IP address of the proxy 
server<br/>**Supports Expression Language: true**|
-|Proxy Port|||The port of the proxy server<br/>**Supports Expression Language: 
true**|
-|Proxy Type|DIRECT|DIRECT<br>HTTP<br>SOCKS<br>|Specifies the Proxy 
Configuration Controller Service to proxy network requests. If set, it 
supersedes proxy settings configured per component. Supported proxies: HTTP + 
AuthN, SOCKS + AuthN|
-|Remote Path|||The fully qualified filename on the remote 
system<br/>**Supports Expression Language: true**|
-|**Search Recursively**|false||If true, will pull files from arbitrarily 
nested subdirectories; otherwise, will not traverse subdirectories|
-|**Send Keep Alive On Timeout**|true||Indicates whether or not to send a 
single Keep Alive message when SSH socket times out|
-|**State File**|ListSFTP||Specifies the file that should be used for storing 
state about what data has been ingested so that upon restart MiNiFi can resume 
from where it left off|
-|**Strict Host Key Checking**|false||Indicates whether or not strict 
enforcement of hosts keys should be applied|
-|**Target System Timestamp Precision**|Auto Detect|Auto 
Detect<br>Milliseconds<br>Minutes<br>Seconds<br>|Specify timestamp precision at 
the target system. Since this processor uses timestamp of entities to decide 
which should be listed, it is crucial to use the right timestamp precision.|
-|**Username**|||Username<br/>**Supports Expression Language: true**|
+|**Azure Storage Credentials Service**|||Name of the Azure Storage Credentials 
Service used to retrieve the connection string from.|
+|**Filesystem Name**|||Name of the Azure Storage File System. It is assumed to 
be already existing.<br/>**Supports Expression Language: true**|
+|Directory Name|||Name of the Azure Storage Directory. The Directory Name 
cannot contain a leading '/'. If left empty it designates the root directory. 
The directory will be created if not already existing.<br/>**Supports 
Expression Language: true**|
+|**Recurse Subdirectories**|true||Indicates whether to list files from 
subdirectories of the directory|
+|File Filter|||Only files whose names match the given regular expression will 
be listed|
+|Path Filter|||When 'Recurse Subdirectories' is true, then only subdirectories 
whose paths match the given regular expression will be scanned|
+|Listing Strategy|timestamps|none<br/>timestamps|Specify how to determine 
new/updated entities. If 'timestamps' is selected it tracks the latest 
timestamp of listed entity to determine new/updated entities. If 'none' is 
selected it lists an entity without any tracking, the same entity will be 
listed each time on executing this processor.|
+
 ### Relationships
 
 | Name | Description |
@@ -1016,6 +994,56 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 |**Write User Metadata**|false||If set to 'true', the user defined metadata 
associated with the S3 object will be added to FlowFile attributes/records.|
 |**Requester Pays**|false||If true, indicates that the requester consents to 
pay any charges associated with listing the S3 bucket. This sets the 
'x-amz-request-payer' header to 'requester'. Note that this setting is only 
used if Write User Metadata is true.|
 
+
+## ListSFTP
+
+### Description
+
+Performs a listing of the files residing on an SFTP server. For each file that 
is found on the remote server, a new FlowFile will be created with the filename 
attribute set to the name of the file on the remote server. This can then be 
used in conjunction with FetchSFTP in order to fetch those files.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Connection Timeout**|30 sec||Amount of time to wait before timing out while 
creating a connection|
+|**Data Timeout**|30 sec||When transferring a file between the local and 
remote system, this value specifies how long is allowed to elapse without any 
data being transferred between systems|
+|Entity Tracking Initial Listing Target|All Available|All 
Available<br>Tracking Time Window<br>|Specify how initial listing should be 
handled. Used by 'Tracking Entities' strategy.|
+|Entity Tracking Time Window|||Specify how long this processor should track 
already-listed entities. 'Tracking Entities' strategy can pick any entity whose 
timestamp is inside the specified time window. For example, if set to '30 
minutes', any entity having timestamp in recent 30 minutes will be the listing 
target when this processor runs. A listed entity is considered 'new/updated' 
and a FlowFile is emitted if one of following condition meets: 1. does not 
exist in the already-listed ent [...]
+|File Filter Regex|||Provides a Java Regular Expression for filtering 
Filenames; if a filter is supplied, only files whose names match that Regular 
Expression will be fetched|
+|**Follow symlink**|false||If true, will pull even symbolic files and also 
nested symbolic subdirectories; otherwise, will not read symbolic files and 
will not traverse symbolic link subdirectories|
+|Host Key File|||If supplied, the given file will be used as the Host Key; 
otherwise, no host key file will be used|
+|**Hostname**|||The fully qualified hostname or IP address of the remote 
system<br/>**Supports Expression Language: true**|
+|Http Proxy Password|||Http Proxy Password<br/>**Supports Expression Language: 
true**|
+|Http Proxy Username|||Http Proxy Username<br/>**Supports Expression Language: 
true**|
+|**Ignore Dotted Files**|true||If true, files whose names begin with a dot 
(".") will be ignored|
+|**Listing Strategy**|Tracking Timestamps|Tracking Entities<br>Tracking 
Timestamps<br>|Specify how to determine new/updated entities. See each strategy 
descriptions for detail.|
+|Maximum File Age|||The maximum age that a file must be in order to be pulled; 
any file older than this amount of time (according to last modification date) 
will be ignored|
+|Maximum File Size|||The maximum size that a file must be in order to be 
pulled|
+|**Minimum File Age**|0 sec||The minimum age that a file must be in order to 
be pulled; any file younger than this amount of time (according to last 
modification date) will be ignored|
+|**Minimum File Size**|0 B||The minimum size that a file must be in order to 
be pulled|
+|Password|||Password for the user account<br/>**Supports Expression Language: 
true**|
+|Path Filter Regex|||When Search Recursively is true, then only subdirectories 
whose path matches the given Regular Expression will be scanned|
+|**Port**|||The port that the remote system is listening on for file 
transfers<br/>**Supports Expression Language: true**|
+|Private Key Passphrase|||Password for the private key<br/>**Supports 
Expression Language: true**|
+|Private Key Path|||The fully qualified path to the Private Key 
file<br/>**Supports Expression Language: true**|
+|Proxy Host|||The fully qualified hostname or IP address of the proxy 
server<br/>**Supports Expression Language: true**|
+|Proxy Port|||The port of the proxy server<br/>**Supports Expression Language: 
true**|
+|Proxy Type|DIRECT|DIRECT<br>HTTP<br>SOCKS<br>|Specifies the Proxy 
Configuration Controller Service to proxy network requests. If set, it 
supersedes proxy settings configured per component. Supported proxies: HTTP + 
AuthN, SOCKS + AuthN|
+|Remote Path|||The fully qualified filename on the remote 
system<br/>**Supports Expression Language: true**|
+|**Search Recursively**|false||If true, will pull files from arbitrarily 
nested subdirectories; otherwise, will not traverse subdirectories|
+|**Send Keep Alive On Timeout**|true||Indicates whether or not to send a 
single Keep Alive message when SSH socket times out|
+|**State File**|ListSFTP||Specifies the file that should be used for storing 
state about what data has been ingested so that upon restart MiNiFi can resume 
from where it left off|
+|**Strict Host Key Checking**|false||Indicates whether or not strict 
enforcement of hosts keys should be applied|
+|**Target System Timestamp Precision**|Auto Detect|Auto 
Detect<br>Milliseconds<br>Minutes<br>Seconds<br>|Specify timestamp precision at 
the target system. Since this processor uses timestamp of entities to decide 
which should be listed, it is crucial to use the right timestamp precision.|
+|**Username**|||Username<br/>**Supports Expression Language: true**|
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|All FlowFiles that are received are routed to success|
+
+
 ### Relationships
 
 | Name | Description |
diff --git a/README.md b/README.md
index c7d99f8..6011698 100644
--- a/README.md
+++ b/README.md
@@ -77,7 +77,7 @@ Through JNI extensions you can run NiFi processors using 
NARs. The JNI extension
 | ------------- |:-------------| :-----|
 | Archive Extensions    | 
[ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)
      |   -DBUILD_LIBARCHIVE=ON |
 | AWS | 
[AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3)
 | -DENABLE_AWS=ON  |
-| Azure | 
[AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[DeleteAzureBlobStorage](#deleteazureblobstorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchazuredatalakestorage)
 | -DENABLE_AZURE=ON  |
+| Azure | 
[AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](#deleteazureblobstorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchazuredatalakestorage)<br/>[ListAzureDataLakeStorage](#listazuredatalakestorage)
 | -DENABLE_AZURE=ON  |
 | CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp)  | -DDISABLE_CIVET=ON |
 | CURL | [InvokeHTTP](PROCESSORS.md#invokehttp)      |    -DDISABLE_CURL=ON  |
 | GPS | GetGPS      |    -DENABLE_GPS=ON  |
diff --git a/extensions/aws/processors/ListS3.cpp 
b/extensions/aws/processors/ListS3.cpp
index 0f958eb..9573aa4 100644
--- a/extensions/aws/processors/ListS3.cpp
+++ b/extensions/aws/processors/ListS3.cpp
@@ -36,9 +36,6 @@ namespace minifi {
 namespace aws {
 namespace processors {
 
-const std::string ListS3::LATEST_LISTED_KEY_PREFIX = "listed_key.";
-const std::string ListS3::LATEST_LISTED_KEY_TIMESTAMP = "listed_timestamp";
-
 const core::Property ListS3::Delimiter(
   core::PropertyBuilder::createProperty("Delimiter")
     ->withDescription("The string used to delimit directories within the 
bucket. Please consult the AWS documentation for the correct use of this 
field.")
@@ -93,10 +90,11 @@ void ListS3::initialize() {
 void ListS3::onSchedule(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
   S3Processor::onSchedule(context, sessionFactory);
 
-  state_manager_ = context->getStateManager();
-  if (state_manager_ == nullptr) {
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
     throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
   }
+  state_manager_ = 
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
 
   auto common_properties = getCommonELSupportedProperties(context, nullptr);
   if (!common_properties) {
@@ -177,54 +175,6 @@ void ListS3::writeUserMetadata(
   }
 }
 
-std::vector<std::string> ListS3::getLatestListedKeys(const 
std::unordered_map<std::string, std::string> &state) {
-  std::vector<std::string> latest_listed_keys;
-  for (const auto& kvp : state) {
-    if (kvp.first.rfind(LATEST_LISTED_KEY_PREFIX, 0) == 0) {
-      latest_listed_keys.push_back(kvp.second);
-    }
-  }
-  return latest_listed_keys;
-}
-
-uint64_t ListS3::getLatestListedKeyTimestamp(const 
std::unordered_map<std::string, std::string> &state) {
-  std::string stored_listed_key_timestamp_str;
-  auto it = state.find(LATEST_LISTED_KEY_TIMESTAMP);
-  if (it != state.end()) {
-    stored_listed_key_timestamp_str = it->second;
-  }
-
-  int64_t stored_listed_key_timestamp = 0;
-  core::Property::StringToInt(stored_listed_key_timestamp_str, 
stored_listed_key_timestamp);
-
-  return stored_listed_key_timestamp;
-}
-
-ListS3::ListingState ListS3::getCurrentState(const 
std::shared_ptr<core::ProcessContext>& /*context*/) {
-  ListS3::ListingState current_listing_state;
-  std::unordered_map<std::string, std::string> state;
-  if (!state_manager_->get(state)) {
-    logger_->log_info("No stored state for listed objects was found");
-    return current_listing_state;
-  }
-
-  current_listing_state.listed_key_timestamp = 
getLatestListedKeyTimestamp(state);
-  logger_->log_debug("Restored previous listed timestamp %lld", 
current_listing_state.listed_key_timestamp);
-
-  current_listing_state.listed_keys = getLatestListedKeys(state);
-  return current_listing_state;
-}
-
-void ListS3::storeState(const ListS3::ListingState &latest_listing_state) {
-  std::unordered_map<std::string, std::string> state;
-  state[LATEST_LISTED_KEY_TIMESTAMP] = 
std::to_string(latest_listing_state.listed_key_timestamp);
-  for (std::size_t i = 0; i < latest_listing_state.listed_keys.size(); ++i) {
-    state[LATEST_LISTED_KEY_PREFIX + std::to_string(i)] = 
latest_listing_state.listed_keys.at(i);
-  }
-  logger_->log_debug("Stored new listed timestamp %lld", 
latest_listing_state.listed_key_timestamp);
-  state_manager_->set(state);
-}
-
 void ListS3::createNewFlowFile(
     core::ProcessSession &session,
     const aws::s3::ListedObjectAttributes &object_attributes) {
@@ -233,7 +183,7 @@ void ListS3::createNewFlowFile(
   session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, 
object_attributes.filename);
   session.putAttribute(flow_file, "s3.etag", object_attributes.etag);
   session.putAttribute(flow_file, "s3.isLatest", object_attributes.is_latest ? 
"true" : "false");
-  session.putAttribute(flow_file, "s3.lastModified", 
std::to_string(object_attributes.last_modified));
+  session.putAttribute(flow_file, "s3.lastModified", 
std::to_string(object_attributes.last_modified.time_since_epoch() / 
std::chrono::milliseconds(1)));
   session.putAttribute(flow_file, "s3.length", 
std::to_string(object_attributes.length));
   session.putAttribute(flow_file, "s3.storeClass", 
object_attributes.store_class);
   if (!object_attributes.version.empty()) {
@@ -255,7 +205,7 @@ void ListS3::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, con
     return;
   }
 
-  auto stored_listing_state = getCurrentState(context);
+  auto stored_listing_state = state_manager_->getCurrentState();
   auto latest_listing_state = stored_listing_state;
   std::size_t files_transferred = 0;
 
@@ -270,7 +220,7 @@ void ListS3::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, con
   }
 
   logger_->log_debug("ListS3 transferred %zu flow files", files_transferred);
-  storeState(latest_listing_state);
+  state_manager_->storeState(latest_listing_state);
 
   if (files_transferred == 0) {
     logger_->log_debug("No new S3 objects were found in bucket %s to list", 
list_request_params_->bucket);
@@ -279,22 +229,6 @@ void ListS3::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, con
   }
 }
 
-bool ListS3::ListingState::wasObjectListedAlready(const 
aws::s3::ListedObjectAttributes &object_attributes) const {
-  return listed_key_timestamp > object_attributes.last_modified ||
-      (listed_key_timestamp == object_attributes.last_modified &&
-        std::find(listed_keys.begin(), listed_keys.end(), 
object_attributes.filename) != listed_keys.end());
-}
-
-void ListS3::ListingState::updateState(const aws::s3::ListedObjectAttributes 
&object_attributes) {
-  if (listed_key_timestamp < object_attributes.last_modified) {
-    listed_key_timestamp = object_attributes.last_modified;
-    listed_keys.clear();
-    listed_keys.push_back(object_attributes.filename);
-  } else if (listed_key_timestamp == object_attributes.last_modified) {
-    listed_keys.push_back(object_attributes.filename);
-  }
-}
-
 REGISTER_RESOURCE(ListS3, "This Processor retrieves a listing of objects from 
an Amazon S3 bucket.");
 
 }  // namespace processors
diff --git a/extensions/aws/processors/ListS3.h 
b/extensions/aws/processors/ListS3.h
index d8c2e3b..ff3a53c 100644
--- a/extensions/aws/processors/ListS3.h
+++ b/extensions/aws/processors/ListS3.h
@@ -39,8 +39,6 @@ namespace processors {
 class ListS3 : public S3Processor {
  public:
   static constexpr char const* ProcessorName = "ListS3";
-  static const std::string LATEST_LISTED_KEY_PREFIX;
-  static const std::string LATEST_LISTED_KEY_TIMESTAMP;
 
   // Supported Properties
   static const core::Property Delimiter;
@@ -57,7 +55,6 @@ class ListS3 : public S3Processor {
   explicit ListS3(const std::string& name, const minifi::utils::Identifier& 
uuid = minifi::utils::Identifier())
     : S3Processor(name, uuid, 
core::logging::LoggerFactory<ListS3>::getLogger()) {
   }
-
   explicit ListS3(const std::string& name, minifi::utils::Identifier uuid, 
std::unique_ptr<aws::s3::S3RequestSender> s3_request_sender)
     : S3Processor(name, uuid, 
core::logging::LoggerFactory<ListS3>::getLogger(), 
std::move(s3_request_sender)) {
   }
@@ -73,17 +70,6 @@ class ListS3 : public S3Processor {
     return core::annotation::Input::INPUT_FORBIDDEN;
   }
 
-  struct ListingState {
-    int64_t listed_key_timestamp = 0;
-    std::vector<std::string> listed_keys;
-
-    bool wasObjectListedAlready(const aws::s3::ListedObjectAttributes 
&object_attributes) const;
-    void updateState(const aws::s3::ListedObjectAttributes &object_attributes);
-  };
-
-  static std::vector<std::string> getLatestListedKeys(const 
std::unordered_map<std::string, std::string> &state);
-  static uint64_t getLatestListedKeyTimestamp(const 
std::unordered_map<std::string, std::string> &state);
-
   void writeObjectTags(
     const aws::s3::ListedObjectAttributes &object_attributes,
     core::ProcessSession &session,
@@ -92,8 +78,6 @@ class ListS3 : public S3Processor {
     const aws::s3::ListedObjectAttributes &object_attributes,
     core::ProcessSession &session,
     const std::shared_ptr<core::FlowFile> &flow_file);
-  ListingState getCurrentState(const std::shared_ptr<core::ProcessContext> 
&context);
-  void storeState(const ListingState &latest_listing_state);
   void createNewFlowFile(
     core::ProcessSession &session,
     const aws::s3::ListedObjectAttributes &object_attributes);
@@ -102,7 +86,7 @@ class ListS3 : public S3Processor {
   bool write_object_tags_ = false;
   bool write_user_metadata_ = false;
   bool requester_pays_ = false;
-  std::shared_ptr<core::CoreComponentStateManager> state_manager_ = nullptr;
+  std::unique_ptr<minifi::utils::ListingStateManager> state_manager_;
 };
 
 }  // namespace processors
diff --git a/extensions/aws/s3/S3Wrapper.cpp b/extensions/aws/s3/S3Wrapper.cpp
index c283791..d78a2e4 100644
--- a/extensions/aws/s3/S3Wrapper.cpp
+++ b/extensions/aws/s3/S3Wrapper.cpp
@@ -161,7 +161,7 @@ void S3Wrapper::addListResults(const 
Aws::Vector<Aws::S3::Model::ObjectVersion>&
     attributes.etag = 
minifi::utils::StringUtils::removeFramingCharacters(version.GetETag(), '"');
     attributes.filename = version.GetKey();
     attributes.is_latest = version.GetIsLatest();
-    attributes.last_modified = version.GetLastModified().Millis();
+    attributes.last_modified = version.GetLastModified().UnderlyingTimestamp();
     attributes.length = version.GetSize();
     attributes.store_class = 
VERSION_STORAGE_CLASS_MAP.at(version.GetStorageClass());
     attributes.version = version.GetVersionId();
@@ -180,7 +180,7 @@ void S3Wrapper::addListResults(const 
Aws::Vector<Aws::S3::Model::Object>& conten
     attributes.etag = 
minifi::utils::StringUtils::removeFramingCharacters(object.GetETag(), '"');
     attributes.filename = object.GetKey();
     attributes.is_latest = true;
-    attributes.last_modified = object.GetLastModified().Millis();
+    attributes.last_modified = object.GetLastModified().UnderlyingTimestamp();
     attributes.length = object.GetSize();
     attributes.store_class = 
OBJECT_STORAGE_CLASS_MAP.at(object.GetStorageClass());
     listed_objects.push_back(attributes);
diff --git a/extensions/aws/s3/S3Wrapper.h b/extensions/aws/s3/S3Wrapper.h
index 8dd9ce1..3c547b1 100644
--- a/extensions/aws/s3/S3Wrapper.h
+++ b/extensions/aws/s3/S3Wrapper.h
@@ -38,6 +38,8 @@
 #include "utils/AWSInitializer.h"
 #include "utils/OptionalUtils.h"
 #include "utils/StringUtils.h"
+#include "utils/ListingStateManager.h"
+#include "utils/gsl.h"
 #include "io/BaseStream.h"
 #include "S3RequestSender.h"
 
@@ -177,11 +179,19 @@ struct ListRequestParameters : public RequestParameters {
   uint64_t min_object_age = 0;
 };
 
-struct ListedObjectAttributes {
+struct ListedObjectAttributes : public minifi::utils::ListedObject {
+  std::chrono::time_point<std::chrono::system_clock> getLastModified() const 
override {
+    return last_modified;
+  }
+
+  std::string getKey() const override {
+    return filename;
+  }
+
   std::string filename;
   std::string etag;
   bool is_latest = false;
-  int64_t last_modified = 0;
+  std::chrono::time_point<std::chrono::system_clock> last_modified;
   int64_t length = 0;
   std::string store_class;
   std::string version;
diff --git 
a/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.cpp 
b/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.cpp
new file mode 100644
index 0000000..1ddee04
--- /dev/null
+++ b/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.cpp
@@ -0,0 +1,49 @@
+/**
+ * @file AzureDataLakeStorageFileProcessorBase.cpp
+ * AzureDataLakeStorageFileProcessorBase class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AzureDataLakeStorageFileProcessorBase.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property AzureDataLakeStorageFileProcessorBase::FileName(
+    core::PropertyBuilder::createProperty("File Name")
+      ->withDescription("The filename in Azure Storage. If left empty the 
filename attribute will be used by default.")
+      ->supportsExpressionLanguage(true)
+      ->build());
+
+bool AzureDataLakeStorageFileProcessorBase::setFileOperationCommonParameters(
+    storage::AzureDataLakeStorageFileOperationParameters& params, 
core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& 
flow_file) {
+  if (!setCommonParameters(params, context, flow_file)) {
+    return false;
+  }
+
+  context.getProperty(FileName, params.filename, flow_file);
+  if (params.filename.empty() && (!flow_file->getAttribute("filename", 
params.filename) || params.filename.empty())) {
+    logger_->log_error("No File Name is set and default object key 'filename' 
attribute could not be found!");
+    return false;
+  }
+
+  return true;
+}
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git 
a/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.h 
b/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.h
new file mode 100644
index 0000000..0f9a003
--- /dev/null
+++ b/extensions/azure/processors/AzureDataLakeStorageFileProcessorBase.h
@@ -0,0 +1,54 @@
+/**
+ * @file AzureDataLakeStorageFileProcessorBase.h
+ * AzureDataLakeStorageFileProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class AzureDataLakeStorageFileProcessorBase : public 
AzureDataLakeStorageProcessorBase {
+ public:
+  // Supported Properties
+  EXTENSIONAPI static const core::Property FileName;
+
+  explicit AzureDataLakeStorageFileProcessorBase(const std::string& name, 
const minifi::utils::Identifier& uuid, const 
std::shared_ptr<core::logging::Logger> &logger)
+    : AzureDataLakeStorageProcessorBase(name, uuid, logger) {
+  }
+
+  ~AzureDataLakeStorageFileProcessorBase() override = default;
+
+ protected:
+  explicit AzureDataLakeStorageFileProcessorBase(const std::string& name, 
const minifi::utils::Identifier& uuid, const 
std::shared_ptr<core::logging::Logger> &logger,
+      std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+    : AzureDataLakeStorageProcessorBase(name, uuid, logger, 
std::move(data_lake_storage_client)) {
+  }
+
+  bool setFileOperationCommonParameters(
+    storage::AzureDataLakeStorageFileOperationParameters& params,
+    core::ProcessContext& context,
+    const std::shared_ptr<core::FlowFile>& flow_file);
+};
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp 
b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
index 04d7664..ed526f3 100644
--- a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
+++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.cpp
@@ -37,11 +37,6 @@ const core::Property 
AzureDataLakeStorageProcessorBase::DirectoryName(
                         "If left empty it designates the root directory. The 
directory will be created if not already existing.")
       ->supportsExpressionLanguage(true)
       ->build());
-const core::Property AzureDataLakeStorageProcessorBase::FileName(
-    core::PropertyBuilder::createProperty("File Name")
-      ->withDescription("The filename in Azure Storage. If left empty the 
filename attribute will be used by default.")
-      ->supportsExpressionLanguage(true)
-      ->build());
 
 void AzureDataLakeStorageProcessorBase::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
   gsl_Expects(context);
@@ -69,12 +64,6 @@ bool AzureDataLakeStorageProcessorBase::setCommonParameters(
 
   context.getProperty(DirectoryName, params.directory_name, flow_file);
 
-  context.getProperty(FileName, params.filename, flow_file);
-  if (params.filename.empty() && (!flow_file->getAttribute("filename", 
params.filename) || params.filename.empty())) {
-    logger_->log_error("No File Name is set and default object key 'filename' 
attribute could not be found!");
-    return false;
-  }
-
   return true;
 }
 
diff --git a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h 
b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
index 14f2a92..65c3232 100644
--- a/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
+++ b/extensions/azure/processors/AzureDataLakeStorageProcessorBase.h
@@ -38,7 +38,6 @@ class AzureDataLakeStorageProcessorBase : public 
AzureStorageProcessorBase {
   // Supported Properties
   EXTENSIONAPI static const core::Property FilesystemName;
   EXTENSIONAPI static const core::Property DirectoryName;
-  EXTENSIONAPI static const core::Property FileName;
 
   explicit AzureDataLakeStorageProcessorBase(const std::string& name, const 
minifi::utils::Identifier& uuid, const std::shared_ptr<core::logging::Logger> 
&logger)
     : AzureStorageProcessorBase(name, uuid, logger) {
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp 
b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
index 6146a31..c43e327 100644
--- a/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.cpp
@@ -48,7 +48,7 @@ void DeleteAzureDataLakeStorage::initialize() {
 std::optional<storage::DeleteAzureDataLakeStorageParameters> 
DeleteAzureDataLakeStorage::buildDeleteParameters(
     core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& 
flow_file) {
   storage::DeleteAzureDataLakeStorageParameters params;
-  if (!setCommonParameters(params, context, flow_file)) {
+  if (!setFileOperationCommonParameters(params, context, flow_file)) {
     return std::nullopt;
   }
 
diff --git a/extensions/azure/processors/DeleteAzureDataLakeStorage.h 
b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
index cb26a2f..3813387 100644
--- a/extensions/azure/processors/DeleteAzureDataLakeStorage.h
+++ b/extensions/azure/processors/DeleteAzureDataLakeStorage.h
@@ -24,21 +24,21 @@
 #include <utility>
 #include <memory>
 
-#include "AzureDataLakeStorageProcessorBase.h"
+#include "AzureDataLakeStorageFileProcessorBase.h"
 
 template<typename AzureDataLakeStorageProcessorBase>
 class AzureDataLakeStorageTestsFixture;
 
 namespace org::apache::nifi::minifi::azure::processors {
 
-class DeleteAzureDataLakeStorage final : public 
AzureDataLakeStorageProcessorBase {
+class DeleteAzureDataLakeStorage final : public 
AzureDataLakeStorageFileProcessorBase {
  public:
   // Supported Relationships
   static const core::Relationship Failure;
   static const core::Relationship Success;
 
   explicit DeleteAzureDataLakeStorage(const std::string& name, const 
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
-    : AzureDataLakeStorageProcessorBase(name, uuid, 
core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger()) {
+    : AzureDataLakeStorageFileProcessorBase(name, uuid, 
core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger()) {
   }
 
   ~DeleteAzureDataLakeStorage() override = default;
@@ -58,7 +58,7 @@ class DeleteAzureDataLakeStorage final : public 
AzureDataLakeStorageProcessorBas
   }
 
   explicit DeleteAzureDataLakeStorage(const std::string& name, const 
minifi::utils::Identifier& uuid, 
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
-    : AzureDataLakeStorageProcessorBase(name, uuid, 
core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger(), 
std::move(data_lake_storage_client)) {
+    : AzureDataLakeStorageFileProcessorBase(name, uuid, 
core::logging::LoggerFactory<DeleteAzureDataLakeStorage>::getLogger(), 
std::move(data_lake_storage_client)) {
   }
 
   std::optional<storage::DeleteAzureDataLakeStorageParameters> 
buildDeleteParameters(core::ProcessContext& context, const 
std::shared_ptr<core::FlowFile>& flow_file);
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp 
b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
index 4885190..b41aede 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.cpp
@@ -68,7 +68,7 @@ void FetchAzureDataLakeStorage::initialize() {
 std::optional<storage::FetchAzureDataLakeStorageParameters> 
FetchAzureDataLakeStorage::buildFetchParameters(
     core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& 
flow_file) {
   storage::FetchAzureDataLakeStorageParameters params;
-  if (!setCommonParameters(params, context, flow_file)) {
+  if (!setFileOperationCommonParameters(params, context, flow_file)) {
     return std::nullopt;
   }
 
@@ -93,7 +93,7 @@ std::optional<storage::FetchAzureDataLakeStorageParameters> 
FetchAzureDataLakeSt
 
 void FetchAzureDataLakeStorage::onTrigger(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) {
   gsl_Expects(context && session);
-  logger_->log_debug("FetchAzureDataLakeStorage onTrigger");
+  logger_->log_trace("FetchAzureDataLakeStorage onTrigger");
   std::shared_ptr<core::FlowFile> flow_file = session->get();
   if (!flow_file) {
     context->yield();
diff --git a/extensions/azure/processors/FetchAzureDataLakeStorage.h 
b/extensions/azure/processors/FetchAzureDataLakeStorage.h
index 4768e49..74a2917 100644
--- a/extensions/azure/processors/FetchAzureDataLakeStorage.h
+++ b/extensions/azure/processors/FetchAzureDataLakeStorage.h
@@ -24,14 +24,14 @@
 #include <utility>
 #include <memory>
 
-#include "AzureDataLakeStorageProcessorBase.h"
+#include "AzureDataLakeStorageFileProcessorBase.h"
 
 template<typename AzureDataLakeStorageProcessor>
 class AzureDataLakeStorageTestsFixture;
 
 namespace org::apache::nifi::minifi::azure::processors {
 
-class FetchAzureDataLakeStorage final : public 
AzureDataLakeStorageProcessorBase {
+class FetchAzureDataLakeStorage final : public 
AzureDataLakeStorageFileProcessorBase {
  public:
   // Supported Properties
   EXTENSIONAPI static const core::Property RangeStart;
@@ -43,7 +43,7 @@ class FetchAzureDataLakeStorage final : public 
AzureDataLakeStorageProcessorBase
   static const core::Relationship Success;
 
   explicit FetchAzureDataLakeStorage(const std::string& name, const 
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
-    : AzureDataLakeStorageProcessorBase(name, uuid, 
core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger()) {
+    : AzureDataLakeStorageFileProcessorBase(name, uuid, 
core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger()) {
   }
 
   ~FetchAzureDataLakeStorage() override = default;
@@ -91,7 +91,7 @@ class FetchAzureDataLakeStorage final : public 
AzureDataLakeStorageProcessorBase
   }
 
   explicit FetchAzureDataLakeStorage(const std::string& name, const 
minifi::utils::Identifier& uuid, 
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
-    : AzureDataLakeStorageProcessorBase(name, uuid, 
core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger(), 
std::move(data_lake_storage_client)) {
+    : AzureDataLakeStorageFileProcessorBase(name, uuid, 
core::logging::LoggerFactory<FetchAzureDataLakeStorage>::getLogger(), 
std::move(data_lake_storage_client)) {
   }
 
   std::optional<storage::FetchAzureDataLakeStorageParameters> 
buildFetchParameters(core::ProcessContext& context, const 
std::shared_ptr<core::FlowFile>& flow_file);
diff --git a/extensions/azure/processors/ListAzureDataLakeStorage.cpp 
b/extensions/azure/processors/ListAzureDataLakeStorage.cpp
new file mode 100644
index 0000000..4ee4e3d
--- /dev/null
+++ b/extensions/azure/processors/ListAzureDataLakeStorage.cpp
@@ -0,0 +1,172 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+    core::PropertyBuilder::createProperty("Recurse Subdirectories")
+      ->isRequired(true)
+      ->withDefaultValue<bool>(true)
+      ->withDescription("Indicates whether to list files from subdirectories 
of the directory")
+      ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+  core::PropertyBuilder::createProperty("File Filter")
+    ->withDescription("Only files whose names match the given regular 
expression will be listed")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+  core::PropertyBuilder::createProperty("Path Filter")
+    ->withDescription("When 'Recurse Subdirectories' is true, then only 
subdirectories whose paths match the given regular expression will be scanned")
+    ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+  core::PropertyBuilder::createProperty("Listing Strategy")
+    ->withDescription("Specify how to determine new/updated entities. If 
'timestamps' is selected it tracks the latest timestamp of listed entity to "
+                      "determine new/updated entities. If 'none' is selected 
it lists an entity without any tracking, the same entity will be listed each 
time on executing this processor.")
+    
->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+    ->withAllowableValues<std::string>(storage::EntityTracking::values())
+    ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All 
FlowFiles that are received are routed to success");
+
+namespace {
+std::shared_ptr<core::FlowFile> createNewFlowFile(core::ProcessSession 
&session, const storage::ListDataLakeStorageElement &element) {
+  auto flow_file = session.create();
+  session.putAttribute(flow_file, "azure.filesystem", element.filesystem);
+  session.putAttribute(flow_file, "azure.filePath", element.file_path);
+  session.putAttribute(flow_file, "azure.directory", element.directory);
+  session.putAttribute(flow_file, "azure.filename", element.filename);
+  session.putAttribute(flow_file, "azure.length", 
std::to_string(element.length));
+  session.putAttribute(flow_file, "azure.lastModified", 
std::to_string(element.last_modified.time_since_epoch() / 
std::chrono::milliseconds(1)));
+  session.putAttribute(flow_file, "azure.etag", element.etag);
+  return flow_file;
+}
+}  // namespace
+
+void ListAzureDataLakeStorage::initialize() {
+  setSupportedProperties({
+    AzureStorageCredentialsService,
+    FilesystemName,
+    DirectoryName,
+    RecurseSubdirectories,
+    FileFilter,
+    PathFilter,
+    ListingStrategy
+  });
+  setSupportedRelationships({
+    Success
+  });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  gsl_Expects(context && sessionFactory);
+  AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+  auto state_manager = context->getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  state_manager_ = 
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+  auto params = buildListParameters(*context);
+  if (!params) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for 
ListAzureDataLakeStorage processor are missing or invalid");
+  }
+
+  list_parameters_ = *std::move(params);
+  tracking_strategy_ = 
utils::parseEnumProperty<storage::EntityTracking>(*context, ListingStrategy);
+}
+
+std::optional<storage::ListAzureDataLakeStorageParameters> 
ListAzureDataLakeStorage::buildListParameters(core::ProcessContext& context) {
+  storage::ListAzureDataLakeStorageParameters params;
+  if (!setCommonParameters(params, context, nullptr)) {
+    return std::nullopt;
+  }
+
+  if (!context.getProperty(RecurseSubdirectories.getName(), 
params.recurse_subdirectories)) {
+    logger_->log_error("Recurse Subdirectories property missing or invalid");
+    return std::nullopt;
+  }
+
+  auto createFilterRegex = [&context](const std::string& property_name) -> 
std::optional<std::regex> {
+    try {
+      std::string filter_str;
+      context.getProperty(property_name, filter_str);
+      if (!filter_str.empty()) {
+        return std::regex(filter_str);
+      }
+
+      return std::nullopt;
+    } catch (const std::regex_error&) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, property_name + " regex is 
invalid");
+    }
+  };
+
+  params.file_regex = createFilterRegex(FileFilter.getName());
+  params.path_regex = createFilterRegex(PathFilter.getName());
+
+  return params;
+}
+
+void ListAzureDataLakeStorage::onTrigger(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+  logger_->log_trace("ListAzureDataLakeStorage onTrigger");
+
+  auto list_result = azure_data_lake_storage_.listDirectory(list_parameters_);
+  if (!list_result || list_result->empty()) {
+    context->yield();
+    return;
+  }
+
+  auto stored_listing_state = state_manager_->getCurrentState();
+  auto latest_listing_state = stored_listing_state;
+  std::size_t files_transferred = 0;
+
+  for (const auto& element : *list_result) {
+    if (tracking_strategy_ == storage::EntityTracking::TIMESTAMPS && 
stored_listing_state.wasObjectListedAlready(element)) {
+      continue;
+    }
+
+    auto flow_file = createNewFlowFile(*session, element);
+    session->transfer(flow_file, Success);
+    ++files_transferred;
+    latest_listing_state.updateState(element);
+  }
+
+  state_manager_->storeState(latest_listing_state);
+
+  logger_->log_debug("ListAzureDataLakeStorage transferred %zu flow files", 
files_transferred);
+
+  if (files_transferred == 0) {
+    logger_->log_debug("No new Azure Data Lake Storage files were found in 
directory '%s' of filesystem '%s'", list_parameters_.directory_name, 
list_parameters_.file_system_name);
+    context->yield();
+    return;
+  }
+}
+
+REGISTER_RESOURCE(ListAzureDataLakeStorage, "Lists directory in an Azure Data 
Lake Storage Gen 2 filesystem");
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/ListAzureDataLakeStorage.h 
b/extensions/azure/processors/ListAzureDataLakeStorage.h
new file mode 100644
index 0000000..c08bf95
--- /dev/null
+++ b/extensions/azure/processors/ListAzureDataLakeStorage.h
@@ -0,0 +1,70 @@
+/**
+ * @file ListAzureDataLakeStorage.h
+ * ListAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+class ListAzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class ListAzureDataLakeStorage final : public 
AzureDataLakeStorageProcessorBase {
+ public:
+  EXTENSIONAPI static const core::Property RecurseSubdirectories;
+  EXTENSIONAPI static const core::Property FileFilter;
+  EXTENSIONAPI static const core::Property PathFilter;
+  EXTENSIONAPI static const core::Property ListingStrategy;
+
+  static const core::Relationship Success;
+
+  explicit ListAzureDataLakeStorage(const std::string& name, const 
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+    : AzureDataLakeStorageProcessorBase(name, uuid, 
core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger()) {
+  }
+
+  ~ListAzureDataLakeStorage() override = default;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+  friend class ::ListAzureDataLakeStorageTestsFixture;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_FORBIDDEN;
+  }
+
+  explicit ListAzureDataLakeStorage(const std::string& name, const 
minifi::utils::Identifier& uuid, 
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+    : AzureDataLakeStorageProcessorBase(name, uuid, 
core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger(), 
std::move(data_lake_storage_client)) {
+  }
+
+  std::optional<storage::ListAzureDataLakeStorageParameters> 
buildListParameters(core::ProcessContext& context);
+
+  storage::EntityTracking tracking_strategy_ = 
storage::EntityTracking::TIMESTAMPS;
+  storage::ListAzureDataLakeStorageParameters list_parameters_;
+  std::unique_ptr<minifi::utils::ListingStateManager> state_manager_;
+};
+
+}  // namespace org::apache::nifi::minifi::azure::processors
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.cpp 
b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
index e1c0a4b..06d3912 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.cpp
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.cpp
@@ -57,7 +57,7 @@ void PutAzureDataLakeStorage::initialize() {
 
 void PutAzureDataLakeStorage::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
   gsl_Expects(context && sessionFactory);
-  AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+  AzureDataLakeStorageFileProcessorBase::onSchedule(context, sessionFactory);
   std::optional<storage::AzureStorageCredentials> credentials;
   std::tie(std::ignore, credentials) = 
getCredentialsFromControllerService(*context);
   if (!credentials) {
@@ -75,7 +75,7 @@ void PutAzureDataLakeStorage::onSchedule(const 
std::shared_ptr<core::ProcessCont
 std::optional<storage::PutAzureDataLakeStorageParameters> 
PutAzureDataLakeStorage::buildUploadParameters(
     core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& 
flow_file) {
   storage::PutAzureDataLakeStorageParameters params;
-  if (!setCommonParameters(params, context, flow_file)) {
+  if (!setFileOperationCommonParameters(params, context, flow_file)) {
     return std::nullopt;
   }
   params.replace_file = conflict_resolution_strategy_ == 
FileExistsResolutionStrategy::REPLACE_FILE;
diff --git a/extensions/azure/processors/PutAzureDataLakeStorage.h 
b/extensions/azure/processors/PutAzureDataLakeStorage.h
index a268711..79bff78 100644
--- a/extensions/azure/processors/PutAzureDataLakeStorage.h
+++ b/extensions/azure/processors/PutAzureDataLakeStorage.h
@@ -24,7 +24,7 @@
 #include <utility>
 #include <memory>
 
-#include "AzureDataLakeStorageProcessorBase.h"
+#include "AzureDataLakeStorageFileProcessorBase.h"
 
 #include "utils/Enum.h"
 #include "utils/Export.h"
@@ -34,7 +34,7 @@ class AzureDataLakeStorageTestsFixture;
 
 namespace org::apache::nifi::minifi::azure::processors {
 
-class PutAzureDataLakeStorage final : public AzureDataLakeStorageProcessorBase 
{
+class PutAzureDataLakeStorage final : public 
AzureDataLakeStorageFileProcessorBase {
  public:
   // Supported Properties
   EXTENSIONAPI static const core::Property ConflictResolutionStrategy;
@@ -86,7 +86,7 @@ class PutAzureDataLakeStorage final : public 
AzureDataLakeStorageProcessorBase {
   }
 
   explicit PutAzureDataLakeStorage(const std::string& name, const 
minifi::utils::Identifier& uuid, 
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
-    : AzureDataLakeStorageProcessorBase(name, uuid, 
core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger(), 
std::move(data_lake_storage_client)) {
+    : AzureDataLakeStorageFileProcessorBase(name, uuid, 
core::logging::LoggerFactory<PutAzureDataLakeStorage>::getLogger(), 
std::move(data_lake_storage_client)) {
   }
 
   std::optional<storage::PutAzureDataLakeStorageParameters> 
buildUploadParameters(core::ProcessContext& context, const 
std::shared_ptr<core::FlowFile>& flow_file);
diff --git a/extensions/azure/storage/AzureDataLakeStorage.cpp 
b/extensions/azure/storage/AzureDataLakeStorage.cpp
index 34e2d3b..9bcb393 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorage.cpp
@@ -20,11 +20,41 @@
 
 #include "AzureDataLakeStorage.h"
 
+#include <regex>
+#include <string_view>
+
 #include "AzureDataLakeStorageClient.h"
 #include "io/StreamPipe.h"
+#include "utils/file/FileUtils.h"
+#include "utils/StringUtils.h"
+#include "utils/gsl.h"
+#include "utils/GeneralUtils.h"
 
 namespace org::apache::nifi::minifi::azure::storage {
 
+namespace {
+bool matchesPathFilter(std::string_view base_directory, const 
std::optional<std::regex>& path_regex, std::string path) {
+  gsl_Expects(utils::implies(!base_directory.empty(), 
minifi::utils::StringUtils::startsWith(path, base_directory)));
+  if (!path_regex) {
+    return true;
+  }
+
+  if (!base_directory.empty()) {
+    path = path.size() == base_directory.size() ? "" : 
path.substr(base_directory.size() + 1);
+  }
+
+  return std::regex_match(path, *path_regex);
+}
+
+bool matchesFileFilter(const std::optional<std::regex>& file_regex, const 
std::string& filename) {
+  if (!file_regex) {
+    return true;
+  }
+
+  return std::regex_match(filename, *file_regex);
+}
+}  // namespace
+
 
AzureDataLakeStorage::AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient>
 data_lake_storage_client)
   : data_lake_storage_client_(data_lake_storage_client ? 
std::move(data_lake_storage_client) : 
std::make_unique<AzureDataLakeStorageClient>()) {
 }
@@ -72,4 +102,39 @@ std::optional<uint64_t> 
AzureDataLakeStorage::fetchFile(const FetchAzureDataLake
   }
 }
 
+std::optional<ListDataLakeStorageResult> 
AzureDataLakeStorage::listDirectory(const ListAzureDataLakeStorageParameters& 
params) {
+  try {
+    auto list_res = data_lake_storage_client_->listDirectory(params);
+
+    ListDataLakeStorageResult result;
+    for (const auto& azure_element : list_res) {
+      if (azure_element.IsDirectory) {
+        continue;
+      }
+      ListDataLakeStorageElement element;
+      auto [directory, filename] = 
minifi::utils::file::FileUtils::split_path(azure_element.Name, true 
/*force_posix*/);
+      if (!directory.empty()) {
+        directory = directory.substr(0, directory.size() - 1);  // Remove 
ending '/' character
+      }
+
+      if (!matchesPathFilter(params.directory_name, params.path_regex, 
directory) || !matchesFileFilter(params.file_regex, filename)) {
+        continue;
+      }
+
+      element.filename = filename;
+      element.last_modified = 
static_cast<std::chrono::system_clock::time_point>(azure_element.LastModified);
+      element.etag = azure_element.ETag;
+      element.length = azure_element.FileSize;
+      element.filesystem = params.file_system_name;
+      element.file_path = azure_element.Name;
+      element.directory = directory;
+      result.push_back(element);
+    }
+    return result;
+  } catch (const std::exception& ex) {
+    logger_->log_error("An exception occurred while listing directory '%s' of 
filesystem '%s': %s", params.directory_name, params.file_system_name, 
ex.what());
+    return std::nullopt;
+  }
+}
+
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorage.h 
b/extensions/azure/storage/AzureDataLakeStorage.h
index d1291a2..7839d68 100644
--- a/extensions/azure/storage/AzureDataLakeStorage.h
+++ b/extensions/azure/storage/AzureDataLakeStorage.h
@@ -28,7 +28,7 @@
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "DataLakeStorageClient.h"
-#include "azure/core/io/body_stream.hpp"
+#include "utils/ListingStateManager.h"
 
 namespace org::apache::nifi::minifi::azure::storage {
 
@@ -43,6 +43,26 @@ struct UploadDataLakeStorageResult {
   std::string primary_uri;
 };
 
+struct ListDataLakeStorageElement : public minifi::utils::ListedObject {
+  std::string filesystem;
+  std::string file_path;
+  std::string directory;
+  std::string filename;
+  uint64_t length = 0;
+  std::chrono::time_point<std::chrono::system_clock> last_modified;
+  std::string etag;
+
+  std::chrono::time_point<std::chrono::system_clock> getLastModified() const 
override {
+    return last_modified;
+  }
+
+  std::string getKey() const override {
+    return file_path;
+  }
+};
+
+using ListDataLakeStorageResult = std::vector<ListDataLakeStorageElement>;
+
 class AzureDataLakeStorage {
  public:
   explicit AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient> 
data_lake_storage_client = nullptr);
@@ -50,6 +70,7 @@ class AzureDataLakeStorage {
   storage::UploadDataLakeStorageResult uploadFile(const 
storage::PutAzureDataLakeStorageParameters& params, gsl::span<const uint8_t> 
buffer);
   bool deleteFile(const storage::DeleteAzureDataLakeStorageParameters& params);
   std::optional<uint64_t> fetchFile(const FetchAzureDataLakeStorageParameters& 
params, io::BaseStream& stream);
+  std::optional<ListDataLakeStorageResult> listDirectory(const 
ListAzureDataLakeStorageParameters& params);
 
  private:
   std::shared_ptr<core::logging::Logger> 
logger_{core::logging::LoggerFactory<AzureDataLakeStorage>::getLogger()};
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.cpp 
b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
index e542432..cfb9adf 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.cpp
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.cpp
@@ -61,10 +61,13 @@ void AzureDataLakeStorageClient::resetClientIfNeeded(const 
AzureStorageCredentia
   number_of_retries_ = number_of_retries;
 }
 
-Azure::Storage::Files::DataLake::DataLakeFileClient 
AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageParameters& 
params) {
+Azure::Storage::Files::DataLake::DataLakeDirectoryClient 
AzureDataLakeStorageClient::getDirectoryClient(const 
AzureDataLakeStorageParameters& params) {
   resetClientIfNeeded(params.credentials, params.file_system_name, 
params.number_of_retries);
+  return client_->GetDirectoryClient(params.directory_name);
+}
 
-  auto directory_client = client_->GetDirectoryClient(params.directory_name);
+Azure::Storage::Files::DataLake::DataLakeFileClient 
AzureDataLakeStorageClient::getFileClient(const 
AzureDataLakeStorageFileOperationParameters& params) {
+  auto directory_client = getDirectoryClient(params);
   if (!params.directory_name.empty()) {
     directory_client.CreateIfNotExists();
   }
@@ -107,4 +110,20 @@ std::unique_ptr<io::InputStream> 
AzureDataLakeStorageClient::fetchFile(const Fet
   return 
std::make_unique<AzureDataLakeStorageInputStream>(std::move(result.Value));
 }
 
+std::vector<Azure::Storage::Files::DataLake::Models::PathItem> 
AzureDataLakeStorageClient::listDirectory(const 
ListAzureDataLakeStorageParameters& params) {
+  std::vector<Azure::Storage::Files::DataLake::Models::PathItem> result;
+  if (params.directory_name.empty()) {
+    resetClientIfNeeded(params.credentials, params.file_system_name, 
params.number_of_retries);
+    for (auto page_result = client_->ListPaths(params.recurse_subdirectories); 
page_result.HasPage(); page_result.MoveToNextPage()) {
+      result.insert(result.end(), page_result.Paths.begin(), 
page_result.Paths.end());
+    }
+  } else {
+    auto directory_client = getDirectoryClient(params);
+    for (auto page_result = 
directory_client.ListPaths(params.recurse_subdirectories); 
page_result.HasPage(); page_result.MoveToNextPage()) {
+      result.insert(result.end(), page_result.Paths.begin(), 
page_result.Paths.end());
+    }
+  }
+  return result;
+}
+
 }  // namespace org::apache::nifi::minifi::azure::storage
diff --git a/extensions/azure/storage/AzureDataLakeStorageClient.h 
b/extensions/azure/storage/AzureDataLakeStorageClient.h
index cd8d791..9dfcd9f 100644
--- a/extensions/azure/storage/AzureDataLakeStorageClient.h
+++ b/extensions/azure/storage/AzureDataLakeStorageClient.h
@@ -22,6 +22,7 @@
 #include <string>
 #include <memory>
 #include <utility>
+#include <vector>
 
 #include <azure/storage/files/datalake.hpp>
 
@@ -64,6 +65,13 @@ class AzureDataLakeStorageClient : public 
DataLakeStorageClient {
    */
   std::unique_ptr<io::InputStream> fetchFile(const 
FetchAzureDataLakeStorageParameters& params) override;
 
+  /**
+   * Lists a directory in Azure Data Lake Storage
+   * @param params Parameters required for connecting and directory access on 
Azure
+   * @return The list of paths present in the directory
+   */
+  std::vector<Azure::Storage::Files::DataLake::Models::PathItem> 
listDirectory(const ListAzureDataLakeStorageParameters& params) override;
+
  private:
   class AzureDataLakeStorageInputStream : public io::InputStream {
    public:
@@ -84,7 +92,8 @@ class AzureDataLakeStorageClient : public 
DataLakeStorageClient {
   };
 
   void resetClientIfNeeded(const AzureStorageCredentials& credentials, const 
std::string& file_system_name, std::optional<uint64_t> number_of_retries);
-  Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const 
AzureDataLakeStorageParameters& params);
+  Azure::Storage::Files::DataLake::DataLakeDirectoryClient 
getDirectoryClient(const AzureDataLakeStorageParameters& params);
+  Azure::Storage::Files::DataLake::DataLakeFileClient getFileClient(const 
AzureDataLakeStorageFileOperationParameters& params);
 
   AzureStorageCredentials credentials_;
   std::string file_system_name_;
diff --git a/extensions/azure/storage/DataLakeStorageClient.h 
b/extensions/azure/storage/DataLakeStorageClient.h
index eb57d62..f804168 100644
--- a/extensions/azure/storage/DataLakeStorageClient.h
+++ b/extensions/azure/storage/DataLakeStorageClient.h
@@ -22,39 +22,58 @@
 #include <string>
 #include <optional>
 #include <memory>
+#include <vector>
+#include <regex>
 
 #include "AzureStorageCredentials.h"
 
 #include "utils/gsl.h"
 #include "io/InputStream.h"
+#include "azure/storage/files/datalake/protocol/datalake_rest_client.hpp"
+#include "utils/Enum.h"
 
 namespace org::apache::nifi::minifi::azure::storage {
 
+SMART_ENUM(EntityTracking,
+  (NONE, "none"),
+  (TIMESTAMPS, "timestamps")
+)
+
 struct AzureDataLakeStorageParameters {
   AzureStorageCredentials credentials;
   std::string file_system_name;
   std::string directory_name;
-  std::string filename;
   std::optional<uint64_t> number_of_retries;
 };
 
-struct PutAzureDataLakeStorageParameters : public 
AzureDataLakeStorageParameters {
+struct AzureDataLakeStorageFileOperationParameters : public 
AzureDataLakeStorageParameters {
+  std::string filename;
+};
+
+struct PutAzureDataLakeStorageParameters : public 
AzureDataLakeStorageFileOperationParameters {
   bool replace_file = false;
 };
 
-using DeleteAzureDataLakeStorageParameters = AzureDataLakeStorageParameters;
+using DeleteAzureDataLakeStorageParameters = 
AzureDataLakeStorageFileOperationParameters;
 
-struct FetchAzureDataLakeStorageParameters : public 
AzureDataLakeStorageParameters {
+struct FetchAzureDataLakeStorageParameters : public 
AzureDataLakeStorageFileOperationParameters {
   std::optional<uint64_t> range_start;
   std::optional<uint64_t> range_length;
 };
 
+struct ListAzureDataLakeStorageParameters : public 
AzureDataLakeStorageParameters {
+  bool recurse_subdirectories = true;
+  std::optional<std::regex> path_regex;
+  std::optional<std::regex> file_regex;
+};
+
 class DataLakeStorageClient {
  public:
   virtual bool createFile(const PutAzureDataLakeStorageParameters& params) = 0;
   virtual std::string uploadFile(const PutAzureDataLakeStorageParameters& 
params, gsl::span<const uint8_t> buffer) = 0;
   virtual bool deleteFile(const DeleteAzureDataLakeStorageParameters& params) 
= 0;
   virtual std::unique_ptr<io::InputStream> fetchFile(const 
FetchAzureDataLakeStorageParameters& params) = 0;
+  virtual std::vector<Azure::Storage::Files::DataLake::Models::PathItem> 
listDirectory(const ListAzureDataLakeStorageParameters& params) = 0;
   virtual ~DataLakeStorageClient() = default;
 };
 
diff --git a/libminifi/include/utils/GeneralUtils.h 
b/libminifi/include/utils/GeneralUtils.h
index 5d8def7..002a7c9 100644
--- a/libminifi/include/utils/GeneralUtils.h
+++ b/libminifi/include/utils/GeneralUtils.h
@@ -16,8 +16,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_
-#define LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_
+#pragma once
 
 #include <memory>
 #include <type_traits>
@@ -25,11 +24,7 @@
 
 #include "gsl.h"
 
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace utils {
+namespace org::apache::nifi::minifi::utils {
 
 template<typename T, typename = typename 
std::enable_if<std::is_integral<T>::value>::type>
 constexpr T intdiv_ceil(T numerator, T denominator) {
@@ -111,10 +106,6 @@ template<typename T>
 using remove_cvref_t = typename std::remove_cv<typename 
std::remove_reference<T>::type>::type;
 #endif
 
-}  // namespace utils
-}  // namespace minifi
-}  // namespace nifi
-}  // namespace apache
-}  // namespace org
// Logical implication: "a implies b" — false only when the premise holds but the conclusion does not.
inline constexpr bool implies(bool premise, bool conclusion) noexcept {
  return conclusion || !premise;
}
 
-#endif  // LIBMINIFI_INCLUDE_UTILS_GENERALUTILS_H_
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/include/utils/ListingStateManager.h 
b/libminifi/include/utils/ListingStateManager.h
new file mode 100644
index 0000000..280155a
--- /dev/null
+++ b/libminifi/include/utils/ListingStateManager.h
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <memory>
+#include <chrono>
+#include <utility>
+
+#include "core/CoreComponentState.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::utils {
+
/**
 * Interface for a single item produced by a listing operation.
 * Implementations expose a unique key and a last-modification time,
 * which ListingState uses to decide whether the item was already emitted.
 */
class ListedObject {
 public:
  virtual ~ListedObject() = default;
  [[nodiscard]] virtual std::string getKey() const = 0;
  [[nodiscard]] virtual std::chrono::time_point<std::chrono::system_clock> getLastModified() const = 0;
};
+
+struct ListingState {
+  [[nodiscard]] bool wasObjectListedAlready(const ListedObject 
&object_attributes) const;
+  void updateState(const ListedObject &object_attributes);
+  uint64_t getListedKeyTimeStampInMilliseconds() const;
+
+  std::chrono::time_point<std::chrono::system_clock> listed_key_timestamp;
+  std::unordered_set<std::string> listed_keys;
+};
+
+class ListingStateManager {
+ public:
+  explicit 
ListingStateManager(std::shared_ptr<core::CoreComponentStateManager> 
state_manager)
+    : state_manager_(std::move(state_manager)) {
+  }
+
+  [[nodiscard]] ListingState getCurrentState() const;
+  void storeState(const ListingState &latest_listing_state);
+
+ private:
+  static const std::string LATEST_LISTED_OBJECT_PREFIX;
+  static const std::string LATEST_LISTED_OBJECT_TIMESTAMP;
+
+  [[nodiscard]] static uint64_t 
getLatestListedKeyTimestampInMilliseconds(const std::unordered_map<std::string, 
std::string> &state);
+  [[nodiscard]] static std::unordered_set<std::string> 
getLatestListedKeys(const std::unordered_map<std::string, std::string> &state);
+
+  std::shared_ptr<core::CoreComponentStateManager> state_manager_;
+  const std::string timestamp_key_;
+  const std::string listed_object_prefix_;
+  std::shared_ptr<core::logging::Logger> 
logger_{core::logging::LoggerFactory<ListingStateManager>::getLogger()};
+};
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/src/utils/ListingStateManager.cpp 
b/libminifi/src/utils/ListingStateManager.cpp
new file mode 100644
index 0000000..385709b
--- /dev/null
+++ b/libminifi/src/utils/ListingStateManager.cpp
@@ -0,0 +1,100 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ListingStateManager.h"
+
+#include "core/Property.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_PREFIX = 
"listed_key.";
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_TIMESTAMP = 
"listed_timestamp";
+
+bool ListingState::wasObjectListedAlready(const ListedObject &object) const {
+  return listed_key_timestamp > object.getLastModified() ||
+      (listed_key_timestamp == object.getLastModified() && 
listed_keys.find(object.getKey()) != listed_keys.end());
+}
+
+void ListingState::updateState(const ListedObject &object) {
+  if (listed_key_timestamp < object.getLastModified()) {
+    listed_key_timestamp = object.getLastModified();
+    listed_keys.clear();
+    listed_keys.insert(object.getKey());
+  } else if (listed_key_timestamp == object.getLastModified()) {
+    listed_keys.insert(object.getKey());
+  }
+}
+
+uint64_t ListingState::getListedKeyTimeStampInMilliseconds() const {
+  return listed_key_timestamp.time_since_epoch() / 
std::chrono::milliseconds(1);
+}
+
+uint64_t ListingStateManager::getLatestListedKeyTimestampInMilliseconds(const 
std::unordered_map<std::string, std::string> &state) {
+  std::string stored_listed_key_timestamp_str;
+  auto it = state.find(LATEST_LISTED_OBJECT_TIMESTAMP);
+  if (it != state.end()) {
+    stored_listed_key_timestamp_str = it->second;
+  }
+
+  int64_t stored_listed_key_timestamp = 0;
+  core::Property::StringToInt(stored_listed_key_timestamp_str, 
stored_listed_key_timestamp);
+
+  return stored_listed_key_timestamp;
+}
+
+std::unordered_set<std::string> ListingStateManager::getLatestListedKeys(const 
std::unordered_map<std::string, std::string> &state) {
+  std::unordered_set<std::string> latest_listed_keys;
+  for (const auto& kvp : state) {
+    if (kvp.first.rfind(LATEST_LISTED_OBJECT_PREFIX, 0) == 0) {
+      latest_listed_keys.insert(kvp.second);
+    }
+  }
+  return latest_listed_keys;
+}
+
+ListingState ListingStateManager::getCurrentState() const {
+  ListingState current_listing_state;
+  std::unordered_map<std::string, std::string> state;
+  if (!state_manager_->get(state)) {
+    logger_->log_info("No stored state for listed objects was found");
+    return current_listing_state;
+  }
+
+  auto milliseconds = getLatestListedKeyTimestampInMilliseconds(state);
+  current_listing_state.listed_key_timestamp = 
std::chrono::time_point<std::chrono::system_clock>(std::chrono::milliseconds(milliseconds));
+  logger_->log_debug("Restored previous listed timestamp %lld", milliseconds);
+
+  current_listing_state.listed_keys = getLatestListedKeys(state);
+  return current_listing_state;
+}
+
+void ListingStateManager::storeState(const ListingState &latest_listing_state) 
{
+  std::unordered_map<std::string, std::string> state;
+  state[LATEST_LISTED_OBJECT_TIMESTAMP] = 
std::to_string(latest_listing_state.getListedKeyTimeStampInMilliseconds());
+
+  uint64_t id = 0;
+  for (const auto& key : latest_listing_state.listed_keys) {
+    state[LATEST_LISTED_OBJECT_PREFIX + std::to_string(id)] = key;
+    ++id;
+  }
+
+  logger_->log_debug("Stored new listed timestamp %s", 
state[LATEST_LISTED_OBJECT_TIMESTAMP]);
+  state_manager_->set(state);
+}
+
+}  // namespace org::apache::nifi::minifi::utils
diff --git a/libminifi/test/aws-tests/ListS3Tests.cpp 
b/libminifi/test/aws-tests/ListS3Tests.cpp
index 8ddec26..1792f23 100644
--- a/libminifi/test/aws-tests/ListS3Tests.cpp
+++ b/libminifi/test/aws-tests/ListS3Tests.cpp
@@ -94,7 +94,7 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test listing without 
versioning", "[awsS3L
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket 
value:" + S3_BUCKET) == S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest 
value:true") == S3_OBJECT_COUNT);
   
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:") == S3_OBJECT_COUNT);
-  
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT / 
2);
+  
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") == 
S3_OBJECT_COUNT / 2);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length 
value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version") 
== 0);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass 
value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
@@ -122,7 +122,7 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test listing with 
versioning", "[awsS3List
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest 
value:true") == S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest 
value:false") == S3_OBJECT_COUNT);
   
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:") == S3_OBJECT_COUNT * 2);
-  
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT);
+  
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") == 
S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length 
value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT * 2);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass 
value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT * 2);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 
0);
@@ -215,7 +215,7 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated 
listing without versioning"
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket 
value:" + S3_BUCKET) == S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest 
value:true") == S3_OBJECT_COUNT);
   
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:") == S3_OBJECT_COUNT);
-  
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT / 
2);
+  
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") == 
S3_OBJECT_COUNT / 2);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length 
value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version") 
== 0);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass 
value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
@@ -241,7 +241,7 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated 
listing with versioning", "
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest 
value:true") == S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest 
value:false") == S3_OBJECT_COUNT);
   
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:") == S3_OBJECT_COUNT * 2);
-  
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS)) == S3_OBJECT_COUNT);
+  
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified 
value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") == 
S3_OBJECT_COUNT);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length 
value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT * 2);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass 
value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT * 2);
   REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 
0);
diff --git a/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp 
b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
new file mode 100644
index 0000000..cb96d08
--- /dev/null
+++ b/libminifi/test/azure-tests/ListAzureDataLakeStorageTests.cpp
@@ -0,0 +1,208 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../TestBase.h"
+#include "MockDataLakeStorageClient.h"
+#include "utils/IntegrationTestUtils.h"
+#include "processors/LogAttribute.h"
+#include "processors/ListAzureDataLakeStorage.h"
+#include "controllerservices/AzureStorageCredentialsService.h"
+
+const std::string FILESYSTEM_NAME = "testfilesystem";
+const std::string DIRECTORY_NAME = "testdir";
+const std::string CONNECTION_STRING = "test-connectionstring";
+
+class ListAzureDataLakeStorageTestsFixture {
+ public:
+  ListAzureDataLakeStorageTestsFixture() {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+    
LogTestController::getInstance().setDebug<minifi::utils::ListingStateManager>();
+    
LogTestController::getInstance().setTrace<minifi::azure::processors::ListAzureDataLakeStorage>();
+
+    // Build MiNiFi processing graph
+    plan_ = test_controller_.createPlan();
+    auto mock_data_lake_storage_client = 
std::make_unique<MockDataLakeStorageClient>();
+    mock_data_lake_storage_client_ptr_ = mock_data_lake_storage_client.get();
+    list_azure_data_lake_storage_ = 
std::shared_ptr<minifi::azure::processors::ListAzureDataLakeStorage>(
+      new 
minifi::azure::processors::ListAzureDataLakeStorage("ListAzureDataLakeStorage", 
utils::Identifier(), std::move(mock_data_lake_storage_client)));
+
+    plan_->addProcessor(list_azure_data_lake_storage_, 
"ListAzureDataLakeStorage", { {"success", "d"} });
+    auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { 
{"success", "d"} }, true);
+    plan_->setProperty(logattribute, 
minifi::processors::LogAttribute::FlowFilesToLog.getName(), "0");
+
+    azure_storage_cred_service_ = 
plan_->addController("AzureStorageCredentialsService", 
"AzureStorageCredentialsService");
+    setDefaultProperties();
+  }
+
+  void setDefaultProperties() {
+    plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::AzureStorageCredentialsService.getName(),
 "AzureStorageCredentialsService");
+    plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::FilesystemName.getName(), 
FILESYSTEM_NAME);
+    plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::DirectoryName.getName(), 
DIRECTORY_NAME);
+    plan_->setProperty(azure_storage_cred_service_, 
minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(),
 CONNECTION_STRING);
+  }
+
+  virtual ~ListAzureDataLakeStorageTestsFixture() {
+    LogTestController::getInstance().reset();
+  }
+
+ protected:
+  TestController test_controller_;
+  std::shared_ptr<TestPlan> plan_;
+  MockDataLakeStorageClient* mock_data_lake_storage_client_ptr_;
+  std::shared_ptr<core::Processor> list_azure_data_lake_storage_;
+  std::shared_ptr<core::controller::ControllerServiceNode> 
azure_storage_cred_service_;
+};
+
+namespace {
+
+using namespace std::chrono_literals;
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Azure storage 
credentials service is empty", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::AzureStorageCredentialsService.getName(),
 "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), 
minifi::Exception);
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Filesystem name is not 
set", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::FilesystemName.getName(), 
"");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), 
minifi::Exception);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  REQUIRE(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid 
or empty!"));
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Connection String is 
empty", "[azureDataLakeStorageParameters]") {
+  plan_->setProperty(azure_storage_cred_service_, 
minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString.getName(),
 "");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), 
minifi::Exception);
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "List all files every 
time", "[listAzureDataLakeStorage]") {
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::ListingStrategy.getName(), 
toString(minifi::azure::storage::EntityTracking::NONE));
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::RecurseSubdirectories.getName(),
 "false");
+  test_controller_.runSession(plan_, true);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  auto run_assertions = [this]() {
+    auto passed_params = 
mock_data_lake_storage_client_ptr_->getPassedListParams();
+    CHECK(passed_params.credentials.buildConnectionString() == 
CONNECTION_STRING);
+    CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+    CHECK(passed_params.directory_name == DIRECTORY_NAME);
+    CHECK(passed_params.recurse_subdirectories == false);
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + 
DIRECTORY_NAME));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + 
FILESYSTEM_NAME));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath 
value:testdir/item1.log"));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath 
value:testdir/sub/item2.log"));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename 
value:item1.log"));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename 
value:item2.log"));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:128"));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:256"));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag1"));
+    CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag2"));
+  };
+  run_assertions();
+  plan_->reset();
+  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  test_controller_.runSession(plan_, true);
+  run_assertions();
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do not list same files 
the second time when timestamps are tracked", "[listAzureDataLakeStorage]") {
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::ListingStrategy.getName(), 
toString(minifi::azure::storage::EntityTracking::TIMESTAMPS));
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::RecurseSubdirectories.getName(),
 "false");
+  test_controller_.runSession(plan_, true);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  auto passed_params = 
mock_data_lake_storage_client_ptr_->getPassedListParams();
+  CHECK(passed_params.credentials.buildConnectionString() == 
CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.recurse_subdirectories == false);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + 
DIRECTORY_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + 
FILESYSTEM_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath 
value:testdir/item1.log"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath 
value:testdir/sub/item2.log"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename 
value:item1.log"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename 
value:item2.log"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:128"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:256"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag1"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag2"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" + 
mock_data_lake_storage_client_ptr_->ITEM1_LAST_MODIFIED + "\n"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" + 
mock_data_lake_storage_client_ptr_->ITEM2_LAST_MODIFIED + "\n"));
+  plan_->reset();
+  
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+  test_controller_.runSession(plan_, true);
+  REQUIRE_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 
0ms));
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do not list filtered 
files", "[listAzureDataLakeStorage]") {
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::FileFilter.getName(), 
"item1.*g");
+  test_controller_.runSession(plan_, true);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  auto passed_params = 
mock_data_lake_storage_client_ptr_->getPassedListParams();
+  CHECK(passed_params.credentials.buildConnectionString() == 
CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.recurse_subdirectories == true);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + 
DIRECTORY_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + 
FILESYSTEM_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath 
value:testdir/item1.log"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename 
value:item1.log"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:128"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag1"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" + 
mock_data_lake_storage_client_ptr_->ITEM1_LAST_MODIFIED + "\n"));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure.filePath 
value:testdir/sub/item2.log", 0s, 0ms));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure.filename 
value:item2.log", 0s, 0ms));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure.length 
value:256", 0s, 0ms));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure.etag 
value:etag2", 0s, 0ms));
+  
CHECK_FALSE(LogTestController::getInstance().contains("key:azure.lastModified 
value:" + mock_data_lake_storage_client_ptr_->ITEM2_LAST_MODIFIED, 0s, 0ms));
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Do not list filtered 
paths", "[listAzureDataLakeStorage]") {
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::PathFilter.getName(), 
"su.*");
+  test_controller_.runSession(plan_, true);
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
+  auto passed_params = 
mock_data_lake_storage_client_ptr_->getPassedListParams();
+  CHECK(passed_params.credentials.buildConnectionString() == 
CONNECTION_STRING);
+  CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
+  CHECK(passed_params.directory_name == DIRECTORY_NAME);
+  CHECK(passed_params.recurse_subdirectories == true);
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + 
DIRECTORY_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + 
FILESYSTEM_NAME));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filePath 
value:testdir/sub/item2.log"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename 
value:item2.log"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:256"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.etag value:etag2"));
+  CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.lastModified value:" + 
mock_data_lake_storage_client_ptr_->ITEM2_LAST_MODIFIED + "\n"));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure.filePath 
value:testdir/item1.log", 0s, 0ms));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure.filename 
value:item1.log", 0s, 0ms));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure.length 
value:128", 0s, 0ms));
+  CHECK_FALSE(LogTestController::getInstance().contains("key:azure.etag 
value:etag1", 0s, 0ms));
+  
CHECK_FALSE(LogTestController::getInstance().contains("key:azure.lastModified 
value:" + mock_data_lake_storage_client_ptr_->ITEM1_LAST_MODIFIED, 0s, 0ms));
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Throw on invalid file 
filter", "[listAzureDataLakeStorage]") {
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::FileFilter.getName(), 
"(item1][].*g");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), 
minifi::Exception);
+}
+
+TEST_CASE_METHOD(ListAzureDataLakeStorageTestsFixture, "Throw on invalid path 
filter", "[listAzureDataLakeStorage]") {
+  plan_->setProperty(list_azure_data_lake_storage_, 
minifi::azure::processors::ListAzureDataLakeStorage::PathFilter.getName(), 
"su.([[*");
+  REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), 
minifi::Exception);
+}
+
+}  // namespace
diff --git a/libminifi/test/azure-tests/MockDataLakeStorageClient.h 
b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
index 8969e1e..b100697 100644
--- a/libminifi/test/azure-tests/MockDataLakeStorageClient.h
+++ b/libminifi/test/azure-tests/MockDataLakeStorageClient.h
@@ -31,6 +31,8 @@ class MockDataLakeStorageClient : public 
org::apache::nifi::minifi::azure::stora
  public:
   const std::string PRIMARY_URI = "http://test-uri/file";;
   const std::string FETCHED_DATA = "test azure data for stream";
+  const std::string ITEM1_LAST_MODIFIED = "1631292120000";
+  const std::string ITEM2_LAST_MODIFIED = "1634127120000";
 
   bool createFile(const 
org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters& 
/*params*/) override {
     if (file_creation_error_) {
@@ -81,6 +83,33 @@ class MockDataLakeStorageClient : public 
org::apache::nifi::minifi::azure::stora
     return 
std::make_unique<org::apache::nifi::minifi::io::BufferStream>(buffer_.data(), 
buffer_.size());
   }
 
+  std::vector<Azure::Storage::Files::DataLake::Models::PathItem> 
listDirectory(const 
org::apache::nifi::minifi::azure::storage::ListAzureDataLakeStorageParameters& 
params) override {
+    list_params_ = params;
+    std::vector<Azure::Storage::Files::DataLake::Models::PathItem> result;
+    Azure::Storage::Files::DataLake::Models::PathItem diritem;
+    diritem.IsDirectory = true;
+    diritem.Name = "testdir/";
+
+    Azure::Storage::Files::DataLake::Models::PathItem item1;
+    item1.IsDirectory = false;
+    item1.Name = "testdir/item1.log";
+    item1.LastModified = Azure::DateTime(2021, 9, 10, 16, 42, 0);
+    item1.ETag = "etag1";
+    item1.FileSize = 128;
+
+    Azure::Storage::Files::DataLake::Models::PathItem item2;
+    item2.IsDirectory = false;
+    item2.Name = "testdir/sub/item2.log";
+    item2.LastModified = Azure::DateTime(2021, 10, 13, 12, 12, 0);
+    item2.ETag = "etag2";
+    item2.FileSize = 256;
+
+    result.push_back(diritem);
+    result.push_back(item1);
+    result.push_back(item2);
+    return result;
+  }
+
  // Test hook: sets the mock's create_file_ flag. NOTE(review): the consumer of
  // this flag is outside this hunk — presumably it controls whether the mock
  // reports the target file as already existing; confirm against the full mock.
  void setFileCreation(bool create_file) {
    create_file_ = create_file;
  }
@@ -117,6 +146,10 @@ class MockDataLakeStorageClient : public 
org::apache::nifi::minifi::azure::stora
     return fetch_params_;
   }
 
  // Returns the parameters captured by the most recent listDirectory() call,
  // letting tests verify exactly what the processor passed to the storage client.
  org::apache::nifi::minifi::azure::storage::ListAzureDataLakeStorageParameters getPassedListParams() const {
    return list_params_;
  }
+
  private:
   const std::string RETURNED_PRIMARY_URI = "http://test-uri/file?secret-sas";;
   bool create_file_ = true;
@@ -130,4 +163,5 @@ class MockDataLakeStorageClient : public 
org::apache::nifi::minifi::azure::stora
   org::apache::nifi::minifi::azure::storage::PutAzureDataLakeStorageParameters 
put_params_;
   
org::apache::nifi::minifi::azure::storage::DeleteAzureDataLakeStorageParameters 
delete_params_;
   
org::apache::nifi::minifi::azure::storage::FetchAzureDataLakeStorageParameters 
fetch_params_;
+  
org::apache::nifi::minifi::azure::storage::ListAzureDataLakeStorageParameters 
list_params_;
 };

Reply via email to