This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit d16f7455c746538f948db7762ce0af751ad4f4ed Author: Gabor Gyimesi <[email protected]> AuthorDate: Mon Dec 5 13:45:55 2022 +0100 MINIFICPP-1993 Fix empty return value of PyInputStream Signed-off-by: Adam Debreceni <[email protected]> This closes #1463 --- docker/test/integration/features/python.feature | 10 +++++ .../minifi/processors/MoveContentToJson.py | 22 ++++++++++ .../pythonprocessors/examples/MoveContentToJson.py | 50 ++++++++++++++++++++++ extensions/script/python/PyInputStream.cpp | 2 +- extensions/script/python/PythonScriptEngine.h | 2 +- 5 files changed, 84 insertions(+), 2 deletions(-) diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 2877853ea..5f1c72a34 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -36,3 +36,13 @@ Feature: MiNiFi can use python processors in its flows When all instances start up Then the Minifi logs contain the following message: "key:Python attribute value:attributevalue" in less than 60 seconds + + Scenario: Native python processor can read empty input stream + Given a GenerateFlowFile processor with the "File Size" property set to "0B" + And a MoveContentToJson processor + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the GenerateFlowFile processor is connected to the MoveContentToJson + And the "success" relationship of the MoveContentToJson processor is connected to the PutFile + + When all instances start up + Then a flowfile with the content '{"content": ""}' is placed in the monitored directory in less than 60 seconds diff --git a/docker/test/integration/minifi/processors/MoveContentToJson.py b/docker/test/integration/minifi/processors/MoveContentToJson.py new file mode 100644 index 000000000..4d4013ce4 --- /dev/null +++ b/docker/test/integration/minifi/processors/MoveContentToJson.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ..core.Processor import Processor + + +class MoveContentToJson(Processor): + def __init__(self): + super(MoveContentToJson, self).__init__('MoveContentToJson', class_prefix='org.apache.nifi.minifi.processors.examples.') diff --git a/extensions/pythonprocessors/examples/MoveContentToJson.py b/extensions/pythonprocessors/examples/MoveContentToJson.py new file mode 100644 index 000000000..1b3068b07 --- /dev/null +++ b/extensions/pythonprocessors/examples/MoveContentToJson.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import codecs +import json + + +class ReadCallback: + def process(self, input_stream): + self.content = codecs.getreader('utf-8')(input_stream).read() + return len(self.content) + + +class WriteToJsonCallback: + def __init__(self, content): + self.content = content + + def process(self, output_stream): + json_content = json.dumps({"content": self.content}) + output_stream.write(json_content.encode('utf-8')) + return len(json_content) + + +def describe(processor): + processor.setDescription("Moves content of flow file to JSON file under 'content' key") + + +def onInitialize(processor): + processor.setSupportsDynamicProperties() + + +def onTrigger(context, session): + flow_file = session.get() + if flow_file is not None: + read_callback = ReadCallback() + session.read(flow_file, read_callback) + session.write(flow_file, WriteToJsonCallback(read_callback.content)) + session.transfer(flow_file, REL_SUCCESS) diff --git a/extensions/script/python/PyInputStream.cpp b/extensions/script/python/PyInputStream.cpp index 1bc74ceb3..9e4951f9c 100644 --- a/extensions/script/python/PyInputStream.cpp +++ b/extensions/script/python/PyInputStream.cpp @@ -41,7 +41,7 @@ py::bytes PyInputStream::read(size_t len) { } if (len <= 0) { - return nullptr; + return {}; } std::vector<std::byte> buffer(len); diff --git a/extensions/script/python/PythonScriptEngine.h b/extensions/script/python/PythonScriptEngine.h index 1a58b8672..4ba38b6ad 100644 --- a/extensions/script/python/PythonScriptEngine.h +++ b/extensions/script/python/PythonScriptEngine.h @@ -112,7 +112,7 @@ class PythonScriptEngine : public script::ScriptEngine { void callRequiredFunction(const std::string &fn_name, Args &&...args) { py::gil_scoped_acquire gil { }; if (!(*bindings_).contains(fn_name.c_str())) - throw std::runtime_error("Required Function" + fn_name + " is not found within Python bindings"); + throw std::runtime_error("Required Function " + fn_name + " is not found within Python bindings"); (*bindings_)[fn_name.c_str()](convert(args)...); }
