lordgamez commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1452345696


##########
PROCESSORS.md:
##########
@@ -2196,6 +2197,40 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | failure | FlowFiles that failed to be sent to the destination are 
transferred to this relationship     |
 
 
+## PushGrafanaLokiREST
+
+### Description
+
+A Grafana Loki push processor that uses the Grafana Loki REST API. The 
processor expects each flow file to contain a single log line to be pushed to 
Grafana Loki, therefore it is usually used together with the TailFile processor.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                         | Default Value | Allowable Values | 
Description                                                                     
                                                                                
                                   |
+|------------------------------|---------------|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Url**                      |               |                  | Url of the 
Grafana Loki server. For example http://localhost:3100/.                        
                                                                                
                        |
+| **Stream Labels**            |               |                  | Comma 
separated list of <key>=<value> labels to be sent as stream labels.             
                                                                                
                             |
+| Log Line Metadata Attributes |               |                  | Comma 
separated list of attributes to be sent as log line metadata for a log line.    
                                                                                
                             |
+| Tenant ID                    |               |                  | The tenant 
ID used by default to push logs to Grafana Loki. If omitted or empty it assumes 
Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is 
sent.                      |
+| Max Batch Size               | 100           |                  | The 
maximum number of flow files to process at a time. If not set, or set to 0, all 
FlowFiles will be processed at once.                                            
                               |

Review Comment:
   Good catch, fixed in 0f1a614f803abf8ea538df7f95330a953ae2afd7



