szaszm commented on a change in pull request #1107: URL: https://github.com/apache/nifi-minifi-cpp/pull/1107#discussion_r724963981
########## File path: libminifi/test/StatefulProcessor.h ########## @@ -0,0 +1,59 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <utility> +#include <vector> +#include "core/Processor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class StatefulProcessor : public core::Processor { + public: + using core::Processor::Processor; + + using HookType = std::function<void(core::CoreComponentStateManager&)>; + using HookListType = std::vector<HookType>; + + void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) override; + + void onTrigger(const std::shared_ptr<core::ProcessContext>&, const std::shared_ptr<core::ProcessSession>&) override; + + void setHooks(HookType onScheduleHook, HookListType onTriggerHooks); + + bool hasFinishedHooks() const; Review comment: Consider `[[nodiscard]]`. ########## File path: libminifi/test/StatefulProcessor.h ########## @@ -0,0 +1,59 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <utility> +#include <vector> +#include "core/Processor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +class StatefulProcessor : public core::Processor { + public: + using core::Processor::Processor; + + using HookType = std::function<void(core::CoreComponentStateManager&)>; + using HookListType = std::vector<HookType>; Review comment: I think this type alias (`HookListType`) hurts more than it helps readability. The upside is helping readability by not having to read long complex types when they don't matter and providing more specific description of the use case. The downside is not having the type information readily available when it matters and the usage is obvious. I'm neutral on `HookType`: I wouldn't have introduced it myself, but I don't mind it. ########## File path: libminifi/test/integration/StateTransactionalityTests.cpp ########## @@ -0,0 +1,614 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG + +#include <iostream> +#include "IntegrationBase.h" +#include "../StatefulProcessor.h" +#include "../TestBase.h" +#include "utils/IntegrationTestUtils.h" +#include "core/state/ProcessorController.h" + +using org::apache::nifi::minifi::processors::StatefulProcessor; +using org::apache::nifi::minifi::state::ProcessorController; + +namespace { +using LogChecker = std::function<bool()>; + +struct HookCollection { + StatefulProcessor::HookType onScheduleHook_; + StatefulProcessor::HookListType onTriggerHooks_; + LogChecker logChecker_; +}; + +class StatefulIntegrationTest : public IntegrationBase { + public: + explicit StatefulIntegrationTest(std::string testCase, HookCollection hookCollection) + : onScheduleHook_(std::move(hookCollection.onScheduleHook_)) + , onTriggerHooks_(std::move(hookCollection.onTriggerHooks_)) + , logChecker_(hookCollection.logChecker_) + , testCase_(std::move(testCase)) { + } + + void testSetup() override { + LogTestController::getInstance().reset(); + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<StatefulIntegrationTest>(); + logger_->log_info("Running test case \"%s\"", testCase_); + } + + void updateProperties(std::shared_ptr<minifi::FlowController> fc) override { + const auto controllerVec = fc->getAllComponents(); + /* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor + * (See TestStateTransactionality.yml) + * In this case there are two components in the flowcontroller: first is the controller itself, + * second is the processor that the test uses. + * Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file, + * that most probably means a breaking change. */ + assert(controllerVec.size() == 2); + assert(controllerVec[0]->getComponentName() == "FlowController"); + assert(controllerVec[1]->getComponentName() == "statefulProcessor"); + + // set hooks + const auto processController = std::dynamic_pointer_cast<ProcessorController>(controllerVec[1]); + assert(processController != nullptr); + statefulProcessor_ = std::dynamic_pointer_cast<StatefulProcessor>(processController->getProcessor()); + assert(statefulProcessor_ != nullptr); + statefulProcessor_->setHooks(onScheduleHook_, onTriggerHooks_); + } + + void runAssertions() override { + using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; + assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] { + return statefulProcessor_->hasFinishedHooks() && logChecker_(); + })); + } + + private: + const StatefulProcessor::HookType onScheduleHook_; + const StatefulProcessor::HookListType onTriggerHooks_; + const LogChecker logChecker_; + const std::string testCase_; + std::shared_ptr<StatefulProcessor> statefulProcessor_; + std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<StatefulIntegrationTest>::getLogger()}; +}; + +const std::unordered_map<std::string, std::string> exampleState{{"key1", "value1"}, {"key2", "value2"}}; +const std::unordered_map<std::string, std::string> exampleState2{{"key3", "value3"}, {"key4", "value4"}}; + +auto standardLogChecker = [] { + const std::string logs = LogTestController::getInstance().log_output.str(); + const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); + const auto warningResult = utils::StringUtils::countOccurrences(logs, "[warning]"); + return errorResult.second == 0 && warningResult.second == 0; +}; + +auto commitAndRollbackWarnings = [] { + const std::string logs = LogTestController::getInstance().log_output.str(); + const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); + const auto commitWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Process Session Operation: State manager commit failed.\""); + const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, + "[warning] Caught Exception during process session rollback: Process Session Operation: State manager rollback failed."); + return errorResult.second == 0 && commitWarningResult.second == 1 && rollbackWarningResult.second == 1; +}; + +auto exceptionRollbackWarnings = [] { + const std::string logs = LogTestController::getInstance().log_output.str(); + const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); + const auto exceptionWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Triggering rollback\""); + const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] ProcessSession rollback for statefulProcessor executed"); + return errorResult.second == 0 && exceptionWarningResult.second == 1 && rollbackWarningResult.second == 1; +}; + +const std::unordered_map<std::string, HookCollection> testCasesToHookLists { + {"State_is_recorded_after_committing", { + {}, + { + [] (core::CoreComponentStateManager& stateManager) { + assert(stateManager.set(exampleState)); + }, + [] (core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(stateManager.get(state)); + assert(state == exampleState); + } + }, + standardLogChecker + }}, + {"State_is_discarded_after_rolling_back", { + {}, + { + [](core::CoreComponentStateManager& stateManager) { + assert(stateManager.set(exampleState)); + }, + [](core::CoreComponentStateManager& stateManager) { + assert(stateManager.set(exampleState2)); + throw std::runtime_error("Triggering rollback"); + }, + [](core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(stateManager.get(state)); + assert(state == exampleState); + } + }, + exceptionRollbackWarnings + }}, + { + "Get_in_onSchedule_without_previous_state", { + [](core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(!stateManager.get(state)); + assert(state.empty()); + }, + {}, + standardLogChecker + } + }, + { + "Set_in_onSchedule", { + [](core::CoreComponentStateManager& stateManager) { + assert(stateManager.set(exampleState)); + }, + { + [](core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(stateManager.get(state)); + assert(state == exampleState); + } + }, + standardLogChecker + } + }, + { + "Clear_in_onSchedule", { + [](core::CoreComponentStateManager& stateManager) { + assert(!stateManager.clear()); + assert(stateManager.set(exampleState)); + assert(stateManager.clear()); + }, + { + [](core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(!stateManager.get(state)); + assert(state.empty()); + } + }, + standardLogChecker + }, + }, + { + "Persist_in_onSchedule", { + { + [](core::CoreComponentStateManager& stateManager) { + assert(stateManager.persist()); + } + }, Review comment: We have strange indentation here. ########## File path: libminifi/test/integration/StateTransactionalityTests.cpp ########## @@ -0,0 +1,614 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG + +#include <iostream> +#include "IntegrationBase.h" +#include "../StatefulProcessor.h" +#include "../TestBase.h" +#include "utils/IntegrationTestUtils.h" +#include "core/state/ProcessorController.h" + +using org::apache::nifi::minifi::processors::StatefulProcessor; +using org::apache::nifi::minifi::state::ProcessorController; + +namespace { +using LogChecker = std::function<bool()>; + +struct HookCollection { + StatefulProcessor::HookType onScheduleHook_; + StatefulProcessor::HookListType onTriggerHooks_; + LogChecker logChecker_; +}; + +class StatefulIntegrationTest : public IntegrationBase { + public: + explicit StatefulIntegrationTest(std::string testCase, HookCollection hookCollection) + : onScheduleHook_(std::move(hookCollection.onScheduleHook_)) + , onTriggerHooks_(std::move(hookCollection.onTriggerHooks_)) + , logChecker_(hookCollection.logChecker_) + , testCase_(std::move(testCase)) { + } + + void testSetup() override { + LogTestController::getInstance().reset(); + LogTestController::getInstance().setDebug<core::ProcessGroup>(); + LogTestController::getInstance().setDebug<core::Processor>(); + LogTestController::getInstance().setDebug<core::ProcessSession>(); + LogTestController::getInstance().setDebug<StatefulIntegrationTest>(); + logger_->log_info("Running test case \"%s\"", testCase_); + } + + void updateProperties(std::shared_ptr<minifi::FlowController> fc) override { + const auto controllerVec = fc->getAllComponents(); + /* This tests depends on a configuration that contains only one StatefulProcessor named statefulProcessor + * (See TestStateTransactionality.yml) + * In this case there are two components in the flowcontroller: first is the controller itself, + * second is the processor that the test uses. + * Added here some assertions to make it clear. In case any of these fail without changing the corresponding yml file, + * that most probably means a breaking change. */ + assert(controllerVec.size() == 2); + assert(controllerVec[0]->getComponentName() == "FlowController"); + assert(controllerVec[1]->getComponentName() == "statefulProcessor"); + + // set hooks + const auto processController = std::dynamic_pointer_cast<ProcessorController>(controllerVec[1]); + assert(processController != nullptr); + statefulProcessor_ = std::dynamic_pointer_cast<StatefulProcessor>(processController->getProcessor()); + assert(statefulProcessor_ != nullptr); + statefulProcessor_->setHooks(onScheduleHook_, onTriggerHooks_); + } + + void runAssertions() override { + using org::apache::nifi::minifi::utils::verifyEventHappenedInPollTime; + assert(verifyEventHappenedInPollTime(std::chrono::milliseconds(wait_time_), [&] { + return statefulProcessor_->hasFinishedHooks() && logChecker_(); + })); + } + + private: + const StatefulProcessor::HookType onScheduleHook_; + const StatefulProcessor::HookListType onTriggerHooks_; + const LogChecker logChecker_; + const std::string testCase_; + std::shared_ptr<StatefulProcessor> statefulProcessor_; + std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<StatefulIntegrationTest>::getLogger()}; +}; + +const std::unordered_map<std::string, std::string> exampleState{{"key1", "value1"}, {"key2", "value2"}}; +const std::unordered_map<std::string, std::string> exampleState2{{"key3", "value3"}, {"key4", "value4"}}; + +auto standardLogChecker = [] { + const std::string logs = LogTestController::getInstance().log_output.str(); + const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); + const auto warningResult = utils::StringUtils::countOccurrences(logs, "[warning]"); + return errorResult.second == 0 && warningResult.second == 0; +}; + +auto commitAndRollbackWarnings = [] { + const std::string logs = LogTestController::getInstance().log_output.str(); + const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); + const auto commitWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Process Session Operation: State manager commit failed.\""); + const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, + "[warning] Caught Exception during process session rollback: Process Session Operation: State manager rollback failed."); + return errorResult.second == 0 && commitWarningResult.second == 1 && rollbackWarningResult.second == 1; +}; + +auto exceptionRollbackWarnings = [] { + const std::string logs = LogTestController::getInstance().log_output.str(); + const auto errorResult = utils::StringUtils::countOccurrences(logs, "[error]"); + const auto exceptionWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] Caught \"Triggering rollback\""); + const auto rollbackWarningResult = utils::StringUtils::countOccurrences(logs, "[warning] ProcessSession rollback for statefulProcessor executed"); + return errorResult.second == 0 && exceptionWarningResult.second == 1 && rollbackWarningResult.second == 1; +}; + +const std::unordered_map<std::string, HookCollection> testCasesToHookLists { + {"State_is_recorded_after_committing", { + {}, + { + [] (core::CoreComponentStateManager& stateManager) { + assert(stateManager.set(exampleState)); + }, + [] (core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(stateManager.get(state)); + assert(state == exampleState); + } + }, + standardLogChecker + }}, + {"State_is_discarded_after_rolling_back", { + {}, + { + [](core::CoreComponentStateManager& stateManager) { + assert(stateManager.set(exampleState)); + }, + [](core::CoreComponentStateManager& stateManager) { + assert(stateManager.set(exampleState2)); + throw std::runtime_error("Triggering rollback"); + }, + [](core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(stateManager.get(state)); + assert(state == exampleState); + } + }, + exceptionRollbackWarnings + }}, + { + "Get_in_onSchedule_without_previous_state", { + [](core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(!stateManager.get(state)); + assert(state.empty()); + }, + {}, + standardLogChecker + } + }, + { + "Set_in_onSchedule", { + [](core::CoreComponentStateManager& stateManager) { + assert(stateManager.set(exampleState)); + }, + { + [](core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(stateManager.get(state)); + assert(state == exampleState); + } + }, + standardLogChecker + } + }, + { + "Clear_in_onSchedule", { + [](core::CoreComponentStateManager& stateManager) { + assert(!stateManager.clear()); + assert(stateManager.set(exampleState)); + assert(stateManager.clear()); + }, + { + [](core::CoreComponentStateManager& stateManager) { + std::unordered_map<std::string, std::string> state; + assert(!stateManager.get(state)); + assert(state.empty()); + } + }, + standardLogChecker + }, + }, + { + "Persist_in_onSchedule", { + { + [](core::CoreComponentStateManager& stateManager) { + assert(stateManager.persist()); + } + }, + {}, + standardLogChecker + } + }, + { + "Manual_beginTransaction", { + {}, + { + [](core::CoreComponentStateManager& stateManager) { + assert(!stateManager.beginTransaction()); + } + }, Review comment: If it's indented, it should be 2 spaces deeper, not 1. This is not the only occurrence of this issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
