lordgamez commented on code in PR #1903:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1903#discussion_r2084860582
##########
extensions/llamacpp/CMakeLists.txt:
##########
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if (NOT (ENABLE_ALL OR ENABLE_LLAMACPP))
+    return()
+endif()
+
+include(LlamaCpp)
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES "processors/*.cpp")
+
+add_minifi_library(minifi-llamacpp SHARED ${SOURCES})
+target_include_directories(minifi-llamacpp PUBLIC "${CMAKE_SOURCE_DIR}/extensions/llamacpp")
+target_include_directories(minifi-llamacpp PUBLIC "${LLAMACPP_INCLUDE_DIRS}")
+
+target_link_libraries(minifi-llamacpp ${LIBMINIFI} llama)
+
+register_extension(minifi-llamacpp "LLAMACPP EXTENSION" LLAMACPP-EXTENSION "Provides LlamaCpp support" "extensions/llamacpp/tests")

Review Comment:
   Updated in https://github.com/apache/nifi-minifi-cpp/pull/1903/commits/e2fffeca7c51e3b3cd2f316b2897ba7e389fed8b
##########
extensions/llamacpp/processors/RunLlamaCppInference.h:
##########
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <atomic>
+
+#include "core/Processor.h"
+#include "core/logging/LoggerFactory.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "LlamaContext.h"
+#include "core/ProcessorMetrics.h"
+
+namespace org::apache::nifi::minifi::extensions::llamacpp::processors {
+
+class RunLlamaCppInferenceMetrics : public core::ProcessorMetricsImpl {
+ public:
+  explicit RunLlamaCppInferenceMetrics(const core::Processor& source_processor)
+      : core::ProcessorMetricsImpl(source_processor) {
+  }
+
+  std::vector<state::response::SerializedResponseNode> serialize() override {
+    auto resp = core::ProcessorMetricsImpl::serialize();
+    auto& root_node = resp[0];
+
+    state::response::SerializedResponseNode tokens_in_node{"TokensIn", tokens_in.load()};
+    root_node.children.push_back(tokens_in_node);
+
+    state::response::SerializedResponseNode tokens_out_node{"TokensOut", tokens_out.load()};
+    root_node.children.push_back(tokens_out_node);
+
+    return resp;
+  }
+
+  std::vector<state::PublishedMetric> calculateMetrics() override {
+    auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
+    metrics.push_back({"tokens_in", static_cast<double>(tokens_in.load()), getCommonLabels()});
+    metrics.push_back({"tokens_out", static_cast<double>(tokens_out.load()), getCommonLabels()});
+    return metrics;
+  }
+
+  std::mutex tokens_in_mutex_;
+  std::mutex tokens_out_mutex_;
+  std::atomic<uint64_t> tokens_in{0};
+  std::atomic<uint64_t> tokens_out{0};
+};
+
+class RunLlamaCppInference : public core::ProcessorImpl {
+ public:
+  explicit RunLlamaCppInference(std::string_view name, const utils::Identifier& uuid = {})
+      : core::ProcessorImpl(name, uuid) {
+    metrics_ = gsl::make_not_null(std::make_shared<RunLlamaCppInferenceMetrics>(*this));
+  }
+  ~RunLlamaCppInference() override = default;
+
+  EXTENSIONAPI static constexpr const char* Description = "LlamaCpp processor to use llama.cpp library for running language model inference. "
+      "The inference will be based on the System Prompt and the Prompt property values, together with the content of the incoming flow file. "
+      "In the Prompt, the content of the incoming flow file can be referred to as 'the input data' or 'the flow file content'.";
+
+  EXTENSIONAPI static constexpr auto ModelPath = core::PropertyDefinitionBuilder<>::createProperty("Model Path")
+      .withDescription("The filesystem path of the model file in gguf format.")
+      .isRequired(true)
+      .build();
+  EXTENSIONAPI static constexpr auto Temperature = core::PropertyDefinitionBuilder<>::createProperty("Temperature")
+      .withDescription("The temperature to use for sampling.")
+      .withDefaultValue("0.8")
+      .build();
+  EXTENSIONAPI static constexpr auto TopK = core::PropertyDefinitionBuilder<>::createProperty("Top K")
+      .withDescription("Limit the next token selection to the K most probable tokens. Set <= 0 value to use vocab size.")
+      .withDefaultValue("40")
+      .build();
+  EXTENSIONAPI static constexpr auto TopP = core::PropertyDefinitionBuilder<>::createProperty("Top P")
+      .withDescription("Limit the next token selection to a subset of tokens with a cumulative probability above a threshold P. 1.0 = disabled.")
+      .withDefaultValue("0.9")
+      .build();
+  EXTENSIONAPI static constexpr auto MinP = core::PropertyDefinitionBuilder<>::createProperty("Min P")
+      .withDescription("Sets a minimum base probability threshold for token selection. 0.0 = disabled.")
+      .build();
+  EXTENSIONAPI static constexpr auto MinKeep = core::PropertyDefinitionBuilder<>::createProperty("Min Keep")
+      .withDescription("If greater than 0, force samplers to return N possible tokens at minimum.")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
+      .withDefaultValue("0")
+      .build();
+  EXTENSIONAPI static constexpr auto TextContextSize = core::PropertyDefinitionBuilder<>::createProperty("Text Context Size")
+      .withDescription("Size of the text context, use 0 to use size set in model.")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
+      .withDefaultValue("4096")
+      .build();
+  EXTENSIONAPI static constexpr auto LogicalMaximumBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Logical Maximum Batch Size")
+      .withDescription("Logical maximum batch size that can be submitted to the llama.cpp decode function.")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
+      .withDefaultValue("2048")
+      .build();
+  EXTENSIONAPI static constexpr auto PhysicalMaximumBatchSize = core::PropertyDefinitionBuilder<>::createProperty("Physical Maximum Batch Size")
+      .withDescription("Physical maximum batch size.")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
+      .withDefaultValue("512")
+      .build();
+  EXTENSIONAPI static constexpr auto MaxNumberOfSequences = core::PropertyDefinitionBuilder<>::createProperty("Max Number Of Sequences")
+      .withDescription("Maximum number of sequences (i.e. distinct states for recurrent models).")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
+      .withDefaultValue("1")
+      .build();
+  EXTENSIONAPI static constexpr auto ThreadsForGeneration = core::PropertyDefinitionBuilder<>::createProperty("Threads For Generation")
+      .withDescription("Number of threads to use for generation.")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::INTEGER_VALIDATOR)
+      .withDefaultValue("4")
+      .build();
+  EXTENSIONAPI static constexpr auto ThreadsForBatchProcessing = core::PropertyDefinitionBuilder<>::createProperty("Threads For Batch Processing")
+      .withDescription("Number of threads to use for batch processing.")
+      .isRequired(true)
+      .withValidator(core::StandardPropertyValidators::INTEGER_VALIDATOR)
+      .withDefaultValue("4")
+      .build();
+  EXTENSIONAPI static constexpr auto Prompt = core::PropertyDefinitionBuilder<>::createProperty("Prompt")
+      .withDescription("The user prompt for the inference.")
+      .supportsExpressionLanguage(true)
+      .build();
+  EXTENSIONAPI static constexpr auto SystemPrompt = core::PropertyDefinitionBuilder<>::createProperty("System Prompt")
+      .withDescription("The system prompt for the inference.")
+      .withDefaultValue("You are a helpful assistant. You are given a question with some possible input data otherwise called flow file content. "
+                        "You are expected to generate a response based on the question and the input data.")
+      .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
+      ModelPath,
+      Temperature,
+      TopK,
+      TopP,
+      MinP,
+      MinKeep,
+      TextContextSize,
+      LogicalMaximumBatchSize,
+      PhysicalMaximumBatchSize,
+      MaxNumberOfSequences,
+      ThreadsForGeneration,
+      ThreadsForBatchProcessing,
+      Prompt,
+      SystemPrompt
+  });
+
+
+  EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Generated results from the model"};
+  EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Generation failed"};
+  EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure};
+
+  EXTENSIONAPI static constexpr auto LlamaCppTimeToFirstToken = core::OutputAttributeDefinition<>{"llamacpp.time.to.first.token", {Success}, "Time to first token generated in milliseconds."};
+  EXTENSIONAPI static constexpr auto LlamaCppTokensPerSecond = core::OutputAttributeDefinition<>{"llamacpp.tokens.per.second", {Success}, "Tokens generated per second."};
+  EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 2>{LlamaCppTimeToFirstToken, LlamaCppTokensPerSecond};

Review Comment:
   Updated in https://github.com/apache/nifi-minifi-cpp/pull/1903/commits/e2fffeca7c51e3b3cd2f316b2897ba7e389fed8b

##########
extensions/llamacpp/processors/LlamaContext.cpp:
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "LlamaContext.h"
+#include "DefaultLlamaContext.h"
+
+namespace org::apache::nifi::minifi::extensions::llamacpp::processors {
+
+static std::function<std::unique_ptr<LlamaContext>(const std::filesystem::path&, const LlamaSamplerParams&, const LlamaContextParams&)> test_provider;

Review Comment:
   Updated to inject a handler function in the constructor (a function is needed so the tests can save the parameters) in https://github.com/apache/nifi-minifi-cpp/pull/1903/commits/e2fffeca7c51e3b3cd2f316b2897ba7e389fed8b.
##########
extensions/llamacpp/processors/RunLlamaCppInference.h:
##########
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <mutex>
+#include <atomic>
+
+#include "core/Processor.h"
+#include "core/logging/LoggerFactory.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "LlamaContext.h"
+#include "core/ProcessorMetrics.h"
+
+namespace org::apache::nifi::minifi::extensions::llamacpp::processors {
+
+class RunLlamaCppInferenceMetrics : public core::ProcessorMetricsImpl {
+ public:
+  explicit RunLlamaCppInferenceMetrics(const core::Processor& source_processor)
+      : core::ProcessorMetricsImpl(source_processor) {
+  }
+
+  std::vector<state::response::SerializedResponseNode> serialize() override {
+    auto resp = core::ProcessorMetricsImpl::serialize();
+    auto& root_node = resp[0];
+
+    state::response::SerializedResponseNode tokens_in_node{"TokensIn", tokens_in.load()};
+    root_node.children.push_back(tokens_in_node);
+
+    state::response::SerializedResponseNode tokens_out_node{"TokensOut", tokens_out.load()};
+    root_node.children.push_back(tokens_out_node);
+
+    return resp;
+  }
+
+  std::vector<state::PublishedMetric> calculateMetrics() override {
+    auto metrics = core::ProcessorMetricsImpl::calculateMetrics();
+    metrics.push_back({"tokens_in", static_cast<double>(tokens_in.load()), getCommonLabels()});
+    metrics.push_back({"tokens_out", static_cast<double>(tokens_out.load()), getCommonLabels()});
+    return metrics;
+  }
+
+  std::mutex tokens_in_mutex_;
+  std::mutex tokens_out_mutex_;
+  std::atomic<uint64_t> tokens_in{0};
+  std::atomic<uint64_t> tokens_out{0};

Review Comment:
   Removed in https://github.com/apache/nifi-minifi-cpp/pull/1903/commits/e2fffeca7c51e3b3cd2f316b2897ba7e389fed8b
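On the removed members: the two `std::mutex` guards were redundant because `std::atomic` already makes each counter update a single atomic read-modify-write. A standalone illustration (standard C++, not the PR's code):

```cpp
#include <atomic>
#include <cstdint>

std::atomic<uint64_t> tokens_in{0};

// Safe to call from concurrent threads without any mutex:
// fetch_add performs the read-modify-write atomically.
void increaseTokensIn(uint64_t token_count) {
  tokens_in.fetch_add(token_count, std::memory_order_relaxed);
}
```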
##########
extensions/llamacpp/processors/DefaultLlamaContext.cpp:
##########
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultLlamaContext.h"
+#include "Exception.h"
+#include "fmt/format.h"
+#include "utils/ConfigurationUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::llamacpp::processors {
+
+namespace {
+std::vector<llama_token> tokenizeInput(const llama_vocab* vocab, const std::string& input) {
+  int32_t number_of_tokens = gsl::narrow<int32_t>(input.length()) + 2;
+  std::vector<llama_token> tokenized_input(number_of_tokens);
+  number_of_tokens = llama_tokenize(vocab, input.data(), gsl::narrow<int32_t>(input.length()), tokenized_input.data(), gsl::narrow<int32_t>(tokenized_input.size()), true, true);
+  if (number_of_tokens < 0) {
+    tokenized_input.resize(-number_of_tokens);
+    [[maybe_unused]] int32_t check = llama_tokenize(vocab, input.data(), gsl::narrow<int32_t>(input.length()), tokenized_input.data(), gsl::narrow<int32_t>(tokenized_input.size()), true, true);
+    gsl_Assert(check == -number_of_tokens);
+  } else {
+    tokenized_input.resize(number_of_tokens);
+  }
+  return tokenized_input;
+}
+}  // namespace
+
+
+DefaultLlamaContext::DefaultLlamaContext(const std::filesystem::path& model_path, const LlamaSamplerParams& llama_sampler_params, const LlamaContextParams& llama_ctx_params) {
+  llama_backend_init();
+
+  llama_model_ = llama_model_load_from_file(model_path.string().c_str(), llama_model_default_params());  // NOLINT(cppcoreguidelines-prefer-member-initializer)
+  if (!llama_model_) {
+    throw Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, fmt::format("Failed to load model from '{}'", model_path.string()));
+  }
+
+  llama_context_params ctx_params = llama_context_default_params();
+  ctx_params.n_ctx = llama_ctx_params.n_ctx;
+  ctx_params.n_batch = llama_ctx_params.n_batch;
+  ctx_params.n_ubatch = llama_ctx_params.n_ubatch;
+  ctx_params.n_seq_max = llama_ctx_params.n_seq_max;
+  ctx_params.n_threads = llama_ctx_params.n_threads;
+  ctx_params.n_threads_batch = llama_ctx_params.n_threads_batch;
+  ctx_params.flash_attn = false;
+  llama_ctx_ = llama_init_from_model(llama_model_, ctx_params);
+
+  auto sparams = llama_sampler_chain_default_params();
+  llama_sampler_ = llama_sampler_chain_init(sparams);
+
+  if (llama_sampler_params.min_p) {
+    llama_sampler_chain_add(llama_sampler_, llama_sampler_init_min_p(*llama_sampler_params.min_p, llama_sampler_params.min_keep));
+  }
+  if (llama_sampler_params.top_k) {
+    llama_sampler_chain_add(llama_sampler_, llama_sampler_init_top_k(*llama_sampler_params.top_k));
+  }
+  if (llama_sampler_params.top_p) {
+    llama_sampler_chain_add(llama_sampler_, llama_sampler_init_top_p(*llama_sampler_params.top_p, llama_sampler_params.min_keep));
+  }
+  if (llama_sampler_params.temperature) {
+    llama_sampler_chain_add(llama_sampler_, llama_sampler_init_temp(*llama_sampler_params.temperature));
+  }
+  llama_sampler_chain_add(llama_sampler_, llama_sampler_init_dist(LLAMA_DEFAULT_SEED));
+}
+
+DefaultLlamaContext::~DefaultLlamaContext() {
+  llama_sampler_free(llama_sampler_);
+  llama_sampler_ = nullptr;
+  llama_free(llama_ctx_);
+  llama_ctx_ = nullptr;
+  llama_model_free(llama_model_);
+  llama_model_ = nullptr;
+  llama_backend_free();
+}
+
+std::optional<std::string> DefaultLlamaContext::applyTemplate(const std::vector<LlamaChatMessage>& messages) {
+  std::vector<llama_chat_message> llama_messages;
+  llama_messages.reserve(messages.size());
+  for (auto& msg : messages) {
+    llama_messages.push_back(llama_chat_message{.role = msg.role.c_str(), .content = msg.content.c_str()});
+  }

Review Comment:
   Updated in https://github.com/apache/nifi-minifi-cpp/pull/1903/commits/e2fffeca7c51e3b3cd2f316b2897ba7e389fed8b
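The quoted hunk ends mid-function. For orientation, here is a hedged sketch of how such an `applyTemplate()` typically continues with the llama.cpp API of the same vintage as the rest of the file; the `llama_model_chat_template`/`llama_chat_apply_template` usage is an assumption based on the library, not necessarily the PR's exact code. It follows the same probe-then-retry sizing idiom as `tokenizeInput()` above:

```cpp
  // Assumed continuation: render the messages with the model's chat template.
  const char* chat_template = llama_model_chat_template(llama_model_, /*name=*/nullptr);
  // First call probes the required buffer size (returns the needed length).
  int32_t needed_size = llama_chat_apply_template(chat_template, llama_messages.data(),
                                                  llama_messages.size(), /*add_ass=*/true, nullptr, 0);
  if (needed_size < 0) {
    return std::nullopt;  // template could not be applied
  }
  std::string rendered(gsl::narrow<size_t>(needed_size), '\0');
  llama_chat_apply_template(chat_template, llama_messages.data(), llama_messages.size(),
                            /*add_ass=*/true, rendered.data(), gsl::narrow<int32_t>(rendered.size()));
  return rendered;
```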
##########
extensions/llamacpp/processors/RunLlamaCppInference.cpp:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "RunLlamaCppInference.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/Resource.h"
+#include "Exception.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/error/en.h"
+#include "LlamaContext.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::extensions::llamacpp::processors {
+
+void RunLlamaCppInference::initialize() {
+  setSupportedProperties(Properties);
+  setSupportedRelationships(Relationships);
+}
+
+void RunLlamaCppInference::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
+  model_path_.clear();
+  model_path_ = utils::parseProperty(context, ModelPath);
+  system_prompt_ = context.getProperty(SystemPrompt).value_or("");
+
+  LlamaSamplerParams llama_sampler_params;
+  llama_sampler_params.temperature = utils::parseOptionalFloatProperty(context, Temperature);
+  if (auto top_k = utils::parseOptionalI64Property(context, TopK)) {
+    llama_sampler_params.top_k = gsl::narrow<int32_t>(*top_k);
+  }
+  llama_sampler_params.top_p = utils::parseOptionalFloatProperty(context, TopP);
+  llama_sampler_params.min_p = utils::parseOptionalFloatProperty(context, MinP);
+  llama_sampler_params.min_keep = utils::parseU64Property(context, MinKeep);
+
+  LlamaContextParams llama_ctx_params;
+  llama_ctx_params.n_ctx = gsl::narrow<uint32_t>(utils::parseU64Property(context, TextContextSize));
+  llama_ctx_params.n_batch = gsl::narrow<uint32_t>(utils::parseU64Property(context, LogicalMaximumBatchSize));
+  llama_ctx_params.n_ubatch = gsl::narrow<uint32_t>(utils::parseU64Property(context, PhysicalMaximumBatchSize));
+  llama_ctx_params.n_seq_max = gsl::narrow<uint32_t>(utils::parseU64Property(context, MaxNumberOfSequences));
+  llama_ctx_params.n_threads = gsl::narrow<int32_t>(utils::parseI64Property(context, ThreadsForGeneration));
+  llama_ctx_params.n_threads_batch = gsl::narrow<int32_t>(utils::parseI64Property(context, ThreadsForBatchProcessing));
+
+  llama_ctx_ = LlamaContext::create(model_path_, llama_sampler_params, llama_ctx_params);
+}
+
+void RunLlamaCppInference::increaseTokensIn(uint64_t token_count) {
+  auto* const llamacpp_metrics = dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_.get());
+  gsl_Assert(llamacpp_metrics);
+  std::lock_guard<std::mutex> lock(llamacpp_metrics->tokens_in_mutex_);
+  if (llamacpp_metrics->tokens_in > std::numeric_limits<uint64_t>::max() - token_count) {
+    logger_->log_warn("Tokens in count overflow detected, resetting to 0");
+    llamacpp_metrics->tokens_in = token_count;
+    return;
+  }
+
+  llamacpp_metrics->tokens_in += token_count;
+}
+
+void RunLlamaCppInference::increaseTokensOut(uint64_t token_count) {
+  auto* const llamacpp_metrics = dynamic_cast<RunLlamaCppInferenceMetrics*>(metrics_.get());
+  gsl_Assert(llamacpp_metrics);
+  std::lock_guard<std::mutex> lock(llamacpp_metrics->tokens_out_mutex_);
+  if (llamacpp_metrics->tokens_out > std::numeric_limits<uint64_t>::max() - token_count) {
+    logger_->log_warn("Tokens out count overflow detected, resetting to 0");
+    llamacpp_metrics->tokens_out = token_count;
+    return;
+  }

Review Comment:
   Good point, removed in https://github.com/apache/nifi-minifi-cpp/pull/1903/commits/e2fffeca7c51e3b3cd2f316b2897ba7e389fed8b
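On the dropped overflow guard: if the reset-on-overflow behavior were ever wanted back without the mutex, it can be done lock-free on the atomic itself with a compare-exchange loop. A standalone sketch (illustration only; the PR simply removed the check):

```cpp
#include <atomic>
#include <cstdint>
#include <limits>

// Adds delta to counter, resetting to delta instead of wrapping on overflow.
// compare_exchange_weak retries if another thread updated counter in between.
void addWithOverflowReset(std::atomic<uint64_t>& counter, uint64_t delta) {
  uint64_t current = counter.load(std::memory_order_relaxed);
  uint64_t next = 0;
  do {
    next = (current > std::numeric_limits<uint64_t>::max() - delta) ? delta : current + delta;
  } while (!counter.compare_exchange_weak(current, next, std::memory_order_relaxed));
}
```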
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]