##########
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##########
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include <utility>
+#include <fstream>
+#include <filesystem>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+// Appends a flow file to the pending batch.
+// When a batch wait is configured and the batch is currently empty, the current time becomes the
+// batch start time and is also written to the processor state under "start_push_time" as
+// milliseconds since the epoch.
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) {
+  gsl_Expects(state_manager_);
+  const bool starts_new_timed_batch = log_line_batch_wait_ && batched_flowfiles_.empty();
+  if (starts_new_timed_batch) {
+    start_push_time_ = std::chrono::system_clock::now();
+    const auto millis_since_epoch = std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count();
+    std::unordered_map<std::string, std::string> new_state;
+    new_state["start_push_time"] = std::to_string(millis_since_epoch);
+    logger_->log_debug("Saved start push time to state: {}", new_state["start_push_time"]);
+    state_manager_->set(new_state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+// Puts a flow file into the pending batch without touching the batch start time or the stored
+// processor state (unlike add()).
+void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.emplace_back(flowfile);
+}
+
+// Hands every batched flow file over to the caller and leaves the batch empty.
+// The batch start time is reset, and when a batch wait is configured the stored
+// "start_push_time" state entry is reset to "0" as well.
+// Returns the flow files that were batched up to this call, in insertion order.
+std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  // Move the storage out instead of copying it: a copy would bump the reference count of every
+  // flow file only for clear() to drop it again right away.
+  auto result = std::exchange(batched_flowfiles_, {});
+  if (log_line_batch_wait_) {
+    std::unordered_map<std::string, std::string> state;
+    logger_->log_debug("Reset start push time state");
+    state["start_push_time"] = "0";
+    state_manager_->set(state);
+  }
+  return result;
+}
+
+// Tells whether the batch should be pushed now: either a batch size is configured and at least
+// that many flow files are batched, or a batch wait is configured and at least that much time has
+// passed since the recorded batch start time.
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  if (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) {
+    return true;
+  }
+  if (!log_line_batch_wait_) {
+    return false;
+  }
+  const auto elapsed = std::chrono::system_clock::now() - start_push_time_;
+  return elapsed >= *log_line_batch_wait_;
+}
+
+// Stores the flow file count used by isReady(); an empty optional disables the size condition.
+void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+// Stores the wait duration used by isReady(); an empty optional disables the time condition and
+// the start push time bookkeeping done in add() and flush().
+void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds> log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+// Stores the (non-owning) state manager pointer that add() and flush() write the start push
+// time to; both expect it to be non-null.
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) {
+  state_manager_ = state_manager;
+}
+
+// Overrides the batch start time that isReady() measures the batch wait from.
+void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::system_clock::time_point start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");  // Out-of-class definition of the Self relationship (name "__self__"). NOTE(review): the line break inside the description literal looks like mail hard-wrapping - confirm against the PR.
+
+// Registers the processor's supported properties, then its supported relationships.
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+// Looks up the controller service named by the SSL Context Service property and casts it to
+// SSLContextService. Yields an empty pointer when the property is not set; the result is also
+// empty when the dynamic cast does not succeed.
+std::shared_ptr<minifi::controllers::SSLContextService> getSSLContextService(core::ProcessContext& context) {
+  auto service_name = context.getProperty(PushGrafanaLokiREST::SSLContextService);
+  if (!service_name) {
+    return {};
+  }
+  return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*service_name));
+}
+}  // namespace
+
+// Fetches the state manager from the context, hands it to the log batch and, when the stored
+// state holds a "start_push_time" entry (milliseconds since the epoch), restores the batch start
+// time from it.
+// Throws PROCESSOR_EXCEPTION when the context provides no state manager.
+void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) {
+  auto state_manager = context.getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  log_batch_.setStateManager(state_manager);
+
+  std::unordered_map<std::string, std::string> stored_state;
+  if (!state_manager->get(stored_state)) {
+    return;
+  }
+  const auto saved_time = stored_state.find("start_push_time");
+  if (saved_time == stored_state.end()) {
+    return;
+  }
+  logger_->log_info("Restored start push time from processor state: {}", saved_time->second);
+  const std::chrono::milliseconds millis_since_epoch{std::stoll(saved_time->second)};
+  log_batch_.setStartPushTime(std::chrono::system_clock::time_point{millis_since_epoch});
+}
+
+// Parses the Stream Labels property, a comma separated list of <key>=<value> entries, into
+// stream_label_attributes_.
+// Throws PROCESS_SCHEDULE_EXCEPTION when the property is not set, yields no entries, or holds an
+// entry that does not split into exactly two non-empty parts around '='.
+void PushGrafanaLokiREST::setUpStreamLabels(core::ProcessContext& context) {
+  const auto stream_labels_str = context.getProperty(StreamLabels);
+  if (!stream_labels_str) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Labels property");
+  }
+  const auto stream_labels = utils::string::splitAndTrimRemovingEmpty(*stream_labels_str, ",");
+  if (stream_labels.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Labels property");
+  }
+  // Parse into a scratch map and swap it in at the end, so labels left over from an earlier
+  // schedule are dropped and an invalid entry cannot leave the member half-updated.
+  decltype(stream_label_attributes_) parsed_labels;
+  for (const auto& label : stream_labels) {
+    // Named differently from the outer list on purpose: the original reused the name and shadowed
+    // the container being iterated.
+    const auto key_and_value = utils::string::splitAndTrimRemovingEmpty(label, "=");
+    if (key_and_value.size() != 2) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Labels property");
+    }
+    parsed_labels[key_and_value[0]] = key_and_value[1];
+  }
+  stream_label_attributes_ = std::move(parsed_labels);
+}
+
+// Applies the Connect Timeout and Read Timeout properties to the HTTP client; a timeout whose
+// property yields no value is left as it is on the client.
+void PushGrafanaLokiREST::setupClientTimeouts(const core::ProcessContext& context) {
+  auto connect_timeout = context.getProperty<core::TimePeriodValue>(ConnectTimeout);
+  if (connect_timeout) {
+    client_.setConnectionTimeout(connect_timeout->getMilliseconds());
+  }
+
+  auto read_timeout = context.getProperty<core::TimePeriodValue>(ReadTimeout);
+  if (read_timeout) {
+    client_.setReadTimeout(read_timeout->getMilliseconds());
+  }
+}
+
+// Configures the Authorization request header of the HTTP client:
+//  - Username set: HTTP Basic authentication from Username and Password,
+//  - otherwise Bearer Token File set: Bearer authentication with the trimmed file content,
+//  - otherwise: the header is reset with std::nullopt.
+// Throws PROCESS_SCHEDULE_EXCEPTION when Username is set without Password, or when the bearer
+// token file is not a regular file, cannot be opened, or contains no token.
+void PushGrafanaLokiREST::setAuthorization(const core::ProcessContext& context) {
+  if (auto username = context.getProperty(PushGrafanaLokiREST::Username)) {
+    auto password = context.getProperty(PushGrafanaLokiREST::Password);
+    if (!password) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Username is set, but Password property is not!");
+    }
+    std::string auth = *username + ":" + *password;
+    auto base64_encoded_auth = utils::string::to_base64(auth);
+    client_.setRequestHeader("Authorization", "Basic " + base64_encoded_auth);
+  } else if (auto bearer_token_file = context.getProperty(PushGrafanaLokiREST::BearerTokenFile)) {
+    if (!std::filesystem::exists(*bearer_token_file) || !std::filesystem::is_regular_file(*bearer_token_file)) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bearer Token File is not a regular file!");
+    }
+    std::ifstream file(*bearer_token_file, std::ios::binary);
+    // A file that exists but cannot be read (e.g. missing permission) used to produce an empty
+    // token without any error; fail the scheduling instead.
+    if (!file.is_open()) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bearer Token File could not be opened!");
+    }
+    std::stringstream buffer;
+    buffer << file.rdbuf();
+    std::string bearer_token = utils::string::trim(buffer.str());
+    // An empty or whitespace-only file would send "Authorization: Bearer " with no credentials.
+    if (bearer_token.empty()) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bearer Token File does not contain a token!");
+    }
+    client_.setRequestHeader("Authorization", "Bearer " + bearer_token);
+  } else {
+    client_.setRequestHeader("Authorization", std::nullopt);
+  }
+}
+
+// Builds the push endpoint by appending "loki/api/v1/push" to the Url property (adding a '/'
+// separator when the value does not already end with one) and initializes the HTTP client for
+// POST requests against it, using the SSL context service resolved from the context.
+// Throws PROCESS_SCHEDULE_EXCEPTION when the Url property is empty.
+void PushGrafanaLokiREST::initializeHttpClient(core::ProcessContext& context) {
+  auto push_url = utils::getRequiredPropertyOrThrow<std::string>(context, Url.name);
+  if (push_url.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Url property cannot be empty!");
+  }
+  if (!utils::string::endsWith(push_url, "/")) {
+    push_url += "/";
+  }
+  push_url += "loki/api/v1/push";
+  logger_->log_debug("PushGrafanaLokiREST push url is set to: {}", push_url);
+  client_.initialize(utils::HttpRequestMethod::POST, push_url, getSSLContextService(context));
+}
+
+void PushGrafanaLokiREST::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  setUpStateManager(context);
+  initializeHttpClient(context);
+  client_.setContentType("application/json");
+  client_.setFollowRedirects(true);
+
+  setUpStreamLabels(context);
+
+  if (auto log_line_metadata_attributes = 
context.getProperty(LogLineMetadataAttributes)) {
+    log_line_metadata_attributes_ = 
utils::string::splitAndTrimRemovingEmpty(*log_line_metadata_attributes, ",");
+  }
+
+  auto tenant_id = context.getProperty(TenantID);
+  if (tenant_id && !tenant_id->empty()) {
+    client_.setRequestHeader("X-Scope-OrgID", tenant_id);
+  } else {
+    client_.setRequestHeader("X-Scope-OrgID", std::nullopt);
+  }
+  auto log_line_batch_wait = 
context.getProperty<core::TimePeriodValue>(LogLineBatchWait);
+  auto log_line_batch_size = context.getProperty<uint64_t>(LogLineBatchSize);
+  if (log_line_batch_size && *log_line_batch_size < 1) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Log Line Batch Size property 
is invalid!");

Review Comment:
   Good point, updated in 0f1a614f803abf8ea538df7f95330a953ae2afd7



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to