martinzink commented on code in PR #1987: URL: https://github.com/apache/nifi-minifi-cpp/pull/1987#discussion_r2444697408
########## core-framework/c-api-framework/src/core/ProcessSession.cpp: ########## @@ -0,0 +1,149 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "api/core/ProcessSession.h" + +#include "io/InputStream.h" +#include "io/OutputStream.h" +#include "api/utils/minifi-c-utils.h" +#include "api/core/FlowFile.h" +#include "minifi-cpp/Exception.h" + +namespace org::apache::nifi::minifi::api::core { + +namespace { + +class MinifiOutputStreamWrapper : public io::OutputStreamImpl { + public: + explicit MinifiOutputStreamWrapper(MinifiOutputStream impl): impl_(impl) {} + + size_t write(const uint8_t *value, size_t len) override { + return MinifiOutputStreamWrite(impl_, reinterpret_cast<const char*>(value), len); + } + + void close() override {gsl_FailFast();} + void seek(size_t /*offset*/) override {gsl_FailFast();} + [[nodiscard]] size_t tell() const override {gsl_FailFast();} + int initialize() override {gsl_FailFast();} + [[nodiscard]] std::span<const std::byte> getBuffer() const override {gsl_FailFast();} + + private: + MinifiOutputStream impl_; +}; + +class MinifiInputStreamWrapper : public io::InputStreamImpl { + public: + explicit MinifiInputStreamWrapper(MinifiInputStream impl): impl_(impl) {} + + size_t read(std::span<std::byte> out_buffer) override { + return MinifiInputStreamRead(impl_, reinterpret_cast<char*>(out_buffer.data()), out_buffer.size()); + } + + [[nodiscard]] size_t size() const override { + return MinifiInputStreamSize(impl_); + } + + void close() override {gsl_FailFast();} + void seek(size_t /*offset*/) override {gsl_FailFast();} + [[nodiscard]] size_t tell() const override {gsl_FailFast();} + int initialize() override {gsl_FailFast();} + [[nodiscard]] std::span<const std::byte> getBuffer() const override {gsl_FailFast();} + + private: + MinifiInputStream impl_; +}; + +} // namespace + +std::shared_ptr<FlowFile> ProcessSession::get() { + return std::make_shared<FlowFile>(MinifiProcessSessionGet(impl_)); +} + +std::shared_ptr<FlowFile> ProcessSession::create(const FlowFile* parent) { + return std::make_shared<FlowFile>(MinifiProcessSessionCreate(impl_, parent ? parent->getImpl() : MINIFI_NULL)); +} + +void ProcessSession::transfer(const std::shared_ptr<FlowFile>& ff, const minifi::core::Relationship& relationship) { + const auto rel_name = relationship.getName(); + MinifiProcessSessionTransfer(impl_, ff->getImpl(), utils::toStringView(rel_name)); +} + +void ProcessSession::write(FlowFile& flow_file, const io::OutputStreamCallback& callback) { + const auto status = MinifiProcessSessionWrite(impl_, flow_file.getImpl(), [] (void* data, MinifiOutputStream output) { + return (*static_cast<const io::OutputStreamCallback*>(data))(std::make_shared<MinifiOutputStreamWrapper>(output)); + }, const_cast<io::OutputStreamCallback*>(&callback)); + if (status != MINIFI_SUCCESS) { + throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); + } +} + +void ProcessSession::read(FlowFile& flow_file, const io::InputStreamCallback& callback) { + const auto status = MinifiProcessSessionRead(impl_, flow_file.getImpl(), [] (void* data, MinifiInputStream input) { + return (*static_cast<const io::InputStreamCallback*>(data))(std::make_shared<MinifiInputStreamWrapper>(input)); + }, const_cast<io::InputStreamCallback*>(&callback)); + if (status != MINIFI_SUCCESS) { + throw minifi::Exception(minifi::FILE_OPERATION_EXCEPTION, "Failed to process flowfile content"); + } +} + +void ProcessSession::setAttribute(FlowFile& ff, std::string_view key, std::string value) { Review Comment: ```suggestion void ProcessSession::setAttribute(FlowFile& ff, std::string_view key, std::string value) { // NOLINT(performance-unnecessary-value-param) ``` This is the last clang tidy issue, I think we can ignore this since the signature comes from the parent not much we can do about it here ########## core-framework/c-api-framework/include/api/core/ProcessContext.h: ########## @@ -0,0 +1,47 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <string> + +#include "minifi-c.h" +#include "nonstd/expected.hpp" +#include "minifi-cpp/core/PropertyDefinition.h" +#include "api/core/FlowFile.h" + +namespace org::apache::nifi::minifi::api::core { + +class ProcessContext { + public: + explicit ProcessContext(MinifiProcessContext impl): impl_(impl) {} + + nonstd::expected<std::string, std::error_code> getProperty(std::string_view name, const FlowFile* flow_file = nullptr) const; + nonstd::expected<std::string, std::error_code> getProperty(const minifi::core::PropertyReference& property_reference, const FlowFile* flow_file = nullptr) const { + return getProperty(property_reference.name, flow_file); + } + + bool hasNonEmptyProperty(std::string_view name) const; + void yield(); Review Comment: I mentioned this already (and i cant recall what your answer was), but what if we were to remove yield from ProcessContext and moved yield functionality into onTrigger and onSchedule instead? This way ProcessContext would only have const functions and yield feels like a result of ontrigger in my eyes. ########## core-framework/c-api-framework/include/api/core/ProcessorImpl.h: ########## @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <algorithm> +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <functional> +#include <memory> +#include <mutex> +#include <string> +#include <string_view> +#include <unordered_set> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "minifi-cpp/core/Annotation.h" +#include "minifi-cpp/core/DynamicProperty.h" +// #include "minifi-cpp/core/ProcessorMetrics.h" Review Comment: ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
