lordgamez commented on a change in pull request #1242:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1242#discussion_r790783628
##########
File path: extensions/azure/storage/AzureDataLakeStorage.h
##########
@@ -24,11 +24,12 @@
#include <memory>
#include <optional>
#include <utility>
+#include <string_view>
#include "core/logging/Logger.h"
#include "core/logging/LoggerConfiguration.h"
#include "DataLakeStorageClient.h"
-#include "azure/core/io/body_stream.hpp"
+#include "utils/ListingStateUtils.h"
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
I agree; I usually try to avoid names like *Manager, *Handler and so on, but
I think it is acceptable in this case, since `ListingStateManager` is already a
specific use case of `CoreComponentStateManager`, so at least the two names are
connected.
##########
File path: extensions/aws/s3/S3Wrapper.h
##########
@@ -177,7 +179,15 @@ struct ListRequestParameters : public RequestParameters {
uint64_t min_object_age = 0;
};
-struct ListedObjectAttributes {
+struct ListedObjectAttributes : public minifi::utils::ListedObject {
+ uint64_t getLastModified() const override {
+ return gsl::narrow<uint64_t>(last_modified);
+ }
Review comment:
Replaced the millisecond counters with time_points in
ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: libminifi/include/utils/ListingStateUtils.h
##########
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <memory>
+
+#include "core/CoreComponentState.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+class ListedObject {
+ public:
+ virtual uint64_t getLastModified() const = 0;
+ virtual std::string getKey() const = 0;
+ virtual ~ListedObject() = default;
+};
+
+struct ListingState {
+ bool wasObjectListedAlready(const ListedObject &object_attributes) const;
+ void updateState(const ListedObject &object_attributes);
+
+ uint64_t listed_key_timestamp = 0;
+ std::unordered_set<std::string> listed_keys;
+};
+
+class ListingStateManager {
+ public:
+ explicit ListingStateManager(const
std::shared_ptr<core::CoreComponentStateManager>& state_manager)
+ : state_manager_(state_manager) {
+ }
+
+ ListingState getCurrentState() const;
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf. Also, for some
reason I thought we already had a clang-tidy checker target, or at least a
ticket for one, but it seems I was mistaken, so I created a Jira ticket for it:
https://issues.apache.org/jira/browse/MINIFICPP-1739
##########
File path: libminifi/include/utils/ListingStateUtils.h
##########
@@ -0,0 +1,69 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <memory>
+
+#include "core/CoreComponentState.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+class ListedObject {
+ public:
+ virtual uint64_t getLastModified() const = 0;
+ virtual std::string getKey() const = 0;
+ virtual ~ListedObject() = default;
+};
+
+struct ListingState {
+ bool wasObjectListedAlready(const ListedObject &object_attributes) const;
+ void updateState(const ListedObject &object_attributes);
+
+ uint64_t listed_key_timestamp = 0;
+ std::unordered_set<std::string> listed_keys;
+};
+
+class ListingStateManager {
+ public:
+ explicit ListingStateManager(const
std::shared_ptr<core::CoreComponentStateManager>& state_manager)
+ : state_manager_(state_manager) {
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/storage/AzureDataLakeStorage.h
##########
@@ -43,15 +44,38 @@ struct UploadDataLakeStorageResult {
std::string primary_uri;
};
+struct ListDataLakeStorageElement : public minifi::utils::ListedObject {
+ std::string filesystem;
+ std::string file_path;
+ std::string directory;
+ std::string filename;
+ uint64_t length = 0;
+ uint64_t last_modified = 0;
+ std::string etag;
+
+ uint64_t getLastModified() const override {
+ return last_modified;
+ }
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.h
##########
@@ -0,0 +1,73 @@
+/**
+ * @file ListAzureDataLakeStorage.h
+ * ListAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+class ListAzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class ListAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase {
+ public:
+ EXTENSIONAPI static const core::Property RecurseSubdirectories;
+ EXTENSIONAPI static const core::Property FileFilter;
+ EXTENSIONAPI static const core::Property PathFilter;
+ EXTENSIONAPI static const core::Property ListingStrategy;
+
+ // Supported Relationships
+ static const core::Relationship Success;
Review comment:
Removed in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+ core::PropertyBuilder::createProperty("Recurse Subdirectories")
+ ->isRequired(true)
+ ->withDefaultValue<bool>(true)
+ ->withDescription("Indicates whether to list files from subdirectories
of the directory")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+ core::PropertyBuilder::createProperty("File Filter")
+ ->withDescription("Only files whose names match the given regular
expression will be listed")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+ core::PropertyBuilder::createProperty("Path Filter")
+ ->withDescription("When 'Recurse Subdirectories' is true, then only
subdirectories whose paths match the given regular expression will be scanned")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+ core::PropertyBuilder::createProperty("Listing Strategy")
+ ->withDescription("Specify how to determine new/updated entities. If
'timestamps' is selected it tracks the latest timestamp of listed entity to "
+ "determine new/updated entities. If 'none' is selected
it lists an entity without any tracking, the same entity will be listed each
time on executing this processor.")
+
->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+ ->withAllowableValues<std::string>(storage::EntityTracking::values())
+ ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All
FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+ // Set supported properties
+ setSupportedProperties({
+ AzureStorageCredentialsService,
+ FilesystemName,
+ DirectoryName,
+ RecurseSubdirectories,
+ FileFilter,
+ PathFilter,
+ ListingStrategy
+ });
+ // Set the supported relationships
+ setSupportedRelationships({
+ Success
+ });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+ auto state_manager = context->getStateManager();
+ if (state_manager == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+ state_manager_ =
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+ auto params = buildListParameters(*context);
+ if (!params) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for
ListAzureDataLakeStorage processor are missing or invalid");
+ }
+
+ list_parameters_ = *params;
+ tracking_strategy_ = storage::EntityTracking::parse(
+ utils::parsePropertyWithAllowableValuesOrThrow(*context,
ListingStrategy.getName(), storage::EntityTracking::values()).c_str());
+}
+
+std::optional<storage::ListAzureDataLakeStorageParameters>
ListAzureDataLakeStorage::buildListParameters(core::ProcessContext& context) {
+ storage::ListAzureDataLakeStorageParameters params;
+ if (!setCommonParameters(params, context, nullptr)) {
+ return std::nullopt;
+ }
+
+ if (!context.getProperty(RecurseSubdirectories.getName(),
params.recurse_subdirectories)) {
+ logger_->log_error("Recurse Subdirectories property missing or invalid");
+ return std::nullopt;
+ }
+
+ context.getProperty(FileFilter.getName(), params.file_filter);
+ context.getProperty(PathFilter.getName(), params.path_filter);
+
+ return params;
+}
+
+void ListAzureDataLakeStorage::createNewFlowFile(core::ProcessSession
&session, const storage::ListDataLakeStorageElement &element) {
Review comment:
Moved to an anonymous namespace in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+ core::PropertyBuilder::createProperty("Recurse Subdirectories")
+ ->isRequired(true)
+ ->withDefaultValue<bool>(true)
+ ->withDescription("Indicates whether to list files from subdirectories
of the directory")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+ core::PropertyBuilder::createProperty("File Filter")
+ ->withDescription("Only files whose names match the given regular
expression will be listed")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+ core::PropertyBuilder::createProperty("Path Filter")
+ ->withDescription("When 'Recurse Subdirectories' is true, then only
subdirectories whose paths match the given regular expression will be scanned")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+ core::PropertyBuilder::createProperty("Listing Strategy")
+ ->withDescription("Specify how to determine new/updated entities. If
'timestamps' is selected it tracks the latest timestamp of listed entity to "
+ "determine new/updated entities. If 'none' is selected
it lists an entity without any tracking, the same entity will be listed each
time on executing this processor.")
+
->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+ ->withAllowableValues<std::string>(storage::EntityTracking::values())
+ ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All
FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+ // Set supported properties
+ setSupportedProperties({
+ AzureStorageCredentialsService,
+ FilesystemName,
+ DirectoryName,
+ RecurseSubdirectories,
+ FileFilter,
+ PathFilter,
+ ListingStrategy
+ });
+ // Set the supported relationships
+ setSupportedRelationships({
+ Success
+ });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+ auto state_manager = context->getStateManager();
+ if (state_manager == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+ state_manager_ =
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+ auto params = buildListParameters(*context);
+ if (!params) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for
ListAzureDataLakeStorage processor are missing or invalid");
+ }
+
+ list_parameters_ = *params;
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.h
##########
@@ -0,0 +1,73 @@
+/**
+ * @file ListAzureDataLakeStorage.h
+ * ListAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+class ListAzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class ListAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase {
+ public:
+ EXTENSIONAPI static const core::Property RecurseSubdirectories;
+ EXTENSIONAPI static const core::Property FileFilter;
+ EXTENSIONAPI static const core::Property PathFilter;
+ EXTENSIONAPI static const core::Property ListingStrategy;
+
+ // Supported Relationships
+ static const core::Relationship Success;
+
+ explicit ListAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+ : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger()) {
+ }
+
+ ~ListAzureDataLakeStorage() override = default;
+
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+ friend class ::ListAzureDataLakeStorageTestsFixture;
+
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_FORBIDDEN;
+ }
+
+ explicit ListAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid,
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+ : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
+ }
+
+ std::optional<storage::ListAzureDataLakeStorageParameters>
buildListParameters(core::ProcessContext& context);
+ void createNewFlowFile(core::ProcessSession &session, const
storage::ListDataLakeStorageElement &element);
+
+ bool recurse_subdirectories_ = true;
+ storage::EntityTracking tracking_strategy_ =
storage::EntityTracking::TIMESTAMPS;
+ storage::ListAzureDataLakeStorageParameters list_parameters_;
+ std::unique_ptr<minifi::utils::ListingStateManager> state_manager_ = nullptr;
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.h
##########
@@ -0,0 +1,73 @@
+/**
+ * @file ListAzureDataLakeStorage.h
+ * ListAzureDataLakeStorage class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "AzureDataLakeStorageProcessorBase.h"
+
+class ListAzureDataLakeStorageTestsFixture;
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+class ListAzureDataLakeStorage final : public
AzureDataLakeStorageProcessorBase {
+ public:
+ EXTENSIONAPI static const core::Property RecurseSubdirectories;
+ EXTENSIONAPI static const core::Property FileFilter;
+ EXTENSIONAPI static const core::Property PathFilter;
+ EXTENSIONAPI static const core::Property ListingStrategy;
+
+ // Supported Relationships
+ static const core::Relationship Success;
+
+ explicit ListAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid = minifi::utils::Identifier())
+ : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger()) {
+ }
+
+ ~ListAzureDataLakeStorage() override = default;
+
+ void initialize() override;
+ void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
+ void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const
std::shared_ptr<core::ProcessSession> &session) override;
+
+ private:
+ friend class ::ListAzureDataLakeStorageTestsFixture;
+
+ core::annotation::Input getInputRequirement() const override {
+ return core::annotation::Input::INPUT_FORBIDDEN;
+ }
+
+ explicit ListAzureDataLakeStorage(const std::string& name, const
minifi::utils::Identifier& uuid,
std::unique_ptr<storage::DataLakeStorageClient> data_lake_storage_client)
+ : AzureDataLakeStorageProcessorBase(name, uuid,
core::logging::LoggerFactory<ListAzureDataLakeStorage>::getLogger(),
std::move(data_lake_storage_client)) {
+ }
+
+ std::optional<storage::ListAzureDataLakeStorageParameters>
buildListParameters(core::ProcessContext& context);
+ void createNewFlowFile(core::ProcessSession &session, const
storage::ListDataLakeStorageElement &element);
+
+ bool recurse_subdirectories_ = true;
Review comment:
Good catch, removed in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+ core::PropertyBuilder::createProperty("Recurse Subdirectories")
+ ->isRequired(true)
+ ->withDefaultValue<bool>(true)
+ ->withDescription("Indicates whether to list files from subdirectories
of the directory")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+ core::PropertyBuilder::createProperty("File Filter")
+ ->withDescription("Only files whose names match the given regular
expression will be listed")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+ core::PropertyBuilder::createProperty("Path Filter")
+ ->withDescription("When 'Recurse Subdirectories' is true, then only
subdirectories whose paths match the given regular expression will be scanned")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+ core::PropertyBuilder::createProperty("Listing Strategy")
+ ->withDescription("Specify how to determine new/updated entities. If
'timestamps' is selected it tracks the latest timestamp of listed entity to "
+ "determine new/updated entities. If 'none' is selected
it lists an entity without any tracking, the same entity will be listed each
time on executing this processor.")
+
->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+ ->withAllowableValues<std::string>(storage::EntityTracking::values())
+ ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All
FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+ // Set supported properties
+ setSupportedProperties({
+ AzureStorageCredentialsService,
+ FilesystemName,
+ DirectoryName,
+ RecurseSubdirectories,
+ FileFilter,
+ PathFilter,
+ ListingStrategy
+ });
+ // Set the supported relationships
+ setSupportedRelationships({
+ Success
+ });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+ auto state_manager = context->getStateManager();
+ if (state_manager == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+ state_manager_ =
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+ auto params = buildListParameters(*context);
+ if (!params) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for
ListAzureDataLakeStorage processor are missing or invalid");
+ }
+
+ list_parameters_ = *params;
+ tracking_strategy_ = storage::EntityTracking::parse(
+ utils::parsePropertyWithAllowableValuesOrThrow(*context,
ListingStrategy.getName(), storage::EntityTracking::values()).c_str());
+}
+
+std::optional<storage::ListAzureDataLakeStorageParameters>
ListAzureDataLakeStorage::buildListParameters(core::ProcessContext& context) {
+ storage::ListAzureDataLakeStorageParameters params;
+ if (!setCommonParameters(params, context, nullptr)) {
+ return std::nullopt;
+ }
+
+ if (!context.getProperty(RecurseSubdirectories.getName(),
params.recurse_subdirectories)) {
+ logger_->log_error("Recurse Subdirectories property missing or invalid");
+ return std::nullopt;
+ }
+
+ context.getProperty(FileFilter.getName(), params.file_filter);
+ context.getProperty(PathFilter.getName(), params.path_filter);
+
+ return params;
+}
+
+void ListAzureDataLakeStorage::createNewFlowFile(core::ProcessSession
&session, const storage::ListDataLakeStorageElement &element) {
+ auto flow_file = session.create();
+ session.putAttribute(flow_file, "azure.filesystem", element.filesystem);
+ session.putAttribute(flow_file, "azure.filePath", element.file_path);
+ session.putAttribute(flow_file, "azure.directory", element.directory);
+ session.putAttribute(flow_file, "azure.filename", element.filename);
+ session.putAttribute(flow_file, "azure.length",
std::to_string(element.length));
+ session.putAttribute(flow_file, "azure.lastModified",
std::to_string(element.last_modified));
+ session.putAttribute(flow_file, "azure.etag", element.etag);
+ session.transfer(flow_file, Success);
Review comment:
Good point, updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+ core::PropertyBuilder::createProperty("Recurse Subdirectories")
+ ->isRequired(true)
+ ->withDefaultValue<bool>(true)
+ ->withDescription("Indicates whether to list files from subdirectories
of the directory")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+ core::PropertyBuilder::createProperty("File Filter")
+ ->withDescription("Only files whose names match the given regular
expression will be listed")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+ core::PropertyBuilder::createProperty("Path Filter")
+ ->withDescription("When 'Recurse Subdirectories' is true, then only
subdirectories whose paths match the given regular expression will be scanned")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+ core::PropertyBuilder::createProperty("Listing Strategy")
+ ->withDescription("Specify how to determine new/updated entities. If
'timestamps' is selected it tracks the latest timestamp of listed entity to "
+ "determine new/updated entities. If 'none' is selected
it lists an entity without any tracking, the same entity will be listed each
time on executing this processor.")
+
->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+ ->withAllowableValues<std::string>(storage::EntityTracking::values())
+ ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All
FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+ // Set supported properties
+ setSupportedProperties({
+ AzureStorageCredentialsService,
+ FilesystemName,
+ DirectoryName,
+ RecurseSubdirectories,
+ FileFilter,
+ PathFilter,
+ ListingStrategy
+ });
+ // Set the supported relationships
+ setSupportedRelationships({
+ Success
+ });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
Review comment:
Added in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+ core::PropertyBuilder::createProperty("Recurse Subdirectories")
+ ->isRequired(true)
+ ->withDefaultValue<bool>(true)
+ ->withDescription("Indicates whether to list files from subdirectories
of the directory")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+ core::PropertyBuilder::createProperty("File Filter")
+ ->withDescription("Only files whose names match the given regular
expression will be listed")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+ core::PropertyBuilder::createProperty("Path Filter")
+ ->withDescription("When 'Recurse Subdirectories' is true, then only
subdirectories whose paths match the given regular expression will be scanned")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+ core::PropertyBuilder::createProperty("Listing Strategy")
+ ->withDescription("Specify how to determine new/updated entities. If
'timestamps' is selected it tracks the latest timestamp of listed entity to "
+ "determine new/updated entities. If 'none' is selected
it lists an entity without any tracking, the same entity will be listed each
time on executing this processor.")
+
->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+ ->withAllowableValues<std::string>(storage::EntityTracking::values())
+ ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All
FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+ // Set supported properties
Review comment:
Removed in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/storage/AzureDataLakeStorage.cpp
##########
@@ -72,4 +77,62 @@ std::optional<uint64_t>
AzureDataLakeStorage::fetchFile(const FetchAzureDataLake
}
}
+bool AzureDataLakeStorage::matchesPathFilter(const std::string&
base_directory, const std::string& path_filter, std::string path) {
+ if (path_filter.empty()) {
+ return true;
+ }
+
+ if (!base_directory.empty()) {
+ gsl_Expects(minifi::utils::StringUtils::startsWith(path, base_directory));
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/storage/AzureDataLakeStorage.cpp
##########
@@ -72,4 +77,62 @@ std::optional<uint64_t>
AzureDataLakeStorage::fetchFile(const FetchAzureDataLake
}
}
+bool AzureDataLakeStorage::matchesPathFilter(const std::string&
base_directory, const std::string& path_filter, std::string path) {
Review comment:
Due to the regex functions' std::string requirements, only one parameter
could be changed to std::string_view. Also moved the matchers to an anonymous
namespace in ef0172d119c472a1c91a10aecbe531a112ade2bf.
##########
File path: extensions/azure/storage/AzureDataLakeStorage.cpp
##########
@@ -72,4 +77,62 @@ std::optional<uint64_t>
AzureDataLakeStorage::fetchFile(const FetchAzureDataLake
}
}
+bool AzureDataLakeStorage::matchesPathFilter(const std::string&
base_directory, const std::string& path_filter, std::string path) {
+ if (path_filter.empty()) {
+ return true;
+ }
+
+ if (!base_directory.empty()) {
+ gsl_Expects(minifi::utils::StringUtils::startsWith(path, base_directory));
+ path = path.size() == base_directory.size() ? "" :
path.substr(base_directory.size() + 1);
+ }
+
+ std::regex pattern(path_filter);
+ return std::regex_match(path, pattern);
+}
+
+bool AzureDataLakeStorage::matchesFileFilter(const std::string& file_filter,
const std::string& filename) {
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: libminifi/src/utils/ListingStateUtils.cpp
##########
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ListingStateUtils.h"
+
+#include "core/Property.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_PREFIX =
"listed_key.";
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_TIMESTAMP =
"listed_timestamp";
+
+bool ListingState::wasObjectListedAlready(const ListedObject &object) const {
+ return listed_key_timestamp > object.getLastModified() ||
+ (listed_key_timestamp == object.getLastModified() &&
listed_keys.find(object.getKey()) != listed_keys.end());
+}
+
+void ListingState::updateState(const ListedObject &object) {
+ if (listed_key_timestamp < object.getLastModified()) {
+ listed_key_timestamp = object.getLastModified();
+ listed_keys.clear();
+ listed_keys.insert(object.getKey());
+ } else if (listed_key_timestamp == object.getLastModified()) {
+ listed_keys.insert(object.getKey());
+ }
+}
+
+uint64_t ListingStateManager::getLatestListedKeyTimestamp(const
std::unordered_map<std::string, std::string> &state) const {
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: libminifi/src/utils/ListingStateUtils.cpp
##########
@@ -0,0 +1,95 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/ListingStateUtils.h"
+
+#include "core/Property.h"
+
+namespace org::apache::nifi::minifi::utils {
+
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_PREFIX =
"listed_key.";
+const std::string ListingStateManager::LATEST_LISTED_OBJECT_TIMESTAMP =
"listed_timestamp";
+
+bool ListingState::wasObjectListedAlready(const ListedObject &object) const {
+ return listed_key_timestamp > object.getLastModified() ||
+ (listed_key_timestamp == object.getLastModified() &&
listed_keys.find(object.getKey()) != listed_keys.end());
+}
+
+void ListingState::updateState(const ListedObject &object) {
+ if (listed_key_timestamp < object.getLastModified()) {
+ listed_key_timestamp = object.getLastModified();
+ listed_keys.clear();
+ listed_keys.insert(object.getKey());
+ } else if (listed_key_timestamp == object.getLastModified()) {
+ listed_keys.insert(object.getKey());
+ }
+}
+
+uint64_t ListingStateManager::getLatestListedKeyTimestamp(const
std::unordered_map<std::string, std::string> &state) const {
+ std::string stored_listed_key_timestamp_str;
+ auto it = state.find(LATEST_LISTED_OBJECT_TIMESTAMP);
+ if (it != state.end()) {
+ stored_listed_key_timestamp_str = it->second;
+ }
+
+ int64_t stored_listed_key_timestamp = 0;
+ core::Property::StringToInt(stored_listed_key_timestamp_str,
stored_listed_key_timestamp);
+
+ return stored_listed_key_timestamp;
+}
+
+std::unordered_set<std::string> ListingStateManager::getLatestListedKeys(const
std::unordered_map<std::string, std::string> &state) const {
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: PROCESSORS.md
##########
@@ -37,6 +37,7 @@
- [ListSFTP](#listsftp)
- [ListenHTTP](#listenhttp)
- [ListenSyslog](#listensyslog)
+- [ListAzureDataLakeStorage](#listazuredatalakestorage)
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
##########
File path: extensions/azure/processors/ListAzureDataLakeStorage.cpp
##########
@@ -0,0 +1,157 @@
+/**
+ * @file ListAzureDataLakeStorage.cpp
+ * ListAzureDataLakeStorage class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ListAzureDataLakeStorage.h"
+
+#include "utils/ProcessorConfigUtils.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::azure::processors {
+
+const core::Property ListAzureDataLakeStorage::RecurseSubdirectories(
+ core::PropertyBuilder::createProperty("Recurse Subdirectories")
+ ->isRequired(true)
+ ->withDefaultValue<bool>(true)
+ ->withDescription("Indicates whether to list files from subdirectories
of the directory")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::FileFilter(
+ core::PropertyBuilder::createProperty("File Filter")
+ ->withDescription("Only files whose names match the given regular
expression will be listed")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::PathFilter(
+ core::PropertyBuilder::createProperty("Path Filter")
+ ->withDescription("When 'Recurse Subdirectories' is true, then only
subdirectories whose paths match the given regular expression will be scanned")
+ ->build());
+
+const core::Property ListAzureDataLakeStorage::ListingStrategy(
+ core::PropertyBuilder::createProperty("Listing Strategy")
+ ->withDescription("Specify how to determine new/updated entities. If
'timestamps' is selected it tracks the latest timestamp of listed entity to "
+ "determine new/updated entities. If 'none' is selected
it lists an entity without any tracking, the same entity will be listed each
time on executing this processor.")
+
->withDefaultValue<std::string>(toString(storage::EntityTracking::TIMESTAMPS))
+ ->withAllowableValues<std::string>(storage::EntityTracking::values())
+ ->build());
+
+const core::Relationship ListAzureDataLakeStorage::Success("success", "All
FlowFiles that are received are routed to success");
+
+void ListAzureDataLakeStorage::initialize() {
+ // Set supported properties
+ setSupportedProperties({
+ AzureStorageCredentialsService,
+ FilesystemName,
+ DirectoryName,
+ RecurseSubdirectories,
+ FileFilter,
+ PathFilter,
+ ListingStrategy
+ });
+ // Set the supported relationships
+ setSupportedRelationships({
+ Success
+ });
+}
+
+void ListAzureDataLakeStorage::onSchedule(const
std::shared_ptr<core::ProcessContext>& context, const
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+ AzureDataLakeStorageProcessorBase::onSchedule(context, sessionFactory);
+
+ auto state_manager = context->getStateManager();
+ if (state_manager == nullptr) {
+ throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+ }
+ state_manager_ =
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
+
+ auto params = buildListParameters(*context);
+ if (!params) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Required parameters for
ListAzureDataLakeStorage processor are missing or invalid");
+ }
+
+ list_parameters_ = *params;
+ tracking_strategy_ = storage::EntityTracking::parse(
+ utils::parsePropertyWithAllowableValuesOrThrow(*context,
ListingStrategy.getName(), storage::EntityTracking::values()).c_str());
Review comment:
Updated in ef0172d119c472a1c91a10aecbe531a112ade2bf
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]