fgerlits commented on code in PR #1695: URL: https://github.com/apache/nifi-minifi-cpp/pull/1695#discussion_r1425597679
########## docker/test/integration/cluster/checkers/GrafanaLokiChecker.py: ########## @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import requests +from typing import List +from utils import wait_for + + +class GrafanaLokiChecker: + def __init__(self): + self.url = "localhost:3100/loki/api/v1/query" + + def veify_log_lines_on_grafana_loki(self, lines: List[str], ssl: bool, tenant_id: str): + labels = '{job="minifi"}' + prefix = "http://" + if ssl: + prefix = "https://" + + query_url = f"{prefix}{self.url}?query={labels}" + + headers = None + if tenant_id: + headers = {'X-Scope-OrgID': tenant_id} + + response = requests.get(query_url, verify=False, timeout=30, headers=headers) + if response.status_code >= 200 and response.status_code < 300: Review Comment: this conditional could be reversed, so the `return True` happens at the end of the function ########## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PushGrafanaLokiREST.h" + +#include <utility> +#include <fstream> +#include <filesystem> + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { + start_push_time_ = std::chrono::steady_clock::now(); + std::unordered_map<std::string, std::string> state; + state["start_push_time"] = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count()); + logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); + state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if 
(log_line_batch_wait_) { + start_push_time_ = {}; + std::unordered_map<std::string, std::string> state; + logger_->log_debug("Reset start push time state"); + state["start_push_time"] = "0"; + state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds> log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; +} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point start_push_time) { + start_push_time_ = start_push_time; +} + +const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +void PushGrafanaLokiREST::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +namespace { +auto getSSLContextService(core::ProcessContext& context) { + if (auto ssl_context = context.getProperty(PushGrafanaLokiREST::SSLContextService)) { + return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context)); + } + return std::shared_ptr<minifi::controllers::SSLContextService>{}; +} + +std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) { + auto read_buffer_result = session.readBuffer(flow_file); + return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), 
read_buffer_result.buffer.size()}; Review Comment: there is a function for this in `ProcessSession.h`: ```suggestion return to_string(read_buffer_result); ``` or it could even be inlined, as this function is only used in one place: ```c++ auto line = to_string(session.readBuffer(flow_file)); ``` ########## extensions/grafana-loki/PushGrafanaLokiREST.h: ########## @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> +#include <memory> +#include <vector> +#include <map> + +#include "controllers/SSLContextService.h" +#include "core/Processor.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/PropertyType.h" +#include "core/RelationshipDefinition.h" +#include "client/HTTPClient.h" +#include "core/StateManager.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +class PushGrafanaLokiREST : public core::Processor { + public: + EXTENSIONAPI static constexpr const char* Description = "A Grafana Loki push processor that uses the Grafana Loki REST API. 
The processor expects each flow file to contain a single log line to be " + "pushed to Grafana Loki, therefore it is usually used together with the TailFile processor."; + + explicit PushGrafanaLokiREST(const std::string& name, const utils::Identifier& uuid = {}) + : Processor(name, uuid), + log_batch_(logger_) { + } + ~PushGrafanaLokiREST() override = default; + + EXTENSIONAPI static constexpr auto Url = core::PropertyDefinitionBuilder<>::createProperty("Url") + .withDescription("Url of the Grafana Loki server. For example http://localhost:3100/.") + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto StreamLabels = core::PropertyDefinitionBuilder<>::createProperty("Stream Labels") + .withDescription("Comma separated list of <key>=<value> labels to be sent as stream labels.") + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto LogLineMetadataAttributes = core::PropertyDefinitionBuilder<>::createProperty("Log Line Metadata Attributes") + .withDescription("Comma separated list of attributes to be sent as log line metadata for a log line.") + .build(); + EXTENSIONAPI static constexpr auto TenantID = core::PropertyDefinitionBuilder<>::createProperty("Tenant ID") + .withDescription("The tenant ID used by default to push logs to Grafana Loki. If omitted or empty it assumes Grafana Loki is running in single-tenant mode and no X-Scope-OrgID header is sent.") + .build(); + EXTENSIONAPI static constexpr auto MaxBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Max Batch Size") + .withDescription("The maximum number of flow files to process at a time. 
If not set, or set to 0, all FlowFiles will be processed at once.") + .withPropertyType(core::StandardPropertyTypes::UNSIGNED_LONG_TYPE) + .withDefaultValue("100") + .build(); + EXTENSIONAPI static constexpr auto LogLineBatchWait = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Wait") + .withDescription("Time to wait before sending a log line batch to Grafana Loki, full or not. If this property and Log Line Batch Size are both unset, " + "the log batch of the current trigger will be sent immediately.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .build(); + EXTENSIONAPI static constexpr auto LogLineBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Log Line Batch Size") + .withDescription("Number of log lines to send in a batch to Loki. If this property and Log Line Batch Wait are both unset, " + "the log batch of the current trigger will be sent immediately.") + .withPropertyType(core::StandardPropertyTypes::UNSIGNED_INT_TYPE) + .build(); + EXTENSIONAPI static constexpr auto ConnectTimeout = core::PropertyDefinitionBuilder<>::createProperty("Connection Timeout") + .withDescription("Max wait time for connection to the Grafana Loki service.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .withDefaultValue("5 s") + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto ReadTimeout = core::PropertyDefinitionBuilder<>::createProperty("Read Timeout") + .withDescription("Max wait time for response from remote service.") + .withPropertyType(core::StandardPropertyTypes::TIME_PERIOD_TYPE) + .withDefaultValue("15 s") + .isRequired(true) + .build(); + EXTENSIONAPI static constexpr auto SSLContextService = core::PropertyDefinitionBuilder<>::createProperty("SSL Context Service") + .withDescription("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") + .withAllowedTypes<minifi::controllers::SSLContextService>() + .build(); + 
EXTENSIONAPI static constexpr auto Username = core::PropertyDefinitionBuilder<>::createProperty("Username") + .withDescription("Username for authenticating using basic authentication.") + .build(); + EXTENSIONAPI static constexpr auto Password = core::PropertyDefinitionBuilder<>::createProperty("Password") + .withDescription("Password for authenticating using basic authentication.") + .build(); + EXTENSIONAPI static constexpr auto BearerTokenFile = core::PropertyDefinitionBuilder<>::createProperty("Bearer Token File") + .withDescription("Path of file containing bearer token for bearer token authentication.") + .build(); + EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 13>{ + Url, + StreamLabels, + LogLineMetadataAttributes, + TenantID, + MaxBatchSize, + LogLineBatchWait, + LogLineBatchSize, + ConnectTimeout, + ReadTimeout, + SSLContextService, + Username, + Password, + BearerTokenFile + }; + + EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All flowfiles that succeed in being transferred into Grafana Loki go here."}; + EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship."}; Review Comment: what happens to flow files that fail because of server (un)availability? from the code it looks like those get transferred to `failure`, as well ########## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PushGrafanaLokiREST.h" + +#include <utility> +#include <fstream> +#include <filesystem> + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { + start_push_time_ = std::chrono::steady_clock::now(); + std::unordered_map<std::string, std::string> state; + state["start_push_time"] = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count()); + logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); + state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { + start_push_time_ = {}; + std::unordered_map<std::string, std::string> state; + logger_->log_debug("Reset start push time state"); + 
state["start_push_time"] = "0"; + state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds> log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; +} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point start_push_time) { + start_push_time_ = start_push_time; +} + +const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +void PushGrafanaLokiREST::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +namespace { +auto getSSLContextService(core::ProcessContext& context) { + if (auto ssl_context = context.getProperty(PushGrafanaLokiREST::SSLContextService)) { + return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context)); + } + return std::shared_ptr<minifi::controllers::SSLContextService>{}; +} + +std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) { + auto read_buffer_result = session.readBuffer(flow_file); + return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), read_buffer_result.buffer.size()}; +} +} // namespace + +void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) { + auto state_manager = context.getStateManager(); + if 
(state_manager == nullptr) { + throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); + } + log_batch_.setStateManager(state_manager); + + std::unordered_map<std::string, std::string> state_map; + if (state_manager->get(state_map)) { + auto it = state_map.find("start_push_time"); + if (it != state_map.end()) { + logger_->log_info("Restored start push time from processor state: {}", it->second); + std::chrono::steady_clock::time_point start_push_time{std::chrono::milliseconds{std::stoll(it->second)}}; + log_batch_.setStartPushTime(start_push_time); + } + } +} + +void PushGrafanaLokiREST::setUpStreamLabels(core::ProcessContext& context) { + if (auto stream_labels_str = context.getProperty(StreamLabels)) { + auto stream_labels = utils::StringUtils::splitAndTrimRemovingEmpty(*stream_labels_str, ","); + if (stream_labels.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Label Attributes"); + } + for (const auto& label : stream_labels) { + auto stream_labels = utils::StringUtils::splitAndTrimRemovingEmpty(label, "="); + if (stream_labels.size() != 2) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Label Attributes"); + } + stream_label_attributes_[stream_labels[0]] = stream_labels[1]; + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Label Attributes"); + } +} + +void PushGrafanaLokiREST::setupClientTimeouts(const core::ProcessContext& context) { + if (auto connection_timeout = context.getProperty<core::TimePeriodValue>(PushGrafanaLokiREST::ConnectTimeout)) { + client_.setConnectionTimeout(connection_timeout->getMilliseconds()); + } + + if (auto read_timeout = context.getProperty<core::TimePeriodValue>(PushGrafanaLokiREST::ReadTimeout)) { + client_.setReadTimeout(read_timeout->getMilliseconds()); + } +} + +void PushGrafanaLokiREST::setAuthorization(const core::ProcessContext& context) { + if (auto username = context.getProperty(PushGrafanaLokiREST::Username)) { 
+ auto password = context.getProperty(PushGrafanaLokiREST::Password); + if (!password) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Username is set, but Password property is not!"); + } + std::string auth = *username + ":" + *password; + auto base64_encoded_auth = utils::StringUtils::to_base64(auth); + client_.setRequestHeader("Authorization", "Basic " + base64_encoded_auth); + } else if (auto bearer_token_file = context.getProperty(PushGrafanaLokiREST::BearerTokenFile)) { + if (!std::filesystem::exists(*bearer_token_file) || !std::filesystem::is_regular_file(*bearer_token_file)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bearer Token File is not a regular file!"); + } + std::ifstream file(*bearer_token_file); Review Comment: Should this be opened in binary mode? Or does it not matter if we use `rdbuf()`? ########## extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp: ########## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include "../PushGrafanaLokiREST.h" +#include "MockGrafanaLoki.h" +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + SECTION("Stream labels cannot be empty") { + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, ""); + } + SECTION("Stream labels need to be valid") { + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2"); + } + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + 
minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "0"); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +class PushGrafanaLokiRESTTestFixture { + public: + PushGrafanaLokiRESTTestFixture() + : mock_loki_("10990"), + push_grafana_loki_rest_(std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST")), + test_controller_(push_grafana_loki_rest_) { + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::core::Processor>(); + LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); + LogTestController::getInstance().setTrace<PushGrafanaLokiREST>(); + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + } + + void setProperty(const auto& property, const std::string& property_value) { + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, property, property_value)); + } + + void verifyLastRequestIsEmpty() { + const auto& request = mock_loki_.getLastRequest(); + REQUIRE(request.IsNull()); + } + + void verifyTenantId(const std::string& tenant_id) { + REQUIRE(mock_loki_.getLastTenantId() == tenant_id); + } + + void verifyBasicAuthorization(const std::string& expected_username_and_password) { + auto last_authorization = mock_loki_.getLastAuthorization(); + std::string expected_authorization = "Basic "; + REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, 
expected_authorization)); Review Comment: I would prefer to remove the `utils::StringUtils` alias and use `utils::string` instead (https://github.com/apache/nifi-minifi-cpp/pull/1710) ########## bootstrap.sh: ########## @@ -339,6 +339,9 @@ add_option PROMETHEUS_ENABLED ${TRUE} "ENABLE_PROMETHEUS" add_option OPENSSL_ENABLED ${TRUE} "OPENSSL_OFF" add_dependency OPENSSL_ENABLED "opensslbuild" +add_option GRAFANA_LOKI_ENABLED ${TRUE} "ENABLE_GRAFANA_LOKI" +set_dependency GRAFANA_LOKI_ENABLED HTTP_CURL_ENABLED Review Comment: Should this be `FALSE` by default? Both the CMake flag and the corresponding flag in `win_build_vs.bat` default to `OFF`. ########## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +#include "PushGrafanaLokiREST.h" + +#include <utility> +#include <fstream> +#include <filesystem> + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { + start_push_time_ = std::chrono::steady_clock::now(); + std::unordered_map<std::string, std::string> state; + state["start_push_time"] = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count()); + logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); + state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { + start_push_time_ = {}; + std::unordered_map<std::string, std::string> state; + logger_->log_debug("Reset start push time state"); + state["start_push_time"] = "0"; + state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> 
log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds> log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; +} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point start_push_time) { + start_push_time_ = start_push_time; +} + +const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +void PushGrafanaLokiREST::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +namespace { +auto getSSLContextService(core::ProcessContext& context) { + if (auto ssl_context = context.getProperty(PushGrafanaLokiREST::SSLContextService)) { + return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context)); + } + return std::shared_ptr<minifi::controllers::SSLContextService>{}; +} + +std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) { + auto read_buffer_result = session.readBuffer(flow_file); + return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), read_buffer_result.buffer.size()}; +} +} // namespace + +void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) { + auto state_manager = context.getStateManager(); + if (state_manager == nullptr) { + throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); + } + log_batch_.setStateManager(state_manager); + + std::unordered_map<std::string, std::string> state_map; + if (state_manager->get(state_map)) { + auto it = state_map.find("start_push_time"); + if (it != state_map.end()) { + logger_->log_info("Restored start push time from processor state: {}", it->second); + 
std::chrono::steady_clock::time_point start_push_time{std::chrono::milliseconds{std::stoll(it->second)}}; + log_batch_.setStartPushTime(start_push_time); Review Comment: This is not going to work well if the box is rebooted, because the `steady_clock` usually restarts from zero at startup. For example, we may restore a `start_push_time_` of "50 days after boot" (a clock value saved before the reboot) from the state, while `now()` is only "1 minute after boot" -- so we would wait 50 days + `log_line_batch_wait_` before sending out the next batch. Maybe we could use some in-memory-only static store instead of the state manager? ########## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +#include "PushGrafanaLokiREST.h" + +#include <utility> +#include <fstream> +#include <filesystem> + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { + start_push_time_ = std::chrono::steady_clock::now(); + std::unordered_map<std::string, std::string> state; + state["start_push_time"] = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count()); + logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); + state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { + start_push_time_ = {}; Review Comment: very minor, but we have already set this at line 52 ########## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PushGrafanaLokiREST.h" + +#include <utility> +#include <fstream> +#include <filesystem> + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { + start_push_time_ = std::chrono::steady_clock::now(); + std::unordered_map<std::string, std::string> state; + state["start_push_time"] = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count()); + logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); + state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { + start_push_time_ = {}; + std::unordered_map<std::string, std::string> state; + logger_->log_debug("Reset start push time state"); + 
state["start_push_time"] = "0"; + state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds> log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; +} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point start_push_time) { + start_push_time_ = start_push_time; +} + +const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +void PushGrafanaLokiREST::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +namespace { +auto getSSLContextService(core::ProcessContext& context) { + if (auto ssl_context = context.getProperty(PushGrafanaLokiREST::SSLContextService)) { + return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context)); + } + return std::shared_ptr<minifi::controllers::SSLContextService>{}; +} + +std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) { + auto read_buffer_result = session.readBuffer(flow_file); + return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), read_buffer_result.buffer.size()}; +} +} // namespace + +void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) { + auto state_manager = context.getStateManager(); + if 
(state_manager == nullptr) { + throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); + } + log_batch_.setStateManager(state_manager); + + std::unordered_map<std::string, std::string> state_map; + if (state_manager->get(state_map)) { + auto it = state_map.find("start_push_time"); + if (it != state_map.end()) { + logger_->log_info("Restored start push time from processor state: {}", it->second); + std::chrono::steady_clock::time_point start_push_time{std::chrono::milliseconds{std::stoll(it->second)}}; + log_batch_.setStartPushTime(start_push_time); + } + } +} + +void PushGrafanaLokiREST::setUpStreamLabels(core::ProcessContext& context) { + if (auto stream_labels_str = context.getProperty(StreamLabels)) { + auto stream_labels = utils::StringUtils::splitAndTrimRemovingEmpty(*stream_labels_str, ","); + if (stream_labels.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Label Attributes"); Review Comment: nitpicking, but I would write "Missing or invalid Stream Labels property" ########## extensions/grafana-loki/tests/MockGrafanaLoki.h: ########## @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <memory> +#include <string> +#include <utility> +#include <vector> +#include <CivetServer.h> +#include "tests/CivetLibrary.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +class GrafanaLokiHandler : public CivetHandler { + public: + const rapidjson::Document& getLastRequest() const { + return request_received_; + } + + std::string getLastTenantId() const { + return tenant_id_set_; + } + + std::string getLastAuthorization() const { + return authorization_set_; + } + + private: + bool handlePost(CivetServer*, struct mg_connection* conn) override { + tenant_id_set_.clear(); + authorization_set_.clear(); + const char *org_id = mg_get_header(conn, "X-Scope-OrgID"); + if (org_id != nullptr) { + tenant_id_set_ = org_id; + } + + const char *authorization = mg_get_header(conn, "Authorization"); + if (authorization != nullptr) { + authorization_set_ = authorization; + } + + std::array<char, 2048> request; + size_t chars_read = mg_read(conn, request.data(), 2048); Review Comment: `mg_read` can return `-1` on errors; we should handle that case ########## extensions/grafana-loki/PushGrafanaLokiREST.cpp: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "PushGrafanaLokiREST.h" + +#include <utility> +#include <fstream> +#include <filesystem> + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "utils/ProcessorConfigUtils.h" +#include "utils/StringUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/stream.h" +#include "rapidjson/writer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki { + +void PushGrafanaLokiREST::LogBatch::add(const std::shared_ptr<core::FlowFile>& flowfile) { + gsl_Expects(state_manager_); + if (log_line_batch_wait_ && batched_flowfiles_.empty()) { + start_push_time_ = std::chrono::steady_clock::now(); + std::unordered_map<std::string, std::string> state; + state["start_push_time"] = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(start_push_time_.time_since_epoch()).count()); + logger_->log_debug("Saved start push time to state: {}", state["start_push_time"]); + state_manager_->set(state); + } + batched_flowfiles_.push_back(flowfile); +} + +void PushGrafanaLokiREST::LogBatch::restore(const std::shared_ptr<core::FlowFile>& flowfile) { + batched_flowfiles_.push_back(flowfile); +} + +std::vector<std::shared_ptr<core::FlowFile>> PushGrafanaLokiREST::LogBatch::flush() { + gsl_Expects(state_manager_); + start_push_time_ = {}; + auto result = batched_flowfiles_; + batched_flowfiles_.clear(); + if (log_line_batch_wait_) { + start_push_time_ = {}; + std::unordered_map<std::string, std::string> state; + logger_->log_debug("Reset start push time state"); + 
state["start_push_time"] = "0"; + state_manager_->set(state); + } + return result; +} + +bool PushGrafanaLokiREST::LogBatch::isReady() const { + return (log_line_batch_size_ && batched_flowfiles_.size() >= *log_line_batch_size_) || (log_line_batch_wait_ && std::chrono::steady_clock::now() - start_push_time_ >= *log_line_batch_wait_); +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchSize(std::optional<uint64_t> log_line_batch_size) { + log_line_batch_size_ = log_line_batch_size; +} + +void PushGrafanaLokiREST::LogBatch::setLogLineBatchWait(std::optional<std::chrono::milliseconds> log_line_batch_wait) { + log_line_batch_wait_ = log_line_batch_wait; +} + +void PushGrafanaLokiREST::LogBatch::setStateManager(core::StateManager* state_manager) { + state_manager_ = state_manager; +} + +void PushGrafanaLokiREST::LogBatch::setStartPushTime(std::chrono::steady_clock::time_point start_push_time) { + start_push_time_ = start_push_time; +} + +const core::Relationship PushGrafanaLokiREST::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +void PushGrafanaLokiREST::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +namespace { +auto getSSLContextService(core::ProcessContext& context) { + if (auto ssl_context = context.getProperty(PushGrafanaLokiREST::SSLContextService)) { + return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context)); + } + return std::shared_ptr<minifi::controllers::SSLContextService>{}; +} + +std::string readLogLineFromFlowFile(const std::shared_ptr<core::FlowFile>& flow_file, core::ProcessSession& session) { + auto read_buffer_result = session.readBuffer(flow_file); + return {reinterpret_cast<const char*>(read_buffer_result.buffer.data()), read_buffer_result.buffer.size()}; +} +} // namespace + +void PushGrafanaLokiREST::setUpStateManager(core::ProcessContext& context) { + auto state_manager = context.getStateManager(); + if 
(state_manager == nullptr) { + throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); + } + log_batch_.setStateManager(state_manager); + + std::unordered_map<std::string, std::string> state_map; + if (state_manager->get(state_map)) { + auto it = state_map.find("start_push_time"); + if (it != state_map.end()) { + logger_->log_info("Restored start push time from processor state: {}", it->second); + std::chrono::steady_clock::time_point start_push_time{std::chrono::milliseconds{std::stoll(it->second)}}; + log_batch_.setStartPushTime(start_push_time); + } + } +} + +void PushGrafanaLokiREST::setUpStreamLabels(core::ProcessContext& context) { + if (auto stream_labels_str = context.getProperty(StreamLabels)) { + auto stream_labels = utils::StringUtils::splitAndTrimRemovingEmpty(*stream_labels_str, ","); + if (stream_labels.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Label Attributes"); + } + for (const auto& label : stream_labels) { + auto stream_labels = utils::StringUtils::splitAndTrimRemovingEmpty(label, "="); + if (stream_labels.size() != 2) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Label Attributes"); + } + stream_label_attributes_[stream_labels[0]] = stream_labels[1]; + } + } else { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid Stream Label Attributes"); + } +} + +void PushGrafanaLokiREST::setupClientTimeouts(const core::ProcessContext& context) { + if (auto connection_timeout = context.getProperty<core::TimePeriodValue>(PushGrafanaLokiREST::ConnectTimeout)) { + client_.setConnectionTimeout(connection_timeout->getMilliseconds()); + } + + if (auto read_timeout = context.getProperty<core::TimePeriodValue>(PushGrafanaLokiREST::ReadTimeout)) { + client_.setReadTimeout(read_timeout->getMilliseconds()); + } +} + +void PushGrafanaLokiREST::setAuthorization(const core::ProcessContext& context) { + if (auto username = context.getProperty(PushGrafanaLokiREST::Username)) { 
+ auto password = context.getProperty(PushGrafanaLokiREST::Password); + if (!password) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Username is set, but Password property is not!"); + } + std::string auth = *username + ":" + *password; + auto base64_encoded_auth = utils::StringUtils::to_base64(auth); + client_.setRequestHeader("Authorization", "Basic " + base64_encoded_auth); + } else if (auto bearer_token_file = context.getProperty(PushGrafanaLokiREST::BearerTokenFile)) { + if (!std::filesystem::exists(*bearer_token_file) || !std::filesystem::is_regular_file(*bearer_token_file)) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bearer Token File is not a regular file!"); + } + std::ifstream file(*bearer_token_file); + std::stringstream buffer; + buffer << file.rdbuf(); + std::string bearer_token = utils::StringUtils::trim(buffer.str()); + client_.setRequestHeader("Authorization", "Bearer " + bearer_token); + } else { + client_.setRequestHeader("Authorization", std::nullopt); + } +} + +void PushGrafanaLokiREST::initializeHttpClient(core::ProcessContext& context) { + auto url = utils::getRequiredPropertyOrThrow<std::string>(context, Url.name); + if (url.empty()) { + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Url property cannot be empty!"); + } + if (utils::StringUtils::endsWith(url, "/")) { + url += "loki/api/v1/push"; + } else { + url += "/loki/api/v1/push"; + } + logger_->log_debug("PushGrafanaLokiREST push url is set to: {}", url); + client_.initialize(utils::HttpRequestMethod::POST, url, getSSLContextService(context)); +} + +void PushGrafanaLokiREST::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + setUpStateManager(context); + initializeHttpClient(context); + client_.setContentType("application/json"); + client_.setFollowRedirects(true); + + setUpStreamLabels(context); + + if (auto log_line_metadata_attributes = context.getProperty(LogLineMetadataAttributes)) { + log_line_metadata_attributes_ = 
utils::StringUtils::splitAndTrimRemovingEmpty(*log_line_metadata_attributes, ","); + } + + auto tenant_id = context.getProperty(TenantID); + if (tenant_id && !tenant_id->empty()) { + client_.setRequestHeader("X-Scope-OrgID", tenant_id); + } else { + client_.setRequestHeader("X-Scope-OrgID", std::nullopt); + } + auto log_line_batch_wait = context.getProperty<core::TimePeriodValue>(LogLineBatchWait); + auto log_line_batch_size = context.getProperty<uint64_t>(LogLineBatchSize); + if (log_line_batch_size && *log_line_batch_size < 1) { Review Comment: bad indentation ########## extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp: ########## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../PushGrafanaLokiREST.h" +#include "MockGrafanaLoki.h" +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + SECTION("Stream labels cannot be empty") { + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, ""); + } + SECTION("Stream labels need to be valid") { + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2"); + } + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + 
minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "0"); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +class PushGrafanaLokiRESTTestFixture { + public: + PushGrafanaLokiRESTTestFixture() + : mock_loki_("10990"), + push_grafana_loki_rest_(std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST")), + test_controller_(push_grafana_loki_rest_) { + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::core::Processor>(); + LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); + LogTestController::getInstance().setTrace<PushGrafanaLokiREST>(); + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + } + + void setProperty(const auto& property, const std::string& property_value) { + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, property, property_value)); + } + + void verifyLastRequestIsEmpty() { + const auto& request = mock_loki_.getLastRequest(); + REQUIRE(request.IsNull()); + } + + void verifyTenantId(const std::string& tenant_id) { + REQUIRE(mock_loki_.getLastTenantId() == tenant_id); + } + + void verifyBasicAuthorization(const std::string& expected_username_and_password) { + auto last_authorization = mock_loki_.getLastAuthorization(); + std::string expected_authorization = "Basic "; + REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, 
expected_authorization)); + std::string username_and_password_decoded = minifi::utils::StringUtils::from_base64(last_authorization.substr(expected_authorization.size()), minifi::utils::as_string_tag_t{}); + REQUIRE(username_and_password_decoded == expected_username_and_password); + } + + void verifyBearerTokenAuthorization(const std::string& expected_bearer_token) { + auto last_authorization = mock_loki_.getLastAuthorization(); + std::string expected_authorization = "Bearer "; + REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, expected_authorization)); + auto bearer_token = last_authorization.substr(expected_authorization.size()); + REQUIRE(bearer_token == expected_bearer_token); + } + + void verifyStreamLabels() { + const auto& request = mock_loki_.getLastRequest(); + REQUIRE(request.HasMember("streams")); + const auto& stream_array = request["streams"].GetArray(); + REQUIRE(stream_array.Size() == 1); + REQUIRE(stream_array[0].HasMember("stream")); + const auto& stream = stream_array[0]["stream"].GetObject(); + REQUIRE(stream.HasMember("job")); + std::string job_string = stream["job"].GetString(); + REQUIRE(job_string == "minifi"); + REQUIRE(stream.HasMember("directory")); + std::string directory_string = stream["directory"].GetString(); + REQUIRE(directory_string == "/opt/minifi/logs/"); + } + + void verifySentRequestToLoki(uint64_t start_timestamp, const std::vector<std::string>& expected_log_values, + const std::vector<std::map<std::string, std::string>>& expected_log_line_attribute_values = {}) { + const auto& request = mock_loki_.getLastRequest(); + REQUIRE(request.HasMember("streams")); + const auto& stream_array = request["streams"].GetArray(); + REQUIRE(stream_array[0].HasMember("values")); + const auto& value_array = stream_array[0]["values"].GetArray(); + REQUIRE(value_array.Size() == expected_log_values.size()); + for (size_t i = 0; i < expected_log_values.size(); ++i) { + const auto& log_line_array = value_array[i].GetArray(); + if 
(!expected_log_line_attribute_values.empty()) { + REQUIRE(log_line_array.Size() == 3); + } else { + REQUIRE(log_line_array.Size() == 2); + } + std::string timestamp_str = log_line_array[0].GetString(); + REQUIRE(start_timestamp <= std::stoull(timestamp_str)); + std::string value = log_line_array[1].GetString(); + REQUIRE(value == expected_log_values[i]); Review Comment: this `value` should be renamed to something else, so it doesn't conflict with the `value` on lines 139-141 ########## extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp: ########## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../PushGrafanaLokiREST.h" +#include "MockGrafanaLoki.h" +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); Review Comment: I think these should be `REQUIRE`s, since it doesn't make sense to continue if they fail. On the other hand, there is no way they can fail, so it could also be ```suggestion test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, ""); test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/"); test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1"); ``` ########## extensions/grafana-loki/tests/MockGrafanaLoki.h: ########## @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <string> +#include <utility> +#include <vector> +#include <CivetServer.h> +#include "tests/CivetLibrary.h" +#include "core/logging/Logger.h" +#include "core/logging/LoggerConfiguration.h" +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "rapidjson/stringbuffer.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +class GrafanaLokiHandler : public CivetHandler { + public: + const rapidjson::Document& getLastRequest() const { + return request_received_; + } + + std::string getLastTenantId() const { + return tenant_id_set_; + } + + std::string getLastAuthorization() const { + return authorization_set_; + } + + private: + bool handlePost(CivetServer*, struct mg_connection* conn) override { + tenant_id_set_.clear(); + authorization_set_.clear(); + const char *org_id = mg_get_header(conn, "X-Scope-OrgID"); + if (org_id != nullptr) { + tenant_id_set_ = org_id; + } + + const char *authorization = mg_get_header(conn, "Authorization"); + if (authorization != nullptr) { + authorization_set_ = authorization; + } + + std::array<char, 2048> request; + size_t chars_read = mg_read(conn, request.data(), 2048); + std::string json_str(request.data(), chars_read); + request_received_.Parse(json_str.c_str()); + + mg_printf(conn, "HTTP/1.1 204 OK\r\n"); + mg_printf(conn, "Content-length: 0"); + mg_printf(conn, "\r\n\r\n"); + return true; + } + + rapidjson::Document request_received_; + std::string tenant_id_set_; + std::string authorization_set_; +}; + +class 
MockGrafanaLoki { + public: + explicit MockGrafanaLoki(std::string port) : port_(std::move(port)) { + std::vector<std::string> options; + options.emplace_back("listening_ports"); + options.emplace_back(port_); + + server_ = std::make_unique<CivetServer>(options, &callbacks_, &logger_); + loki_handler_ = std::make_unique<GrafanaLokiHandler>(); + server_->addHandler("/loki/api/v1/push", *loki_handler_); + } + + [[nodiscard]] const std::string& getPort() const { + return port_; + } + + const rapidjson::Document& getLastRequest() const { + return loki_handler_->getLastRequest(); + } + + std::string getLastTenantId() const { + return loki_handler_->getLastTenantId(); + } + + std::string getLastAuthorization() const { + return loki_handler_->getLastAuthorization(); + } + + private: + CivetLibrary lib_; + std::string port_; + std::unique_ptr<CivetServer> server_; + std::unique_ptr<GrafanaLokiHandler> loki_handler_; + + CivetCallbacks callbacks_; + std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_ = org::apache::nifi::minifi::core::logging::LoggerFactory<MockGrafanaLoki>::getLogger(); Review Comment: can we make these non-pointers? ```suggestion CivetCallbacks callbacks_; std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_ = org::apache::nifi::minifi::core::logging::LoggerFactory<MockGrafanaLoki>::getLogger(); CivetServer server_{{"listening ports", port_}, &callbacks_, &logger_}; GrafanaLokiHandler loki_handler_; ``` ########## extensions/grafana-loki/tests/PushGrafanaLokiRESTTest.cpp: ########## @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../PushGrafanaLokiREST.h" +#include "MockGrafanaLoki.h" +#include "SingleProcessorTestController.h" +#include "Catch.h" +#include "utils/StringUtils.h" +#include "utils/TestUtils.h" + +namespace org::apache::nifi::minifi::extensions::grafana::loki::test { + +TEST_CASE("Url property is required", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Valid stream labels need to be set", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "1")); + SECTION("Stream labels cannot be empty") { + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, ""); + 
} + SECTION("Stream labels need to be valid") { + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "invalidlabels,invalidlabels2"); + } + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +TEST_CASE("Log Line Batch Size cannot be 0", "[PushGrafanaLokiREST]") { + auto push_grafana_loki_rest = std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST"); + minifi::test::SingleProcessorTestController test_controller(push_grafana_loki_rest); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + test_controller.plan->setProperty(push_grafana_loki_rest, PushGrafanaLokiREST::LogLineBatchSize, "0"); + REQUIRE_THROWS_AS(test_controller.trigger(), minifi::Exception); +} + +class PushGrafanaLokiRESTTestFixture { + public: + PushGrafanaLokiRESTTestFixture() + : mock_loki_("10990"), + push_grafana_loki_rest_(std::make_shared<PushGrafanaLokiREST>("PushGrafanaLokiREST")), + test_controller_(push_grafana_loki_rest_) { + LogTestController::getInstance().setDebug<TestPlan>(); + LogTestController::getInstance().setDebug<minifi::core::Processor>(); + LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); + LogTestController::getInstance().setTrace<PushGrafanaLokiREST>(); + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::Url, "localhost:10990")); + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, PushGrafanaLokiREST::StreamLabels, "job=minifi,directory=/opt/minifi/logs/")); + } + + void setProperty(const auto& property, const std::string& property_value) { + CHECK(test_controller_.plan->setProperty(push_grafana_loki_rest_, property, property_value)); + } + + void verifyLastRequestIsEmpty() { + const auto& request = mock_loki_.getLastRequest(); + 
REQUIRE(request.IsNull()); + } + + void verifyTenantId(const std::string& tenant_id) { + REQUIRE(mock_loki_.getLastTenantId() == tenant_id); + } + + void verifyBasicAuthorization(const std::string& expected_username_and_password) { + auto last_authorization = mock_loki_.getLastAuthorization(); + std::string expected_authorization = "Basic "; + REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, expected_authorization)); + std::string username_and_password_decoded = minifi::utils::StringUtils::from_base64(last_authorization.substr(expected_authorization.size()), minifi::utils::as_string_tag_t{}); + REQUIRE(username_and_password_decoded == expected_username_and_password); + } + + void verifyBearerTokenAuthorization(const std::string& expected_bearer_token) { + auto last_authorization = mock_loki_.getLastAuthorization(); + std::string expected_authorization = "Bearer "; + REQUIRE(minifi::utils::StringUtils::startsWith(last_authorization, expected_authorization)); + auto bearer_token = last_authorization.substr(expected_authorization.size()); + REQUIRE(bearer_token == expected_bearer_token); + } + + void verifyStreamLabels() { + const auto& request = mock_loki_.getLastRequest(); + REQUIRE(request.HasMember("streams")); + const auto& stream_array = request["streams"].GetArray(); + REQUIRE(stream_array.Size() == 1); + REQUIRE(stream_array[0].HasMember("stream")); + const auto& stream = stream_array[0]["stream"].GetObject(); + REQUIRE(stream.HasMember("job")); + std::string job_string = stream["job"].GetString(); + REQUIRE(job_string == "minifi"); + REQUIRE(stream.HasMember("directory")); + std::string directory_string = stream["directory"].GetString(); + REQUIRE(directory_string == "/opt/minifi/logs/"); + } + + void verifySentRequestToLoki(uint64_t start_timestamp, const std::vector<std::string>& expected_log_values, + const std::vector<std::map<std::string, std::string>>& expected_log_line_attribute_values = {}) { + const auto& request = 
mock_loki_.getLastRequest(); + REQUIRE(request.HasMember("streams")); + const auto& stream_array = request["streams"].GetArray(); + REQUIRE(stream_array[0].HasMember("values")); Review Comment: I think we should check that the array is not empty before taking its first element -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
