bakaid commented on a change in pull request #605: MINIFICPP-550 - Implement RocksDB controller service and component st… URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r393554128
########## File path: extensions/sftp/processors/ListSFTP.cpp ########## @@ -760,140 +713,83 @@ void ListSFTP::listByTrackingTimestamps( } } -bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ofstream file(tracking_entities_state_filename_); - if (!file.is_open()) { - logger_->log_error("Failed to store Tracking Entities state to state file \"%s\"", tracking_entities_state_filename_.c_str()); - return false; - } - file << "hostname=" << hostname << "\n"; - file << "username=" << username << "\n"; - file << "remote_path=" << remote_path << "\n"; - file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n"; - file.close(); - - std::ofstream json_file(tracking_entities_state_json_filename_); - if (!json_file.is_open()) { - logger_->log_error("Failed to store Tracking Entities state to state json file \"%s\"", tracking_entities_state_json_filename_.c_str()); - return false; - } - - rapidjson::Document entities(rapidjson::kObjectType); - rapidjson::Document::AllocatorType& alloc = entities.GetAllocator(); - for (const auto& already_listed_entity : already_listed_entities_) { - rapidjson::Value entity(rapidjson::kObjectType); - entity.AddMember("timestamp", already_listed_entity.second.timestamp, alloc); - entity.AddMember("size", already_listed_entity.second.size, alloc); - entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), alloc), std::move(entity), alloc); +bool ListSFTP::persistTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) { + std::unordered_map<std::string, std::string> state; + state["listing_strategy"] = listing_strategy_; + state["hostname"] = hostname; + state["username"] = username; + state["remote_path"] = remote_path; + size_t i = 0; + for (const auto &already_listed_entity : already_listed_entities_) { + 
state["entity." + std::to_string(i) + ".name"] = already_listed_entity.first; + state["entity." + std::to_string(i) + ".timestamp"] = std::to_string(already_listed_entity.second.timestamp); + state["entity." + std::to_string(i) + ".size"] = std::to_string(already_listed_entity.second.size); + ++i; } - - rapidjson::OStreamWrapper osw(json_file); - rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw); - entities.Accept(writer); - - return true; + return state_manager_->set(state); } -bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, const std::string& username, const std::string& remote_path) { - std::ifstream file(tracking_entities_state_filename_); - if (!file.is_open()) { - logger_->log_error("Failed to open Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str()); - return false; - } +bool ListSFTP::updateFromTrackingEntitiesCache(const std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, const std::string& username, const std::string& remote_path) { + std::string state_listing_strategy; std::string state_hostname; std::string state_username; std::string state_remote_path; - std::string state_json_state_file; - - std::string line; - while (std::getline(file, line)) { - size_t separator_pos = line.find('='); - if (separator_pos == std::string::npos) { - logger_->log_warn("None key-value line found in Tracking Entities state file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str()); - continue; - } - std::string key = line.substr(0, separator_pos); - std::string value = line.substr(separator_pos + 1); - if (key == "hostname") { - state_hostname = std::move(value); - } else if (key == "username") { - state_username = std::move(value); - } else if (key == "remote_path") { - state_remote_path = std::move(value); - } else if (key == "json_state_file") { - state_json_state_file = std::move(value); - } else { - logger_->log_warn("Unknown key found in Tracking Entities state 
file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str()); - } - } - file.close(); - - if (state_hostname != hostname || - state_username != username || - state_remote_path != remote_path) { - logger_->log_error("Tracking Entities state file \"%s\" was created with different settings than the current ones, ignoring. " - "Hostname: \"%s\" vs. \"%s\", " - "Username: \"%s\" vs. \"%s\", " - "Remote Path: \"%s\" vs. \"%s\"", - tracking_entities_state_filename_.c_str(), - state_hostname, hostname, - state_username, username, - state_remote_path, remote_path); - return false; - } + std::unordered_map<std::string, ListedEntity> new_already_listed_entities; - if (state_json_state_file.empty()) { - logger_->log_error("Could not found json state file path in Tracking Entities state file \"%s\"", tracking_entities_state_filename_.c_str()); + std::unordered_map<std::string, std::string> state_map; + if (!state_manager_->get(state_map)) { + logger_->log_debug("Failed to get state from StateManager"); Review comment: If the processor has no stored state yet, `state_manager_->get` will return false. It could be modified to return an empty map, but I think there is value in having a distinction between "we have already stored a state, we just have nothing to store" and "we have no stored state yet". ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services