bakaid commented on a change in pull request #605:
URL: https://github.com/apache/nifi-minifi-cpp/pull/605#discussion_r413921343



##########
File path: extensions/sftp/processors/ListSFTP.cpp
##########
@@ -507,85 +468,74 @@ ListSFTP::ListedEntity::ListedEntity(uint64_t timestamp_, 
uint64_t size_)
     , size(size_) {
 }
 
-bool ListSFTP::persistTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store state to Tracking Timestamps state 
file \"%s\"", tracking_timestamps_state_filename_.c_str());
-    return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "listing.timestamp=" << last_listed_latest_entry_timestamp_ << "\n";
-  file << "processed.timestamp=" << last_processed_latest_entry_timestamp_ << 
"\n";
+bool ListSFTP::persistTrackingTimestampsCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = LISTING_STRATEGY_TRACKING_TIMESTAMPS;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  state["listing.timestamp"] = 
std::to_string(last_listed_latest_entry_timestamp_);
+  state["processed.timestamp"] = 
std::to_string(last_processed_latest_entry_timestamp_);
   size_t i = 0;
   for (const auto& identifier : latest_identifiers_processed_) {
-    file << "id." << i << "=" << identifier << "\n";
+    state["id." + std::to_string(i)] = identifier;
     ++i;
   }
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingTimestampsCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_timestamps_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Timestamps state file \"%s\"", 
tracking_timestamps_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingTimestampsCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
   uint64_t state_listing_timestamp;
   uint64_t state_processed_timestamp;
   std::set<std::string> state_ids;
 
