fgerlits commented on code in PR #1695:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1425597679


##########
docker/test/integration/cluster/checkers/GrafanaLokiChecker.py:
##########
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import requests
+from typing import List
+from utils import wait_for
+
+
+class GrafanaLokiChecker:
+    def __init__(self):
+        self.url = "localhost:3100/loki/api/v1/query"
+
+    def veify_log_lines_on_grafana_loki(self, lines: List[str], ssl: bool, 
tenant_id: str):
+        labels = '{job="minifi"}'
+        prefix = "http://"
+        if ssl:
+            prefix = "https://"
+
+        query_url = f"{prefix}{self.url}?query={labels}"
+
+        headers = None
+        if tenant_id:
+            headers = {'X-Scope-OrgID': tenant_id}
+
+        response = requests.get(query_url, verify=False, timeout=30, 
headers=headers)
+        if response.status_code >= 200 and response.status_code < 300:

Review Comment:
   This conditional could be inverted, so that the `return True` happens at the 
end of the function.



##########
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##########
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include <utility>
+#include <fstream>
+#include <filesystem>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::steady_clock::now();
+    std::unordered_map<std::string, std::string> state;
+    state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+    start_push_time_ = {};
+    std::unordered_map<std::string, std::string> state;
+    logger_->log_debug("Reset start push time state");
+    state["start_push_time"] = "0";
+    state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds>
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point
 start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");
+
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = 
context.getProperty(PushGrafanaLokiREST::SSLContextService)) {
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  }
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file, core::ProcessSession& session) {
+  auto read_buffer_result = session.readBuffer(flow_file);
+  return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()};

Review Comment:
   there is a function for this in `ProcessSession.h`:
   ```suggestion
     return to_string(read_buffer_result);
   ```
   or it could even be inlined, as this function is only used in one place:
   ```c++
   auto line = to_string(session.readBuffer(flow_file));
   ```



