This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a3bf25359307cfd5cb9fdbfc0e48de8af494fe43 Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Dec 5 13:48:40 2022 +0100 MINIFICPP-1990 Add ProcessSession::remove to Python API Signed-off-by: Adam Debreceni <[email protected]> This closes #1462 --- docker/test/integration/features/python.feature | 8 ++++++ docker/test/integration/minifi/core/ImageStore.py | 2 +- .../minifi/processors/RemoveFlowFile.py | 22 ++++++++++++++++ .../pythonprocessors/examples/RemoveFlowFile.py | 29 ++++++++++++++++++++++ extensions/script/python/PyProcessSession.cpp | 14 +++++++++++ extensions/script/python/PyProcessSession.h | 1 + extensions/script/python/PythonBindings.h | 3 ++- libminifi/src/core/ProcessSession.cpp | 1 + 8 files changed, 78 insertions(+), 2 deletions(-) diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 5f1c72a34..341199c43 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -46,3 +46,11 @@ Feature: MiNiFi can use python processors in its flows When all instances start up Then a flowfile with the content '{"content": ""}' is placed in the monitored directory in less than 60 seconds + + Scenario: FlowFile can be removed from session + Given a GenerateFlowFile processor with the "File Size" property set to "0B" + And a RemoveFlowFile processor + + When all instances start up + Then the Minifi logs contain the following message: "Removing flow file with UUID" in less than 30 seconds + diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py index 2700c89d0..c0c158852 100644 --- a/docker/test/integration/minifi/core/ImageStore.py +++ b/docker/test/integration/minifi/core/ImageStore.py @@ -106,7 +106,7 @@ class ImageStore: echo "UserName = postgres" >> /etc/odbc.ini && \ echo "Password = password" >> /etc/odbc.ini && \ echo "Database = postgres" >> /etc/odbc.ini - RUN sed -i -e 's/INFO/DEBUG/g' {minifi_root}/conf/minifi-log.properties + RUN sed -i -e 's/INFO/TRACE/g' {minifi_root}/conf/minifi-log.properties RUN echo nifi.flow.engine.threads=5 >> {minifi_root}/conf/minifi.properties RUN echo nifi.metrics.publisher.agent.identifier=Agent1 >> {minifi_root}/conf/minifi.properties RUN echo nifi.metrics.publisher.class=PrometheusMetricsPublisher >> {minifi_root}/conf/minifi.properties diff --git a/docker/test/integration/minifi/processors/RemoveFlowFile.py b/docker/test/integration/minifi/processors/RemoveFlowFile.py new file mode 100644 index 000000000..3fc9b4da2 --- /dev/null +++ b/docker/test/integration/minifi/processors/RemoveFlowFile.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ..core.Processor import Processor + + +class RemoveFlowFile(Processor): + def __init__(self): + super(RemoveFlowFile, self).__init__('RemoveFlowFile', class_prefix='org.apache.nifi.minifi.processors.examples.') diff --git a/extensions/pythonprocessors/examples/RemoveFlowFile.py b/extensions/pythonprocessors/examples/RemoveFlowFile.py new file mode 100644 index 000000000..5235636b4 --- /dev/null +++ b/extensions/pythonprocessors/examples/RemoveFlowFile.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def describe(processor): + processor.setDescription("Removes flow file from the session") + + +def onInitialize(processor): + processor.setSupportsDynamicProperties() + + +def onTrigger(context, session): + flow_file = session.get() + if flow_file is not None: + session.remove(flow_file) diff --git a/extensions/script/python/PyProcessSession.cpp b/extensions/script/python/PyProcessSession.cpp index 843d00323..f13a3c162 100644 --- a/extensions/script/python/PyProcessSession.cpp +++ b/extensions/script/python/PyProcessSession.cpp @@ -135,4 +135,18 @@ void PyProcessSession::releaseCoreResources() { session_.reset(); } +void PyProcessSession::remove(const std::shared_ptr<script::ScriptFlowFile>& script_flow_file) { + if (!session_) { + throw std::runtime_error("Access of ProcessSession after it has been released"); + } + + auto flow_file = script_flow_file->getFlowFile(); + + if (!flow_file) { + throw std::runtime_error("Access of FlowFile after it has been released"); + } + + session_->remove(flow_file); +} + } // namespace org::apache::nifi::minifi::python diff --git a/extensions/script/python/PyProcessSession.h b/extensions/script/python/PyProcessSession.h index 898719854..2eea2bb58 100644 --- a/extensions/script/python/PyProcessSession.h +++ b/extensions/script/python/PyProcessSession.h @@ -46,6 +46,7 @@ class PyProcessSession { void transfer(const std::shared_ptr<script::ScriptFlowFile>& flow_file, const core::Relationship& relationship); void read(const std::shared_ptr<script::ScriptFlowFile>& flow_file, py::object input_stream_callback); void write(const std::shared_ptr<script::ScriptFlowFile>& flow_file, py::object output_stream_callback); + void remove(const std::shared_ptr<script::ScriptFlowFile>& flow_file); /** * Sometimes we want to release shared pointers to core resources when diff --git a/extensions/script/python/PythonBindings.h b/extensions/script/python/PythonBindings.h index a406f02f8..004f6f415 100644 --- a/extensions/script/python/PythonBindings.h +++ b/extensions/script/python/PythonBindings.h @@ -54,7 +54,8 @@ PYBIND11_EMBEDDED_MODULE(minifi_native, m) { // NOLINT static_cast<std::shared_ptr<script::ScriptFlowFile> (python::PyProcessSession::*)(const std::shared_ptr<script::ScriptFlowFile>&)>(&python::PyProcessSession::create)) .def("read", &python::PyProcessSession::read) .def("write", &python::PyProcessSession::write) - .def("transfer", &python::PyProcessSession::transfer); + .def("transfer", &python::PyProcessSession::transfer) + .def("remove", &python::PyProcessSession::remove); py::class_<python::PythonProcessor, std::shared_ptr<python::PythonProcessor>>(m, "Processor") .def("setSupportsDynamicProperties", &python::PythonProcessor::setSupportsDynamicProperties) diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 0181f1e36..1d06b8da7 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -195,6 +195,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(const std::shared_ptr<core } void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) { + logger_->log_trace("Removing flow file with UUID: %s", flow->getUUIDStr()); flow->setDeleted(true); deleted_flowfiles_.push_back(flow); std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr();