-  std::string line;
-  while (std::getline(file, line)) {
-    size_t separator_pos = line.find('=');
-    if (separator_pos == std::string::npos) {
-      logger_->log_warn("None key-value line found in Tracking Timestamps 
state file \"%s\": \"%s\"", tracking_timestamps_state_filename_.c_str(), 
line.c_str());
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_info("Found no stored state");
+    return false;
+  }
+  try {
+    state_listing_strategy = state_map.at("listing_strategy");
+    state_hostname = state_map.at("hostname");
+    state_username = state_map.at("username");
+    state_remote_path = state_map.at("remote_path");
+    try {
+      state_listing_timestamp = stoull(state_map.at("listing.timestamp"));
+    } catch (...) {
+      return false;
     }
-    std::string key = line.substr(0, separator_pos);
-    std::string value = line.substr(separator_pos + 1);
-    if (key == "hostname") {
-      state_hostname = std::move(value);
-    } else if (key == "username") {
-      state_username = std::move(value);
-    } else if (key == "remote_path") {
-      state_remote_path = std::move(value);
-    } else if (key == "listing.timestamp") {
-      try {
-        state_listing_timestamp = stoull(value);
-      } catch (...) {
-        logger_->log_error("listing.timestamp is not an uint64 in Tracking 
Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-        return false;
-      }
-    } else if (key == "processed.timestamp") {
-      try {
-        state_processed_timestamp = stoull(value);
-      } catch (...) {
-        logger_->log_error("processed.timestamp is not an uint64 in Tracking 
Timestamps state file \"%s\"", tracking_timestamps_state_filename_.c_str());
-        return false;
+    try {
+      state_processed_timestamp = stoull(state_map.at("processed.timestamp"));
+    } catch (...) {
+      return false;
+    }

Review comment:
       Added individual logging to each of the timestamp-parsing `catch` blocks.

##########
File path: extensions/sftp/processors/ListSFTP.cpp
##########
@@ -760,140 +708,83 @@ void ListSFTP::listByTrackingTimestamps(
   }
 }
 
-bool ListSFTP::persistTrackingEntitiesCache(const std::string& hostname, const 
std::string& username, const std::string& remote_path) {
-  std::ofstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to store Tracking Entities state to state file 
\"%s\"", tracking_entities_state_filename_.c_str());
-    return false;
-  }
-  file << "hostname=" << hostname << "\n";
-  file << "username=" << username << "\n";
-  file << "remote_path=" << remote_path << "\n";
-  file << "json_state_file=" << tracking_entities_state_json_filename_ << "\n";
-  file.close();
-
-  std::ofstream json_file(tracking_entities_state_json_filename_);
-  if (!json_file.is_open()) {
-    logger_->log_error("Failed to store Tracking Entities state to state json 
file \"%s\"", tracking_entities_state_json_filename_.c_str());
-    return false;
-  }
-
-  rapidjson::Document entities(rapidjson::kObjectType);
-  rapidjson::Document::AllocatorType& alloc = entities.GetAllocator();
-  for (const auto& already_listed_entity : already_listed_entities_) {
-    rapidjson::Value entity(rapidjson::kObjectType);
-    entity.AddMember("timestamp", already_listed_entity.second.timestamp, 
alloc);
-    entity.AddMember("size", already_listed_entity.second.size, alloc);
-    entities.AddMember(rapidjson::Value(already_listed_entity.first.c_str(), 
alloc), std::move(entity), alloc);
+bool ListSFTP::persistTrackingEntitiesCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::unordered_map<std::string, std::string> state;
+  state["listing_strategy"] = listing_strategy_;
+  state["hostname"] = hostname;
+  state["username"] = username;
+  state["remote_path"] = remote_path;
+  size_t i = 0;
+  for (const auto &already_listed_entity : already_listed_entities_) {
+    state["entity." + std::to_string(i) + ".name"] = 
already_listed_entity.first;
+    state["entity." + std::to_string(i) + ".timestamp"] = 
std::to_string(already_listed_entity.second.timestamp);
+    state["entity." + std::to_string(i) + ".size"] = 
std::to_string(already_listed_entity.second.size);
+    ++i;
   }
-
-  rapidjson::OStreamWrapper osw(json_file);
-  rapidjson::Writer<rapidjson::OStreamWrapper> writer(osw);
-  entities.Accept(writer);
-
-  return true;
+  return state_manager_->set(state);
 }
 
-bool ListSFTP::updateFromTrackingEntitiesCache(const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
-  std::ifstream file(tracking_entities_state_filename_);
-  if (!file.is_open()) {
-    logger_->log_error("Failed to open Tracking Entities state file \"%s\"", 
tracking_entities_state_filename_.c_str());
-    return false;
-  }
+bool ListSFTP::updateFromTrackingEntitiesCache(const 
std::shared_ptr<core::ProcessContext>& context, const std::string& hostname, 
const std::string& username, const std::string& remote_path) {
+  std::string state_listing_strategy;
   std::string state_hostname;
   std::string state_username;
   std::string state_remote_path;
-  std::string state_json_state_file;
-
-  std::string line;
-  while (std::getline(file, line)) {
-    size_t separator_pos = line.find('=');
-    if (separator_pos == std::string::npos) {
-      logger_->log_warn("None key-value line found in Tracking Entities state 
file \"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), line.c_str());
-      continue;
-    }
-    std::string key = line.substr(0, separator_pos);
-    std::string value = line.substr(separator_pos + 1);
-    if (key == "hostname") {
-      state_hostname = std::move(value);
-    } else if (key == "username") {
-      state_username = std::move(value);
-    } else if (key == "remote_path") {
-      state_remote_path = std::move(value);
-    } else if (key == "json_state_file") {
-      state_json_state_file = std::move(value);
-    } else {
-      logger_->log_warn("Unknown key found in Tracking Entities state file 
\"%s\": \"%s\"", tracking_entities_state_filename_.c_str(), key.c_str());
-    }
-  }
-  file.close();
-
-  if (state_hostname != hostname ||
-      state_username != username ||
-      state_remote_path != remote_path) {
-    logger_->log_error("Tracking Entities state file \"%s\" was created with 
different settings than the current ones, ignoring. "
-                       "Hostname: \"%s\" vs. \"%s\", "
-                       "Username: \"%s\" vs. \"%s\", "
-                       "Remote Path: \"%s\" vs. \"%s\"",
-                       tracking_entities_state_filename_.c_str(),
-                       state_hostname, hostname,
-                       state_username, username,
-                       state_remote_path, remote_path);
-    return false;
-  }
+  std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
 
-  if (state_json_state_file.empty()) {
-    logger_->log_error("Could not found json state file path in Tracking 
Entities state file \"%s\"", tracking_entities_state_filename_.c_str());
+  std::unordered_map<std::string, std::string> state_map;
+  if (!state_manager_->get(state_map)) {
+    logger_->log_debug("Failed to get state from StateManager");
     return false;
   }
-
-  std::ifstream json_file(state_json_state_file);
-  if (!json_file.is_open()) {
-    logger_->log_error("Failed to open entities Tracking Entities state json 
file \"%s\"", state_json_state_file.c_str());
-    return false;
-  }
-
   try {
-    rapidjson::IStreamWrapper isw(json_file);
-    rapidjson::Document d;
-    rapidjson::ParseResult res = d.ParseStream(isw);
-    if (!res) {
-      logger_->log_error("Failed to parse Tracking Entities state json file 
\"%s\"", state_json_state_file.c_str());
-      return false;
-    }
-    if (!d.IsObject()) {
-      logger_->log_error("Tracking Entities state json file \"%s\" root is not 
an object", state_json_state_file.c_str());
-      return false;
-    }
+    state_listing_strategy = state_map.at("listing_strategy");
+    state_hostname = state_map.at("hostname");
+    state_username = state_map.at("username");
+    state_remote_path = state_map.at("remote_path");
 
     std::unordered_map<std::string, ListedEntity> new_already_listed_entities;
-    for (const auto &already_listed_entity : d.GetObject()) {
-      auto it = already_listed_entity.value.FindMember("timestamp");
-      if (it == already_listed_entity.value.MemberEnd() || 
!it->value.IsUint64()) {
-        logger_->log_error("Tracking Entities state json file \"%s\" timestamp 
missing or malformatted for entity \"%s\"",
-            state_json_state_file.c_str(),
-            already_listed_entity.name.GetString());
-        continue;
+    size_t i = 0;
+    while (true) {
+      std::string name;
+      try {
+        name = state_map.at("entity." + std::to_string(i) + ".name");
+      } catch (...) {
+        break;
       }
-      uint64_t timestamp = it->value.GetUint64();
-      it = already_listed_entity.value.FindMember("size");
-      if (it == already_listed_entity.value.MemberEnd() || 
!it->value.IsUint64()) {
-        logger_->log_error("Tracking Entities state json file \"%s\" size 
missing or malformatted for entity \"%s\"",
-                           state_json_state_file.c_str(),
-                           already_listed_entity.name.GetString());
+      try {
+        uint64_t timestamp = std::stoull(state_map.at("entity." + 
std::to_string(i) + ".timestamp"));
+        uint64_t size = std::stoull(state_map.at("entity." + std::to_string(i) 
+ ".size"));
+        new_already_listed_entities.emplace(std::piecewise_construct,
+                                            std::forward_as_tuple(name),
+                                            std::forward_as_tuple(timestamp, 
size));
+      } catch (...) {
         continue;

Review comment:
       Added logging to this `catch` block.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to