##########
extensions/grafana-loki/PushGrafanaLokiREST.h:
##########
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include <memory>
+#include <vector>
+#include <map>
+
+#include "controllers/SSLContextService.h"
+#include "core/Processor.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "core/RelationshipDefinition.h"
+#include "client/HTTPClient.h"
+#include "core/StateManager.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+class PushGrafanaLokiREST : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push 
processor that uses the Grafana Loki REST API. The processor expects each flow 
file to contain a single log line to be "
+                                                          "pushed to Grafana 
Loki, therefore it is usually used together with the TailFile processor.";
+
+  explicit PushGrafanaLokiREST(const std::string& name, const 
utils::Identifier& uuid = {})
+      : Processor(name, uuid),
+        log_batch_(logger_) {
+  }
+  ~PushGrafanaLokiREST() override = default;
+
+  EXTENSIONAPI static constexpr auto Url = 
core::PropertyDefinitionBuilder<>::createProperty("Url")
+    .withDescription("Url of the Grafana Loki server. For example 
http://localhost:3100/.")
+    .isRequired(true)
+    .build();
+  EXTENSIONAPI static constexpr auto StreamLabels = 
core::PropertyDefinitionBuilder<>::createProperty("Stream Labels")
+    .withDescription("Comma separated list of <key>=<value> labels to be sent 
as stream labels.")
+    .isRequired(true)
+    .build();
+  EXTENSIONAPI static constexpr auto LogLineMetadataAttributes = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Metadata 
Attributes")
+    .withDescription("Comma separated list of attributes to be sent as log 
line metadata for a log line.")
+    .build();
+  EXTENSIONAPI static constexpr auto TenantID = 
core::PropertyDefinitionBuilder<>::createProperty("Tenant ID")
+    .withDescription("The tenant ID used by default to push logs to Grafana 
Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant 
mode and no X-Scope-OrgID header is sent.")
+    .build();
+  EXTENSIONAPI static constexpr auto MaxBatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Max Batch Size")
+    .withDescription("The maximum number of flow files to process at a time. 
If not set, or set to 0, all FlowFiles will be processed at once.")
+    .withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE)
+    .withDefaultValue("100")
+    .build();
+  EXTENSIONAPI static constexpr auto LogLineBatchWait = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Wait")
+    .withDescription("Time to wait before sending a log line batch to Grafana 
Loki, full or not. If this property and Log Line Batch Size are both unset, "
+                     "the log batch of the current trigger will be sent 
immediately.")
+    .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+    .build();
+  EXTENSIONAPI static constexpr auto LogLineBatchSize = 
core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Size")
+    .withDescription("Number of log lines to send in a batch to Loki. If this 
property and Log Line Batch Wait are both unset, "
+                     "the log batch of the current trigger will be sent 
immediately.")
+    .withPropertyType(core::StandardPropertyTypes::UNSIGNED_INT_TYPE)
+    .build();
+  EXTENSIONAPI static constexpr auto ConnectTimeout = 
core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout")
+    .withDescription("Max wait time for connection to the Grafana Loki 
service.")
+    .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+    .withDefaultValue("5 s")
+    .isRequired(true)
+    .build();
+  EXTENSIONAPI static constexpr auto ReadTimeout = 
core::PropertyDefinitionBuilder<>::createProperty("Read Timeout")
+    .withDescription("Max wait time for response from remote service.")
+    .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE)
+    .withDefaultValue("15 s")
+    .isRequired(true)
+    .build();
+  EXTENSIONAPI static constexpr auto SSLContextService = 
core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service")
+    .withDescription("The SSL Context Service used to provide client 
certificate information for TLS/SSL (https) connections.")
+    .withAllowedTypes<minifi::controllers::SSLContextService>()
+    .build();
+  EXTENSIONAPI static constexpr auto Username = 
core::PropertyDefinitionBuilder<>::createProperty("Username")
+    .withDescription("Username for authenticating using basic authentication.")
+    .build();
+  EXTENSIONAPI static constexpr auto Password = 
core::PropertyDefinitionBuilder<>::createProperty("Password")
+    .withDescription("Password for authenticating using basic authentication.")
+    .build();
+  EXTENSIONAPI static constexpr auto BearerTokenFile = 
core::PropertyDefinitionBuilder<>::createProperty("Bearer Token File")
+    .withDescription("Path of file containing bearer token for bearer token 
authentication.")
+    .build();
+  EXTENSIONAPI static constexpr auto Properties = 
std::array<core::PropertyReference, 13>{
+      Url,
+      StreamLabels,
+      LogLineMetadataAttributes,
+      TenantID,
+      MaxBatchSize,
+      LogLineBatchWait,
+      LogLineBatchSize,
+      ConnectTimeout,
+      ReadTimeout,
+      SSLContextService,
+      Username,
+      Password,
+      BearerTokenFile
+  };
+
+  EXTENSIONAPI static constexpr auto Success = 
core::RelationshipDefinition{"success", "All flowfiles that succeed in being 
transferred into Grafana Loki go here."};
+  EXTENSIONAPI static constexpr auto Failure = 
core::RelationshipDefinition{"failure", "All flowfiles that fail for reasons 
unrelated to server availability go to this relationship."};

Review Comment:
   What happens to flow files that fail because of server (un)availability? 
From the code, it looks like those get transferred to `failure` as well.



##########
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##########
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include <utility>
+#include <fstream>
+#include <filesystem>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::steady_clock::now();
+    std::unordered_map<std::string, std::string> state;
+    state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+    start_push_time_ = {};
+    std::unordered_map<std::string, std::string> state;
+    logger_->log_debug("Reset start push time state");
+    state["start_push_time"] = "0";
+    state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds>
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point
 start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");
+
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = 
context.getProperty(PushGrafanaLokiREST::SSLContextService)) {
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  }
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file, core::ProcessSession& session) {
+  auto read_buffer_result = session.readBuffer(flow_file);
+  return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()};
+}
+}  // namespace
+
+void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) {
+  auto state_manager = context.getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  log_batch_.setStateManager(state_manager);
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager->get(state_map)) {
+    auto it = state_map.find("start_push_time");
+    if (it != state_map.end()) {
+      logger_->log_info("Restored start push time from processor state: {}", 
it->second);
+      std::chrono::steady_clock::time_point 
start_push_time{std::chrono::milliseconds{std::stoll(it->second)}};
+      log_batch_.setStartPushTime(start_push_time);
+    }
+  }
+}
+
+void PushGrafanaLokiREST::setUpStreamLabels(core::ProcessContext& context) {
+  if (auto stream_labels_str = context.getProperty(StreamLabels)) {
+    auto stream_labels = 
utils::StringUtils::splitAndTrimRemovingEmpty(*stream_labels_str, ",");
+    if (stream_labels.empty()) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Label Attributes");
+    }
+    for (const auto& label : stream_labels) {
+      auto stream_labels = 
utils::StringUtils::splitAndTrimRemovingEmpty(label, "=");
+      if (stream_labels.size() != 2) {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Label Attributes");
+      }
+      stream_label_attributes_[stream_labels[0]] = stream_labels[1];
+    }
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Label Attributes");
+  }
+}
+
+void PushGrafanaLokiREST::setupClientTimeouts(const core::ProcessContext& 
context) {
+  if (auto connection_timeout = 
context.getProperty<core::TimePeriodValue>(PushGrafanaLokiREST::ConnectTimeout))
 {
+    client_.setConnectionTimeout(connection_timeout->getMilliseconds());
+  }
+
+  if (auto read_timeout = 
context.getProperty<core::TimePeriodValue>(PushGrafanaLokiREST::ReadTimeout)) {
+    client_.setReadTimeout(read_timeout->getMilliseconds());
+  }
+}
+
+void PushGrafanaLokiREST::setAuthorization(const core::ProcessContext& 
context) {
+  if (auto username = context.getProperty(PushGrafanaLokiREST::Username)) {
+    auto password = context.getProperty(PushGrafanaLokiREST::Password);
+    if (!password) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Username is set, but 
Password property is not!");
+    }
+    std::string auth = *username + ":" + *password;
+    auto base64_encoded_auth = utils::StringUtils::to_base64(auth);
+    client_.setRequestHeader("Authorization", "Basic " + base64_encoded_auth);
+  } else if (auto bearer_token_file = 
context.getProperty(PushGrafanaLokiREST::BearerTokenFile)) {
+    if (!std::filesystem::exists(*bearer_token_file) || 
!std::filesystem::is_regular_file(*bearer_token_file)) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bearer Token File is not a 
regular file!");
+    }
+    std::ifstream file(*bearer_token_file);

Review Comment:
   Should this be opened in binary mode, or does it not matter if we use 
`rdbuf()`?



##########
extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp:
##########
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PushGrafanaLokiREST.h"
+#include "MockGrafanaLoki.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, ""));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  SECTION("Stream labels cannot be empty") {
+    test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "");
+  }
+  SECTION("Stream labels need to be valid") {
+    test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2");
+  }
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "0");
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+class PushGrafanaLokiRESTTestFixture {
+ public:
+  PushGrafanaLokiRESTTestFixture()
+      : mock_loki_("10990"),
+        
push_grafana_loki_rest_(std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST")),
+        test_controller_(push_grafana_loki_rest_) {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setTrace<PushGrafanaLokiREST>();
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  }
+
+  void setProperty(const auto& property, const std::string& property_value) {
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
property, property_value));
+  }
+
+  void verifyLastRequestIsEmpty() {
+    const auto& request = mock_loki_.getLastRequest();
+    REQUIRE(request.IsNull());
+  }
+
+  void verifyTenantId(const std::string& tenant_id) {
+    REQUIRE(mock_loki_.getLastTenantId() == tenant_id);
+  }
+
+  void verifyBasicAuthorization(const std::string& 
expected_username_and_password) {
+    auto last_authorization = mock_loki_.getLastAuthorization();
+    std::string expected_authorization = "Basic ";
+    REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, 
expected_authorization));

Review Comment:
   I would prefer to remove the `utils::StringUtils` alias and use 
`utils::string` instead (https://github.com/apache/nifi-minifi-cpp/pull/1710)



##########
bootstrap.sh:
##########
@@ -339,6 +339,9 @@ add_option PROMETHEUS_ENABLED ${TRUE} "ENABLE_PROMETHEUS"
 add_option OPENSSL_ENABLED ${TRUE} "OPENSSL_OFF"
 add_dependency OPENSSL_ENABLED "opensslbuild"
 
+add_option GRAFANA_LOKI_ENABLED ${TRUE} "ENABLE_GRAFANA_LOKI"
+set_dependency GRAFANA_LOKI_ENABLED HTTP_CURL_ENABLED

Review Comment:
   Should this be `FALSE` by default? Both the CMake flag and the corresponding 
flag in `win_build_vs.bat` default to `OFF`.



##########
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##########
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include <utility>
+#include <fstream>
+#include <filesystem>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::steady_clock::now();
+    std::unordered_map<std::string, std::string> state;
+    state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+    start_push_time_ = {};
+    std::unordered_map<std::string, std::string> state;
+    logger_->log_debug("Reset start push time state");
+    state["start_push_time"] = "0";
+    state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds>
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point
 start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");
+
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = 
context.getProperty(PushGrafanaLokiREST::SSLContextService)) {
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  }
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file, core::ProcessSession& session) {
+  auto read_buffer_result = session.readBuffer(flow_file);
+  return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()};
+}
+}  // namespace
+
+void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) {
+  auto state_manager = context.getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  log_batch_.setStateManager(state_manager);
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager->get(state_map)) {
+    auto it = state_map.find("start_push_time");
+    if (it != state_map.end()) {
+      logger_->log_info("Restored start push time from processor state: {}", 
it->second);
+      std::chrono::steady_clock::time_point 
start_push_time{std::chrono::milliseconds{std::stoll(it->second)}};
+      log_batch_.setStartPushTime(start_push_time);

Review Comment:
   This is not going to work well if the box is rebooted, because the 
`steady_clock` usually restarts from zero at startup.  So we will restore a 
`start_push_time_` of "50 days after reboot" from the saved state, when `now()` 
is "1 minute after reboot" -- so we will now wait 50 days + 
`log_line_batch_wait_` until we send out the next batch.
   
   Maybe we could use some in-memory-only static store instead of the state 
manager?



##########
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##########
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include <utility>
+#include <fstream>
+#include <filesystem>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::steady_clock::now();
+    std::unordered_map<std::string, std::string> state;
+    state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+    start_push_time_ = {};

Review Comment:
   very minor, but we have already set this at line 52



##########
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##########
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include <utility>
+#include <fstream>
+#include <filesystem>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::steady_clock::now();
+    std::unordered_map<std::string, std::string> state;
+    state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+    start_push_time_ = {};
+    std::unordered_map<std::string, std::string> state;
+    logger_->log_debug("Reset start push time state");
+    state["start_push_time"] = "0";
+    state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds>
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point
 start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");
+
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = 
context.getProperty(PushGrafanaLokiREST::SSLContextService)) {
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  }
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file, core::ProcessSession& session) {
+  auto read_buffer_result = session.readBuffer(flow_file);
+  return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()};
+}
+}  // namespace
+
+void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) {
+  auto state_manager = context.getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  log_batch_.setStateManager(state_manager);
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager->get(state_map)) {
+    auto it = state_map.find("start_push_time");
+    if (it != state_map.end()) {
+      logger_->log_info("Restored start push time from processor state: {}", 
it->second);
+      std::chrono::steady_clock::time_point 
start_push_time{std::chrono::milliseconds{std::stoll(it->second)}};
+      log_batch_.setStartPushTime(start_push_time);
+    }
+  }
+}
+
+void PushGrafanaLokiREST::setUpStreamLabels(core::ProcessContext& context) {
+  if (auto stream_labels_str = context.getProperty(StreamLabels)) {
+    auto stream_labels = 
utils::StringUtils::splitAndTrimRemovingEmpty(*stream_labels_str, ",");
+    if (stream_labels.empty()) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Label Attributes");

Review Comment:
   nitpicking, but I would write "Missing or invalid Stream Labels property"



##########
extensions/grafana-loki/tests/MockGrafanaLoki.h:
##########
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include <CivetServer.h>
+#include "tests/CivetLibrary.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+class GrafanaLokiHandler : public CivetHandler {
+ public:
+  const rapidjson::Document& getLastRequest() const {
+    return request_received_;
+  }
+
+  std::string getLastTenantId() const {
+    return tenant_id_set_;
+  }
+
+  std::string getLastAuthorization() const {
+    return authorization_set_;
+  }
+
+ private:
+  bool handlePost(CivetServer*, struct mg_connection* conn) override {
+    tenant_id_set_.clear();
+    authorization_set_.clear();
+    const char *org_id = mg_get_header(conn, "X-Scope-OrgID");
+    if (org_id != nullptr) {
+      tenant_id_set_ = org_id;
+    }
+
+    const char *authorization = mg_get_header(conn, "Authorization");
+    if (authorization != nullptr) {
+      authorization_set_ = authorization;
+    }
+
+    std::array<char, 2048> request;
+    size_t chars_read = mg_read(conn, request.data(), 2048);

Review Comment:
   `mg_read` can return `-1` on errors; we should handle that case



##########
extensions/grafana-loki/PushGrafanaLokiREST.cpp:
##########
@@ -0,0 +1,396 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PushGrafanaLokiREST.h"
+
+#include <utility>
+#include <fstream>
+#include <filesystem>
+
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+#include "utils/StringUtils.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki {
+
+void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& 
flowfile) {
+  gsl_Expects(state_manager_);
+  if (log_line_batch_wait_ && batched_flowfiles_.empty()) {
+    start_push_time_ = std::chrono::steady_clock::now();
+    std::unordered_map<std::string, std::string> state;
+    state["start_push_time"] = 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count());
+    logger_->log_debug("Saved start push time to state: {}", 
state["start_push_time"]);
+    state_manager_->set(state);
+  }
+  batched_flowfiles_.push_back(flowfile);
+}
+
+void PushGrafanaLokiREST::LogBatch::restore(const 
std::shared_ptr<core::FlowFile>& flowfile) {
+  batched_flowfiles_.push_back(flowfile);
+}
+
+std::vector<std::shared_ptr<core::FlowFile>> 
PushGrafanaLokiREST::LogBatch::flush() {
+  gsl_Expects(state_manager_);
+  start_push_time_ = {};
+  auto result = batched_flowfiles_;
+  batched_flowfiles_.clear();
+  if (log_line_batch_wait_) {
+    start_push_time_ = {};
+    std::unordered_map<std::string, std::string> state;
+    logger_->log_debug("Reset start push time state");
+    state["start_push_time"] = "0";
+    state_manager_->set(state);
+  }
+  return result;
+}
+
+bool PushGrafanaLokiREST::LogBatch::isReady() const {
+  return (log_line_batch_size_ && batched_flowfiles_.size() >= 
*log_line_batch_size_) || (log_line_batch_wait_ && 
std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_);
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> 
log_line_batch_size) {
+  log_line_batch_size_ = log_line_batch_size;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds>
 log_line_batch_wait) {
+  log_line_batch_wait_ = log_line_batch_wait;
+}
+
+void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* 
state_manager) {
+  state_manager_ = state_manager;
+}
+
+void 
PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point
 start_push_time) {
+  start_push_time_ = start_push_time;
+}
+
+const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the 
FlowFile to be owned by this processor");
+
+void PushGrafanaLokiREST::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = 
context.getProperty(PushGrafanaLokiREST::SSLContextService)) {
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  }
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& 
flow_file, core::ProcessSession& session) {
+  auto read_buffer_result = session.readBuffer(flow_file);
+  return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()};
+}
+}  // namespace
+
+void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) {
+  auto state_manager = context.getStateManager();
+  if (state_manager == nullptr) {
+    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
+  }
+  log_batch_.setStateManager(state_manager);
+
+  std::unordered_map<std::string, std::string> state_map;
+  if (state_manager->get(state_map)) {
+    auto it = state_map.find("start_push_time");
+    if (it != state_map.end()) {
+      logger_->log_info("Restored start push time from processor state: {}", 
it->second);
+      std::chrono::steady_clock::time_point 
start_push_time{std::chrono::milliseconds{std::stoll(it->second)}};
+      log_batch_.setStartPushTime(start_push_time);
+    }
+  }
+}
+
+void PushGrafanaLokiREST::setUpStreamLabels(core::ProcessContext& context) {
+  if (auto stream_labels_str = context.getProperty(StreamLabels)) {
+    auto stream_labels = 
utils::StringUtils::splitAndTrimRemovingEmpty(*stream_labels_str, ",");
+    if (stream_labels.empty()) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Label Attributes");
+    }
+    for (const auto& label : stream_labels) {
+      auto stream_labels = 
utils::StringUtils::splitAndTrimRemovingEmpty(label, "=");
+      if (stream_labels.size() != 2) {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Label Attributes");
+      }
+      stream_label_attributes_[stream_labels[0]] = stream_labels[1];
+    }
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream 
Label Attributes");
+  }
+}
+
+void PushGrafanaLokiREST::setupClientTimeouts(const core::ProcessContext& 
context) {
+  if (auto connection_timeout = 
context.getProperty<core::TimePeriodValue>(PushGrafanaLokiREST::ConnectTimeout))
 {
+    client_.setConnectionTimeout(connection_timeout->getMilliseconds());
+  }
+
+  if (auto read_timeout = 
context.getProperty<core::TimePeriodValue>(PushGrafanaLokiREST::ReadTimeout)) {
+    client_.setReadTimeout(read_timeout->getMilliseconds());
+  }
+}
+
+void PushGrafanaLokiREST::setAuthorization(const core::ProcessContext& 
context) {
+  if (auto username = context.getProperty(PushGrafanaLokiREST::Username)) {
+    auto password = context.getProperty(PushGrafanaLokiREST::Password);
+    if (!password) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Username is set, but 
Password property is not!");
+    }
+    std::string auth = *username + ":" + *password;
+    auto base64_encoded_auth = utils::StringUtils::to_base64(auth);
+    client_.setRequestHeader("Authorization", "Basic " + base64_encoded_auth);
+  } else if (auto bearer_token_file = 
context.getProperty(PushGrafanaLokiREST::BearerTokenFile)) {
+    if (!std::filesystem::exists(*bearer_token_file) || 
!std::filesystem::is_regular_file(*bearer_token_file)) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bearer Token File is not a 
regular file!");
+    }
+    std::ifstream file(*bearer_token_file);
+    std::stringstream buffer;
+    buffer << file.rdbuf();
+    std::string bearer_token = utils::StringUtils::trim(buffer.str());
+    client_.setRequestHeader("Authorization", "Bearer " + bearer_token);
+  } else {
+    client_.setRequestHeader("Authorization", std::nullopt);
+  }
+}
+
+void PushGrafanaLokiREST::initializeHttpClient(core::ProcessContext& context) {
+  auto url = utils::getRequiredPropertyOrThrow<std::string>(context, Url.name);
+  if (url.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Url property cannot be 
empty!");
+  }
+  if (utils::StringUtils::endsWith(url, "/")) {
+    url += "loki/api/v1/push";
+  } else {
+    url += "/loki/api/v1/push";
+  }
+  logger_->log_debug("PushGrafanaLokiREST push url is set to: {}", url);
+  client_.initialize(utils::HttpRequestMethod::POST, url, 
getSSLContextService(context));
+}
+
+void PushGrafanaLokiREST::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
+  setUpStateManager(context);
+  initializeHttpClient(context);
+  client_.setContentType("application/json");
+  client_.setFollowRedirects(true);
+
+  setUpStreamLabels(context);
+
+  if (auto log_line_metadata_attributes = 
context.getProperty(LogLineMetadataAttributes)) {
+    log_line_metadata_attributes_ = 
utils::StringUtils::splitAndTrimRemovingEmpty(*log_line_metadata_attributes, 
",");
+  }
+
+  auto tenant_id = context.getProperty(TenantID);
+  if (tenant_id && !tenant_id->empty()) {
+    client_.setRequestHeader("X-Scope-OrgID", tenant_id);
+  } else {
+    client_.setRequestHeader("X-Scope-OrgID", std::nullopt);
+  }
+  auto log_line_batch_wait = 
context.getProperty<core::TimePeriodValue>(LogLineBatchWait);
+  auto log_line_batch_size = context.getProperty<uint64_t>(LogLineBatchSize);
+    if (log_line_batch_size && *log_line_batch_size < 1) {

Review Comment:
   bad indentation



##########
extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp:
##########
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PushGrafanaLokiREST.h"
+#include "MockGrafanaLoki.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, ""));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  SECTION("Stream labels cannot be empty") {
+    test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "");
+  }
+  SECTION("Stream labels need to be valid") {
+    test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2");
+  }
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "0");
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+class PushGrafanaLokiRESTTestFixture {
+ public:
+  PushGrafanaLokiRESTTestFixture()
+      : mock_loki_("10990"),
+        
push_grafana_loki_rest_(std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST")),
+        test_controller_(push_grafana_loki_rest_) {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setTrace<PushGrafanaLokiREST>();
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  }
+
+  void setProperty(const auto& property, const std::string& property_value) {
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
property, property_value));
+  }
+
+  void verifyLastRequestIsEmpty() {
+    const auto& request = mock_loki_.getLastRequest();
+    REQUIRE(request.IsNull());
+  }
+
+  void verifyTenantId(const std::string& tenant_id) {
+    REQUIRE(mock_loki_.getLastTenantId() == tenant_id);
+  }
+
+  void verifyBasicAuthorization(const std::string& 
expected_username_and_password) {
+    auto last_authorization = mock_loki_.getLastAuthorization();
+    std::string expected_authorization = "Basic ";
+    REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, 
expected_authorization));
+    std::string username_and_password_decoded = 
minifi::utils::StringUtils::from_base64(last_authorization.substr(expected_authorization.size()),
 minifi::utils::as_string_tag_t{});
+    REQUIRE(username_and_password_decoded == expected_username_and_password);
+  }
+
+  void verifyBearerTokenAuthorization(const std::string& 
expected_bearer_token) {
+    auto last_authorization = mock_loki_.getLastAuthorization();
+    std::string expected_authorization = "Bearer ";
+    REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, 
expected_authorization));
+    auto bearer_token = 
last_authorization.substr(expected_authorization.size());
+    REQUIRE(bearer_token == expected_bearer_token);
+  }
+
+  void verifyStreamLabels() {
+    const auto& request = mock_loki_.getLastRequest();
+    REQUIRE(request.HasMember("streams"));
+    const auto& stream_array = request["streams"].GetArray();
+    REQUIRE(stream_array.Size() == 1);
+    REQUIRE(stream_array[0].HasMember("stream"));
+    const auto& stream = stream_array[0]["stream"].GetObject();
+    REQUIRE(stream.HasMember("job"));
+    std::string job_string = stream["job"].GetString();
+    REQUIRE(job_string == "minifi");
+    REQUIRE(stream.HasMember("directory"));
+    std::string directory_string = stream["directory"].GetString();
+    REQUIRE(directory_string == "/opt/minifi/logs/");
+  }
+
+  void verifySentRequestToLoki(uint64_t start_timestamp, const 
std::vector<std::string>& expected_log_values,
+      const std::vector<std::map<std::string, std::string>>& 
expected_log_line_attribute_values = {}) {
+    const auto& request = mock_loki_.getLastRequest();
+    REQUIRE(request.HasMember("streams"));
+    const auto& stream_array = request["streams"].GetArray();
+    REQUIRE(stream_array[0].HasMember("values"));
+    const auto& value_array = stream_array[0]["values"].GetArray();
+    REQUIRE(value_array.Size() == expected_log_values.size());
+    for (size_t i = 0; i < expected_log_values.size(); ++i) {
+      const auto& log_line_array = value_array[i].GetArray();
+      if (!expected_log_line_attribute_values.empty()) {
+        REQUIRE(log_line_array.Size() == 3);
+      } else {
+        REQUIRE(log_line_array.Size() == 2);
+      }
+      std::string timestamp_str = log_line_array[0].GetString();
+      REQUIRE(start_timestamp <= std::stoull(timestamp_str));
+      std::string value = log_line_array[1].GetString();
+      REQUIRE(value == expected_log_values[i]);

Review Comment:
   this `value` should be renamed to something else, so it doesn't conflict 
with the `value` on lines 139-141



##########
extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp:
##########
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PushGrafanaLokiREST.h"
+#include "MockGrafanaLoki.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, ""));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));

Review Comment:
   I think these should be `REQUIRE`s, since it doesn't make sense to continue 
if they fail. On the other hand, there is no way they can fail, so it could 
also be
   ```suggestion
     test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, "");
     test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/");
     test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1");
   ```



##########
extensions/grafana-loki/tests/MockGrafanaLoki.h:
##########
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include <CivetServer.h>
+#include "tests/CivetLibrary.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+class GrafanaLokiHandler : public CivetHandler {
+ public:
+  const rapidjson::Document& getLastRequest() const {
+    return request_received_;
+  }
+
+  std::string getLastTenantId() const {
+    return tenant_id_set_;
+  }
+
+  std::string getLastAuthorization() const {
+    return authorization_set_;
+  }
+
+ private:
+  bool handlePost(CivetServer*, struct mg_connection* conn) override {
+    tenant_id_set_.clear();
+    authorization_set_.clear();
+    const char *org_id = mg_get_header(conn, "X-Scope-OrgID");
+    if (org_id != nullptr) {
+      tenant_id_set_ = org_id;
+    }
+
+    const char *authorization = mg_get_header(conn, "Authorization");
+    if (authorization != nullptr) {
+      authorization_set_ = authorization;
+    }
+
+    std::array<char, 2048> request;
+    size_t chars_read = mg_read(conn, request.data(), 2048);
+    std::string json_str(request.data(), chars_read);
+    request_received_.Parse(json_str.c_str());
+
+    mg_printf(conn, "HTTP/1.1 204 OK\r\n");
+    mg_printf(conn, "Content-length: 0");
+    mg_printf(conn, "\r\n\r\n");
+    return true;
+  }
+
+  rapidjson::Document request_received_;
+  std::string tenant_id_set_;
+  std::string authorization_set_;
+};
+
+class MockGrafanaLoki {
+ public:
+  explicit MockGrafanaLoki(std::string port) : port_(std::move(port)) {
+    std::vector<std::string> options;
+    options.emplace_back("listening_ports");
+    options.emplace_back(port_);
+
+    server_ = std::make_unique<CivetServer>(options, &callbacks_, &logger_);
+    loki_handler_ = std::make_unique<GrafanaLokiHandler>();
+    server_->addHandler("/loki/api/v1/push", *loki_handler_);
+  }
+
+  [[nodiscard]] const std::string& getPort() const {
+    return port_;
+  }
+
+  const rapidjson::Document& getLastRequest() const {
+    return loki_handler_->getLastRequest();
+  }
+
+  std::string getLastTenantId() const {
+    return loki_handler_->getLastTenantId();
+  }
+
+  std::string getLastAuthorization() const {
+    return loki_handler_->getLastAuthorization();
+  }
+
+ private:
+  CivetLibrary lib_;
+  std::string port_;
+  std::unique_ptr<CivetServer> server_;
+  std::unique_ptr<GrafanaLokiHandler> loki_handler_;
+
+  CivetCallbacks callbacks_;
+  std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_ = 
org::apache::nifi::minifi::core::logging::LoggerFactory<MockGrafanaLoki>::getLogger();

Review Comment:
   can we make these non-pointers?
   ```suggestion
     CivetCallbacks callbacks_;
     std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_ 
= 
org::apache::nifi::minifi::core::logging::LoggerFactory<MockGrafanaLoki>::getLogger();
     CivetServer server_{{"listening_ports", port_}, &callbacks_, &logger_};
     GrafanaLokiHandler loki_handler_;
   ```



##########
extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp:
##########
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PushGrafanaLokiREST.h"
+#include "MockGrafanaLoki.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+#include "utils/StringUtils.h"
+#include "utils/TestUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::grafana::loki::test {
+
+TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, ""));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "1"));
+  SECTION("Stream labels cannot be empty") {
+    test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "");
+  }
+  SECTION("Stream labels need to be valid") {
+    test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2");
+  }
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") {
+  auto push_grafana_loki_rest = 
std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST");
+  minifi::test::SingleProcessorTestController 
test_controller(push_grafana_loki_rest);
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+  CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  test_controller.plan->setProperty(push_grafana_loki_rest, 
PushGrafanaLokiREST::LogLineBatchSize, "0");
+  REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception);
+}
+
+class PushGrafanaLokiRESTTestFixture {
+ public:
+  PushGrafanaLokiRESTTestFixture()
+      : mock_loki_("10990"),
+        
push_grafana_loki_rest_(std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST")),
+        test_controller_(push_grafana_loki_rest_) {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
+    LogTestController::getInstance().setTrace<PushGrafanaLokiREST>();
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
PushGrafanaLokiREST::Url, "localhost:10990"));
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"));
+  }
+
+  void setProperty(const auto& property, const std::string& property_value) {
+    CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, 
property, property_value));
+  }
+
+  void verifyLastRequestIsEmpty() {
+    const auto& request = mock_loki_.getLastRequest();
+    REQUIRE(request.IsNull());
+  }
+
+  void verifyTenantId(const std::string& tenant_id) {
+    REQUIRE(mock_loki_.getLastTenantId() == tenant_id);
+  }
+
+  void verifyBasicAuthorization(const std::string& 
expected_username_and_password) {
+    auto last_authorization = mock_loki_.getLastAuthorization();
+    std::string expected_authorization = "Basic ";
+    REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, 
expected_authorization));
+    std::string username_and_password_decoded = 
minifi::utils::StringUtils::from_base64(last_authorization.substr(expected_authorization.size()),
 minifi::utils::as_string_tag_t{});
+    REQUIRE(username_and_password_decoded == expected_username_and_password);
+  }
+
+  void verifyBearerTokenAuthorization(const std::string& 
expected_bearer_token) {
+    auto last_authorization = mock_loki_.getLastAuthorization();
+    std::string expected_authorization = "Bearer ";
+    REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, 
expected_authorization));
+    auto bearer_token = 
last_authorization.substr(expected_authorization.size());
+    REQUIRE(bearer_token == expected_bearer_token);
+  }
+
+  void verifyStreamLabels() {
+    const auto& request = mock_loki_.getLastRequest();
+    REQUIRE(request.HasMember("streams"));
+    const auto& stream_array = request["streams"].GetArray();
+    REQUIRE(stream_array.Size() == 1);
+    REQUIRE(stream_array[0].HasMember("stream"));
+    const auto& stream = stream_array[0]["stream"].GetObject();
+    REQUIRE(stream.HasMember("job"));
+    std::string job_string = stream["job"].GetString();
+    REQUIRE(job_string == "minifi");
+    REQUIRE(stream.HasMember("directory"));
+    std::string directory_string = stream["directory"].GetString();
+    REQUIRE(directory_string == "/opt/minifi/logs/");
+  }
+
+  void verifySentRequestToLoki(uint64_t start_timestamp, const 
std::vector<std::string>& expected_log_values,
+      const std::vector<std::map<std::string, std::string>>& 
expected_log_line_attribute_values = {}) {
+    const auto& request = mock_loki_.getLastRequest();
+    REQUIRE(request.HasMember("streams"));
+    const auto& stream_array = request["streams"].GetArray();
+    REQUIRE(stream_array[0].HasMember("values"));

Review Comment:
   I think we should check that `stream_array` is not empty before taking its first 
   element, e.g. with `REQUIRE(stream_array.Size() == 1);` as `verifyStreamLabels` does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to