This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit fe0dcd0affad9fe58ef67551e06b6b92789ef73c Author: Gabor Gyimesi <[email protected]> AuthorDate: Thu Apr 11 18:36:11 2024 +0200 MINIFICPP-2277 Add virtualenv support for python processors Closes #1721 Signed-off-by: Marton Szasz <[email protected]> --- conf/minifi.properties | 7 +- docker/python-verify/conda.Dockerfile | 6 +- docker/python-verify/installed.Dockerfile | 2 +- docker/python-verify/venv.Dockerfile | 2 +- docker/test/integration/cluster/ContainerStore.py | 10 +- .../test/integration/cluster/DockerTestCluster.py | 10 +- docker/test/integration/cluster/ImageStore.py | 23 ++- .../cluster/containers/MinifiContainer.py | 16 +- .../features/MiNiFi_integration_test_driver.py | 10 +- docker/test/integration/features/environment.py | 1 - docker/test/integration/features/python.feature | 10 +- docker/test/integration/features/steps/steps.py | 13 ++ encrypt-config/tests/ConfigFileEncryptorTests.cpp | 2 +- encrypt-config/tests/ConfigFileTests.cpp | 8 +- encrypt-config/tests/resources/minifi.properties | 7 +- ...th-additional-sensitive-props.minifi.properties | 7 +- extensions/python/ExecutePythonProcessor.h | 8 - extensions/python/PYTHON.md | 38 ++++- extensions/python/PythonConfigState.h | 37 +++++ extensions/python/PythonCreator.h | 5 +- extensions/python/PythonDependencyInstaller.cpp | 168 +++++++++++++++++++++ extensions/python/PythonDependencyInstaller.h | 50 ++++++ extensions/python/PythonInterpreter.cpp | 78 ++++++++++ extensions/python/PythonInterpreter.h | 48 ++++++ extensions/python/PythonObjectFactory.h | 8 - extensions/python/PythonScriptEngine.cpp | 60 +------- extensions/python/PythonScriptEngine.h | 57 +------ libminifi/include/properties/Configuration.h | 3 + libminifi/src/Configuration.cpp | 6 +- .../test/resources/encrypted.minifi.properties | 7 +- 30 files changed, 543 insertions(+), 164 deletions(-) diff --git a/conf/minifi.properties b/conf/minifi.properties index 9ba8a0a08..67de200a3 100644 --- a/conf/minifi.properties +++ 
b/conf/minifi.properties @@ -135,10 +135,15 @@ nifi.nar.deploy.directory=${MINIFI_HOME}/minifi-jni/nardeploy nifi.nar.docs.directory=${MINIFI_HOME}/minifi-jni/nardocs # must be comma separated nifi.jvm.options=-Xmx1G -nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/ # Publish metrics to external consumers # nifi.metrics.publisher.agent.identifier= # nifi.metrics.publisher.class=PrometheusMetricsPublisher # nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 # nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation + +# Python processor properties +nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/ +nifi.python.virtualenv.directory=${MINIFI_HOME}/minifi-python-env +nifi.python.install.packages.automatically=true +# nifi.python.env.setup.binary=python3 diff --git a/docker/python-verify/conda.Dockerfile b/docker/python-verify/conda.Dockerfile index 911e2bd22..d8bd4b64b 100644 --- a/docker/python-verify/conda.Dockerfile +++ b/docker/python-verify/conda.Dockerfile @@ -31,14 +31,12 @@ USER root RUN wget https://repo.anaconda.com/archive/Anaconda3-2023.09-0-Linux-x86_64.sh -P /tmp \ && echo "6c8a4abb36fbb711dc055b7049a23bbfd61d356de9468b41c5140f8a11abd851 /tmp/Anaconda3-2023.09-0-Linux-x86_64.sh" | sha256sum -c \ && bash /tmp/Anaconda3-2023.09-0-Linux-x86_64.sh -b -p /opt/conda \ - && chown -R ${USER}:${USER} /opt/conda \ - && mkdir /home/${USER} \ - && chown -R ${USER}:${USER} /home/${USER} + && chown -R ${USER}:${USER} /opt/conda USER ${USER} RUN ${CONDA_HOME}/bin/conda init bash -RUN ${CONDA_HOME}/bin/conda install langchain -c conda-forge +RUN ${CONDA_HOME}/bin/conda install "langchain<=0.17.0" -c conda-forge WORKDIR ${MINIFI_HOME} diff --git a/docker/python-verify/installed.Dockerfile b/docker/python-verify/installed.Dockerfile index c49bd0657..aa2c24abc 100644 --- a/docker/python-verify/installed.Dockerfile +++ b/docker/python-verify/installed.Dockerfile @@ -32,7 +32,7 @@ ENV MINIFI_HOME 
${MINIFI_BASE_DIR}/minifi-current ENV MINIFI_VERSIONED_HOME ${MINIFI_BASE_DIR}/nifi-minifi-cpp-${MINIFI_VERSION} -RUN groupadd -g ${GID} ${USER} && useradd -g ${GID} ${USER} && \ +RUN groupadd -g ${GID} ${USER} && useradd -m -g ${GID} ${USER} && \ install -d -o ${USER} -g ${USER} ${MINIFI_BASE_DIR} && ln -s ${MINIFI_VERSIONED_HOME} ${MINIFI_HOME} ADD ${ARCHIVE_LOCATION} ${MINIFI_BASE_DIR} diff --git a/docker/python-verify/venv.Dockerfile b/docker/python-verify/venv.Dockerfile index c00a0321c..470855e04 100644 --- a/docker/python-verify/venv.Dockerfile +++ b/docker/python-verify/venv.Dockerfile @@ -26,7 +26,7 @@ USER ${USER} WORKDIR ${MINIFI_HOME} RUN python3 -m venv venv -RUN . ./venv/bin/activate && pip install --upgrade pip && pip install numpy langchain +RUN . ./venv/bin/activate && pip install --upgrade pip && pip install numpy "langchain<=0.17.0" # Start MiNiFi CPP in the foreground CMD ["/bin/bash", "-c", "source ./venv/bin/activate && ./bin/minifi.sh run"] diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index ee30f944c..0dad14fe0 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -349,8 +349,14 @@ class ContainerStore: def enable_sql_in_minifi(self): self.minifi_options.enable_sql = True - def use_nifi_python_processors_in_minifi(self): - self.minifi_options.use_nifi_python_processors = True + def use_nifi_python_processors_with_system_python_packages_installed_in_minifi(self): + self.minifi_options.use_nifi_python_processors_with_system_python_packages_installed = True + + def use_nifi_python_processors_with_virtualenv_in_minifi(self): + self.minifi_options.use_nifi_python_processors_with_virtualenv = True + + def use_nifi_python_processors_with_virtualenv_packages_installed_in_minifi(self): + self.minifi_options.use_nifi_python_processors_with_virtualenv_packages_installed = True def set_yaml_in_minifi(self): 
self.minifi_options.config_format = "yaml" diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index e96dd2747..4114b3f6d 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -107,8 +107,14 @@ class DockerTestCluster: def enable_multi_tenancy_in_grafana_loki(self): self.container_store.enable_multi_tenancy_in_grafana_loki() - def use_nifi_python_processors_in_minifi(self): - self.container_store.use_nifi_python_processors_in_minifi() + def use_nifi_python_processors_with_system_python_packages_installed_in_minifi(self): + self.container_store.use_nifi_python_processors_with_system_python_packages_installed_in_minifi() + + def use_nifi_python_processors_with_virtualenv_in_minifi(self): + self.container_store.use_nifi_python_processors_with_virtualenv_in_minifi() + + def use_nifi_python_processors_with_virtualenv_packages_installed_in_minifi(self): + self.container_store.use_nifi_python_processors_with_virtualenv_packages_installed_in_minifi() def set_yaml_in_minifi(self): self.container_store.set_yaml_in_minifi() diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index e915c1b27..bddc9e417 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -44,6 +44,8 @@ class ImageStore: image = self.__build_minifi_cpp_sql_image() elif container_engine == "minifi-cpp-nifi-python": image = self.__build_minifi_cpp_image_with_nifi_python_processors() + elif container_engine == "minifi-cpp-nifi-python-system-python-packages": + image = self.__build_minifi_cpp_image_with_nifi_python_processors('RUN pip3 install "langchain<=0.17.0"') elif container_engine == "http-proxy": image = self.__build_http_proxy_image() elif container_engine == "postgresql-server": @@ -95,7 +97,7 @@ class ImageStore: return self.__build_image(dockerfile) 
- def __build_minifi_cpp_image_with_nifi_python_processors(self): + def __build_minifi_cpp_image_with_nifi_python_processors(self, additional_cmd=""): parse_document_url = "https://raw.githubusercontent.com/apache/nifi/rel/nifi-" + NifiContainer.NIFI_VERSION + "/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/ParseDocument.py" chunk_document_url = "https://raw.githubusercontent.com/apache/nifi/rel/nifi-" + NifiContainer.NIFI_VERSION + "/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/ChunkDocument.py" pip3_install_command = "" @@ -105,15 +107,21 @@ class ImageStore: FROM {base_image} USER root {pip3_install_command} - RUN pip3 install langchain + {additional_cmd} USER minificpp COPY RotatingForwarder.py /opt/minifi/minifi-current/minifi-python/nifi_python_processors/RotatingForwarder.py RUN wget {parse_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ - wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors + wget {chunk_document_url} --directory-prefix=/opt/minifi/minifi-current/minifi-python/nifi_python_processors && \\ + echo 'langchain<=0.17.0' > /opt/minifi/minifi-current/minifi-python/nifi_python_processors/requirements.txt && \\ + python3 -m venv /opt/minifi/minifi-current/venv && \\ + python3 -m venv /opt/minifi/minifi-current/venv-with-langchain && \\ + . 
/opt/minifi/minifi-current/venv-with-langchain/bin/activate && python3 -m pip install --no-cache-dir "langchain<=0.17.0" && \\ + deactivate """.format(base_image='apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION, pip3_install_command=pip3_install_command, parse_document_url=parse_document_url, - chunk_document_url=chunk_document_url)) + chunk_document_url=chunk_document_url, + additional_cmd=additional_cmd)) return self.__build_image(dockerfile, [os.path.join(self.test_dir, "resources", "python", "RotatingForwarder.py")]) @@ -229,10 +237,11 @@ class ImageStore: command=['conda', '--version'], ) try: - result = container.start() + container.start() + result = container.logs() + container.remove(force=True) except docker.errors.APIError: - container.remove() + container.remove(force=True) return False - container.remove() return result.decode('utf-8').startswith('conda ') diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index 05cc7ff52..73c14c7bf 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -33,7 +33,9 @@ class MinifiOptions: self.enable_prometheus = False self.enable_prometheus_with_ssl = False self.enable_sql = False - self.use_nifi_python_processors = False + self.use_nifi_python_processors_with_system_python_packages_installed = False + self.use_nifi_python_processors_with_virtualenv = False + self.use_nifi_python_processors_with_virtualenv_packages_installed = False self.config_format = "json" self.use_flow_config_from_url = False self.set_ssl_context_properties = False @@ -148,6 +150,14 @@ class MinifiContainer(FlowContainer): f.write("controller.socket.port=9998\n") f.write("controller.socket.local.any.interface=false\n") + if self.options.use_nifi_python_processors_with_virtualenv: + 
f.write("nifi.python.virtualenv.directory=/opt/minifi/minifi-current/venv\n") + elif self.options.use_nifi_python_processors_with_virtualenv_packages_installed: + f.write("nifi.python.virtualenv.directory=/opt/minifi/minifi-current/venv-with-langchain\n") + + if self.options.use_nifi_python_processors_with_virtualenv: + f.write("nifi.python.install.packages.automatically=true\n") + def _setup_config(self): self._create_properties() if not self.options.use_flow_config_from_url: @@ -166,7 +176,9 @@ class MinifiContainer(FlowContainer): if self.options.enable_sql: image = self.image_store.get_image('minifi-cpp-sql') - elif self.options.use_nifi_python_processors: + elif self.options.use_nifi_python_processors_with_system_python_packages_installed: + image = self.image_store.get_image('minifi-cpp-nifi-python-system-python-packages') + elif self.options.use_nifi_python_processors_with_virtualenv or self.options.use_nifi_python_processors_with_virtualenv_packages_installed: image = self.image_store.get_image('minifi-cpp-nifi-python') else: image = 'apacheminificpp:' + MinifiContainer.MINIFI_TAG_PREFIX + MinifiContainer.MINIFI_VERSION diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 3ee5274c1..35fd97bcd 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -423,8 +423,14 @@ class MiNiFi_integration_test: def enable_multi_tenancy_in_grafana_loki(self): self.cluster.enable_multi_tenancy_in_grafana_loki() - def use_nifi_python_processors_in_minifi(self): - self.cluster.use_nifi_python_processors_in_minifi() + def use_nifi_python_processors_with_system_python_packages_installed_in_minifi(self): + self.cluster.use_nifi_python_processors_with_system_python_packages_installed_in_minifi() + + def use_nifi_python_processors_with_virtualenv_in_minifi(self): + 
self.cluster.use_nifi_python_processors_with_virtualenv_in_minifi() + + def use_nifi_python_processors_with_virtualenv_packages_installed_in_minifi(self): + self.cluster.use_nifi_python_processors_with_virtualenv_packages_installed_in_minifi() def set_yaml_in_minifi(self): self.cluster.set_yaml_in_minifi() diff --git a/docker/test/integration/features/environment.py b/docker/test/integration/features/environment.py index 12fc1403a..5483b1138 100644 --- a/docker/test/integration/features/environment.py +++ b/docker/test/integration/features/environment.py @@ -52,7 +52,6 @@ def before_scenario(context, scenario): if not context.image_store.is_conda_available_in_minifi_image() and context.image_store.get_minifi_image_python_version() < (3, 8, 1): scenario.skip("NiFi Python processor tests use langchain library which requires Python 3.8.1 or later.") return - context.test.use_nifi_python_processors_in_minifi() for step in scenario.steps: inject_feature_id(context, step) diff --git a/docker/test/integration/features/python.feature b/docker/test/integration/features/python.feature index 90c9c3f3e..c4ad12922 100644 --- a/docker/test/integration/features/python.feature +++ b/docker/test/integration/features/python.feature @@ -65,7 +65,7 @@ Feature: MiNiFi can use python processors in its flows Then flowfiles with these contents are placed in the monitored directory in less than 5 seconds: "0,1,2,3,4,5" @USE_NIFI_PYTHON_PROCESSORS - Scenario: MiNiFi C++ can use NiFi native python processors + Scenario Outline: MiNiFi C++ can use native NiFi python processors Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with filename "test_file.log" and content "test_data" is present in "/tmp/input" And a ParseDocument processor @@ -73,6 +73,7 @@ Feature: MiNiFi can use python processors in its flows And the "Chunk Overlap" property of the ChunkDocument processor is set to "3" And a PutFile processor with the "Directory" property set to 
"/tmp/output" And a LogAttribute processor with the "FlowFiles To Log" property set to "0" + And python is installed on the MiNiFi agent <python_install_mode> And the "success" relationship of the GetFile processor is connected to the ParseDocument And the "success" relationship of the ParseDocument processor is connected to the ChunkDocument @@ -83,6 +84,12 @@ Feature: MiNiFi can use python processors in its flows Then at least one flowfile's content match the following regex: '{"text": "test_", "metadata": {"filename": "test_file.log", "uuid": "", "chunk_index": 0, "chunk_count": 3}}' in less than 30 seconds And the Minifi logs contain the following message: "key:document.count value:3" in less than 10 seconds + Examples: Different python installation modes + | python_install_mode | + | with required python packages | + | with a pre-created virtualenv | + | with a pre-created virtualenv containing the required python packages | + @USE_NIFI_PYTHON_PROCESSORS Scenario: MiNiFi C++ can use custom relationships in NiFi native python processors Given a GetFile processor with the "Input Directory" property set to "/tmp/input" @@ -92,6 +99,7 @@ Feature: MiNiFi can use python processors in its flows And a file with filename "test_file4.log" and content "test_data_four" is present in "/tmp/input" And a RotatingForwarder processor And a PutFile processor with the "Directory" property set to "/tmp/output" + And python is installed on the MiNiFi agent with a pre-created virtualenv And the "success" relationship of the GetFile processor is connected to the RotatingForwarder And the "first" relationship of the RotatingForwarder processor is connected to the PutFile diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 0d88cb087..3bc9e781c 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -1273,3 +1273,16 @@ def step_impl(context, processor_name: 
str): @given(u'a reverse proxy is set up to forward requests to the Grafana Loki server') def step_impl(context): context.test.acquire_container(context=context, name="reverse-proxy", engine="reverse-proxy") + + +# Python +@given("python is installed on the MiNiFi agent {install_mode}") +def step_impl(context, install_mode): + if install_mode == "with required python packages": + context.test.use_nifi_python_processors_with_system_python_packages_installed_in_minifi() + elif install_mode == "with a pre-created virtualenv": + context.test.use_nifi_python_processors_with_virtualenv_in_minifi() + elif install_mode == "with a pre-created virtualenv containing the required python packages": + context.test.use_nifi_python_processors_with_virtualenv_packages_installed_in_minifi() + else: + raise Exception("Unknown python install mode.") diff --git a/encrypt-config/tests/ConfigFileEncryptorTests.cpp b/encrypt-config/tests/ConfigFileEncryptorTests.cpp index 0df8878b6..3793df3e1 100644 --- a/encrypt-config/tests/ConfigFileEncryptorTests.cpp +++ b/encrypt-config/tests/ConfigFileEncryptorTests.cpp @@ -77,7 +77,7 @@ TEST_CASE("ConfigFileEncryptor can encrypt the sensitive properties", "[encrypt- uint32_t num_properties_encrypted = encryptSensitivePropertiesInFile(test_file, KEY); REQUIRE(num_properties_encrypted == 1); - REQUIRE(test_file.size() == 110); + REQUIRE(test_file.size() == 115); REQUIRE(check_encryption(test_file, Configuration::nifi_rest_api_password, original_password.length())); SECTION("calling encryptSensitiveValuesInMinifiProperties a second time does nothing") { diff --git a/encrypt-config/tests/ConfigFileTests.cpp b/encrypt-config/tests/ConfigFileTests.cpp index aa1589499..5f97bff8b 100644 --- a/encrypt-config/tests/ConfigFileTests.cpp +++ b/encrypt-config/tests/ConfigFileTests.cpp @@ -90,7 +90,7 @@ TEST_CASE("ConfigFile creates an empty object from a nonexistent file", "[encryp TEST_CASE("ConfigFile can parse a simple config file", 
"[encrypt-config][constructor]") { ConfigFile test_file{std::ifstream{"resources/minifi.properties"}}; - REQUIRE(test_file.size() == 109); + REQUIRE(test_file.size() == 114); } TEST_CASE("ConfigFile can test whether a key is present", "[encrypt-config][hasValue]") { @@ -102,7 +102,7 @@ TEST_CASE("ConfigFile can test whether a key is present", "[encrypt-config][hasV TEST_CASE("ConfigFile can read empty properties correctly", "[encrypt-config][constructor]") { ConfigFile test_file{std::ifstream{"resources/with-additional-sensitive-props.minifi.properties"}}; - REQUIRE(test_file.size() == 110); + REQUIRE(test_file.size() == 115); auto empty_property = test_file.getValue(Configuration::nifi_security_need_ClientAuth); REQUIRE(empty_property); @@ -143,7 +143,7 @@ TEST_CASE("ConfigFile can add a new setting after an existing setting", "[encryp SECTION("valid key") { test_file.insertAfter(Configuration::nifi_rest_api_password, "nifi.rest.api.password.protected", "my-cipher-name"); - REQUIRE(test_file.size() == 110); + REQUIRE(test_file.size() == 115); REQUIRE(test_file.getValue("nifi.rest.api.password.protected") == "my-cipher-name"); } @@ -158,7 +158,7 @@ TEST_CASE("ConfigFile can add a new setting at the end", "[encrypt-config][appen const std::string KEY = "nifi.bootstrap.sensitive.key"; const std::string VALUE = "aa411f289c91685ef9d5a9e5a4fad9393ff4c7a78ab978484323488caed7a9ab"; test_file.append(KEY, VALUE); - REQUIRE(test_file.size() == 110); + REQUIRE(test_file.size() == 115); REQUIRE(test_file.getValue(KEY) == std::make_optional(VALUE)); } diff --git a/encrypt-config/tests/resources/minifi.properties b/encrypt-config/tests/resources/minifi.properties index 8a19c0ab9..824f55a50 100644 --- a/encrypt-config/tests/resources/minifi.properties +++ b/encrypt-config/tests/resources/minifi.properties @@ -98,7 +98,6 @@ nifi.nar.deploy.directory=${MINIFI_HOME}/minifi-jni/nardeploy nifi.nar.docs.directory=${MINIFI_HOME}/minifi-jni/nardocs # must be comma separated 
nifi.jvm.options=-Xmx1G -nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/ nifi.c2.flow.id= nifi.c2.flow.url= @@ -107,3 +106,9 @@ nifi.c2.flow.url= # nifi.metrics.publisher.class=PrometheusMetricsPublisher # nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 # nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation + +# Python processor properties +nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/ +nifi.python.virtualenv.directory=${MINIFI_HOME}/minifi-python-env +nifi.python.install.packages.automatically=true +# nifi.python.env.setup.binary=python3 diff --git a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties index 75da62618..a9739fe57 100644 --- a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties +++ b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties @@ -99,7 +99,6 @@ nifi.nar.deploy.directory=${MINIFI_HOME}/minifi-jni/nardeploy nifi.nar.docs.directory=${MINIFI_HOME}/minifi-jni/nardocs # must be comma separated nifi.jvm.options=-Xmx1G -nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/ nifi.c2.flow.id= nifi.c2.flow.url= @@ -108,3 +107,9 @@ nifi.c2.flow.url= # nifi.metrics.publisher.class=PrometheusMetricsPublisher # nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 # nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation + +# Python processor properties +nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/ +nifi.python.virtualenv.directory=${MINIFI_HOME}/minifi-python-env +nifi.python.install.packages.automatically=true +# nifi.python.env.setup.binary=python3 diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index aea941374..7de09a37a 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ 
b/extensions/python/ExecutePythonProcessor.h @@ -35,10 +35,6 @@ #include "PythonScriptEngine.h" #include "PythonScriptEngine.h" -#if defined(__GNUC__) || defined(__GNUG__) -#pragma GCC visibility push(hidden) -#endif - namespace org::apache::nifi::minifi::extensions::python::processors { class ExecutePythonProcessor : public core::Processor { @@ -162,7 +158,3 @@ class ExecutePythonProcessor : public core::Processor { }; } // namespace org::apache::nifi::minifi::extensions::python::processors - -#if defined(__GNUC__) || defined(__GNUG__) -#pragma GCC visibility pop -#endif diff --git a/extensions/python/PYTHON.md b/extensions/python/PYTHON.md index bb496daf8..52d7031a5 100644 --- a/extensions/python/PYTHON.md +++ b/extensions/python/PYTHON.md @@ -25,6 +25,9 @@ This readme defines the configuration parameters to use ExecutePythonProcessor t - [Configuration](#configuration) - [Processors](#processors) - [Using NiFi Python Processors](#using-nifi-python-processors) +- [Use Python processors from virtualenv](#use-python-processors-from-virtualenv) +- [Automatically install dependencies from requirements.txt files](#automatically-install-dependencies-from-requirementstxt-files) +- [Set python binary for virtualenv creation and package installation](#set-python-binary-for-virtualenv-creation-and-package-installation) ## Requirements @@ -130,9 +133,9 @@ Therefore if the nifi.python.processor.dir is /tmp/ and you have a subdirectory produce a processor with the name org.apache.nifi.minifi.processors.packagedir.file. Note that each subdirectory will append a package to the reference class name. - in minifi.properties - #directory where processors exist - nifi.python.processor.dir=XXXX + # in minifi.properties + #directory where processors exist + nifi.python.processor.dir=XXXX ## Processors @@ -142,8 +145,8 @@ exist. ### Sentiment Analysis The SentimentAnalysis processor will perform a Vader Sentiment Analysis. 
This requires that you install nltk and VaderSentiment - pip install nltk - pip install VaderSentiment + pip install nltk + pip install VaderSentiment ## Using NiFi Python Processors @@ -155,10 +158,33 @@ In the flow configuration these Python processors can be referenced by their ful Due to some differences between the NiFi and MiNiFi C++ processors and implementation, there are some limitations using the NiFi Python processors: - Record based processors are not yet supported in MiNiFi C++, so the NiFi Python processors inherited from RecordTransform are not supported. -- Virtualenv support is not yet available in MiNiFi C++, so all required packages must be installed on the system. - Controller properties are not supported at the moment. - There are some validators in NiFi that are not present in MiNiFi C++, so some property validations will be missing using the NiFi Python processors. - Allowable values specified in NiFi Python processors are ignored in MiNiFi C++ (due to MiNiFi C++ requiring them to be specified at compile time), so the property values are not pre-verified. - MiNiFi C++ only supports expression language with flow file attributes, so only FLOWFILE_ATTRIBUTES expression language scope is supported, otherwise the expression language will not be evaluated. - MiNiFi C++ does not support property dependencies, so the property dependencies will be ignored. If a property depends on another property, the property will not be required. - MiNiFi C++ does not support the use of self.jvm member in Python processors that provides JVM bindings in NiFi, it is set to None in MiNiFi C++. +- Inline definitions of Python package dependencies, defined in the ProcessorDetails nested class, are not supported as in NiFi, so the dependencies must be defined in the requirements.txt files. If a processor's dependencies are defined in the ProcessorDetails class, the dependencies should be copied to the requirements.txt file. 
+ +## Use Python processors from virtualenv + +It is possible to set a virtualenv to be used by the Python processors in Apache MiNiFi C++. If the virtualenv directory is set, the Python processors will be executed using the packages installed in the virtualenv. If the virtualenv directory is not set, the Python processors will be executed using the packages installed on the system. + + # in minifi.properties + nifi.python.virtualenv.directory=${MINIFI_HOME}/minifi-python-env + +**NOTE:** Using different python versions for the system and the virtualenv is not supported. The virtualenv must be created using the same python version as the system python. + +## Automatically install dependencies from requirements.txt files + +It is possible to automatically install the dependencies of the Python processors defined in requirements.txt files into a virtualenv. To enable this feature, the `nifi.python.install.packages.automatically` property must be set to true, and the `nifi.python.virtualenv.directory` property must be set to a directory where a virtualenv either already exists, or it can be set up. In this case, all requirements.txt files that appear under the MiNiFi Python directory (defined by the `nifi.pyt [...] + + # in minifi.properties + nifi.python.install.packages.automatically=true + +## Set python binary for virtualenv creation and package installation + +By default the `python3` command is used on Unix systems and the `python` command is used on Windows to create virtualenvs and call the `pip` command for installing Python packages. This can be changed using the `nifi.python.env.setup.binary` property to use a different python command or a specific python binary path. 
+ + # in minifi.properties + nifi.python.env.setup.binary=python3 diff --git a/extensions/python/PythonConfigState.h b/extensions/python/PythonConfigState.h new file mode 100644 index 000000000..e1ea18e80 --- /dev/null +++ b/extensions/python/PythonConfigState.h @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include <filesystem> +#include <string> + +namespace org::apache::nifi::minifi::extensions::python { + +struct PythonConfigState { + bool isPackageInstallationNeeded() const { + return install_python_packages_automatically && !virtualenv_path.empty(); + } + + std::filesystem::path virtualenv_path; + std::filesystem::path python_processor_dir; + std::string python_binary; + bool install_python_packages_automatically = false; +}; + +} // namespace org::apache::nifi::minifi::extensions::python diff --git a/extensions/python/PythonCreator.h b/extensions/python/PythonCreator.h index 198f85da9..94a6131f6 100644 --- a/extensions/python/PythonCreator.h +++ b/extensions/python/PythonCreator.h @@ -37,6 +37,7 @@ #include "properties/Configuration.h" #include "utils/file/FilePattern.h" #include "range/v3/view/filter.hpp" +#include "PythonDependencyInstaller.h" namespace org::apache::nifi::minifi::extensions::python { @@ -56,8 +57,8 @@ class PythonCreator : public minifi::core::CoreComponent { } void configure(const std::shared_ptr<Configure> &configuration) override { - python::PythonScriptEngine::initialize(); - + PythonDependencyInstaller dependency_installer(configuration); + dependency_installer.installDependenciesFromRequirementsFiles(); auto engine = std::make_shared<python::PythonScriptEngine>(); std::optional<std::string> pathListings = configuration ? configuration->get(minifi::Configuration::nifi_python_processor_dir) : std::nullopt; if (!pathListings) { diff --git a/extensions/python/PythonDependencyInstaller.cpp b/extensions/python/PythonDependencyInstaller.cpp new file mode 100644 index 000000000..f98e47bb1 --- /dev/null +++ b/extensions/python/PythonDependencyInstaller.cpp @@ -0,0 +1,168 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PythonDependencyInstaller.h"
+
+#include <cstdlib>
+#include <filesystem>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "PythonScriptException.h"
+#include "PythonInterpreter.h"
+#include "PyException.h"
+#include "types/Types.h"
+
+namespace org::apache::nifi::minifi::extensions::python {
+
+namespace {
+
+/**
+ * Returns the python executable name used for creating the virtualenv and for running pip.
+ * The default is "python" when WIN32 is set and "python3" otherwise; the
+ * nifi.python.env.setup.binary property overrides it. A null configuration yields the default.
+ */
+std::string getPythonBinary(const std::shared_ptr<Configure> &configuration) {
+#if WIN32
+  std::string python_binary = "python";
+#else
+  std::string python_binary = "python3";
+#endif
+  if (!configuration) {
+    return python_binary;
+  }
+  if (auto binary = configuration->get(minifi::Configuration::nifi_python_env_setup_binary)) {
+    python_binary = *binary;
+  }
+  return python_binary;
+}
+
+// On Windows when calling a system command using std::system, the whole command needs to be encapsulated in additional quotes,
+// due to the std::system passing the command to 'cmd.exe /C' which needs the additional quotes to handle the command as a single argument
+std::string encapsulateCommandInQuotesIfNeeded(const std::string& command) {
+#if WIN32
+  return "\"" + command + "\"";
+#else
+  return command;
+#endif
+}
+
+// Escapes a value so that it can be embedded between single quotes in a regular (non-raw) Python string literal.
+// A raw literal cannot carry a quote character, so a path containing ' would otherwise terminate the literal early.
+std::string escapeForPythonStringLiteral(const std::string& value) {
+  std::string escaped;
+  escaped.reserve(value.size());
+  for (const char ch : value) {
+    switch (ch) {
+      case '\\':
+        escaped += "\\\\";
+        break;
+      case '\'':
+        escaped += "\\'";
+        break;
+      case '\n':
+        escaped += "\\n";
+        break;
+      case '\r':
+        escaped += "\\r";
+        break;
+      default:
+        escaped += ch;
+        break;
+    }
+  }
+  return escaped;
+}
+
+}  // namespace
+
+/**
+ * Reads the python related properties, creates the virtualenv when one is configured and
+ * appends its site-packages directory to the embedded interpreter's sys.path.
+ *
+ * @param configuration agent configuration; may be null, in which case only the default python binary is set
+ * @throws PythonScriptException when the virtualenv cannot be created or its site-packages directory cannot be located
+ * @throws PyException when the sys.path update script fails
+ */
+PythonDependencyInstaller::PythonDependencyInstaller(const std::shared_ptr<Configure> &configuration) {
+  python_binary_ = getPythonBinary(configuration);
+  if (!configuration) {
+    // PythonCreator::configure treats the configuration as optional, so it must not be dereferenced blindly
+    logger_->log_debug("No configuration was provided, skipping python virtualenv setup");
+    return;
+  }
+  install_python_packages_automatically_ = (configuration->get(Configuration::nifi_python_install_packages_automatically) | utils::andThen(&utils::string::toBool)).value_or(false);
+  if (auto path = configuration->get(minifi::Configuration::nifi_python_virtualenv_directory)) {
+    virtualenv_path_ = *path;
+    logger_->log_debug("Python virtualenv path was specified at: {}", virtualenv_path_.string());
+  } else {
+    logger_->log_debug("No valid python virtualenv path was specified");
+  }
+  if (auto python_processor_dir = configuration->get(minifi::Configuration::nifi_python_processor_dir)) {
+    python_processor_dir_ = *python_processor_dir;
+    logger_->log_debug("Python processor dir was specified at: {}", python_processor_dir_.string());
+  } else {
+    logger_->log_debug("No valid python processor dir was specified in properties");
+  }
+  // the virtualenv has to exist before its site-packages directory can be looked up
+  createVirtualEnvIfSpecified();
+  addVirtualenvToPath();
+}
+
+/**
+ * Collects every regular file named "requirements.txt" found recursively under the python processor directory.
+ *
+ * @return the matching paths; empty when the processor directory is unset or is not a directory
+ */
+std::vector<std::filesystem::path> PythonDependencyInstaller::getRequirementsFilePaths() const {
+  // is_directory instead of exists: recursive_directory_iterator throws when handed a regular file
+  if (!std::filesystem::is_directory(python_processor_dir_)) {
+    return {};
+  }
+  std::vector<std::filesystem::path> paths;
+  for (const auto& entry : std::filesystem::recursive_directory_iterator(python_processor_dir_)) {
+    if (std::filesystem::is_regular_file(entry.path()) && entry.path().filename() == "requirements.txt") {
+      paths.push_back(entry.path());
+    }
+  }
+  return paths;
+}
+
+/**
+ * Runs "<python binary> -m venv <virtualenv path>" when a virtualenv path is configured and
+ * the path does not exist yet or is empty. Only logs a warning when automatic package
+ * installation was requested without a virtualenv path.
+ *
+ * @throws PythonScriptException when the venv command returns a non-zero value
+ */
+void PythonDependencyInstaller::createVirtualEnvIfSpecified() const {
+  if (virtualenv_path_.empty()) {
+    if (install_python_packages_automatically_) {
+      logger_->log_warn("Python virtualenv path was not specified, but automatic python dependency installation was requested. "
+        "Specify python virtualenv path in properties to enable automatic python dependency installation.");
+    }
+    return;
+  }
+  if (!std::filesystem::exists(virtualenv_path_) || std::filesystem::is_empty(virtualenv_path_)) {
+    logger_->log_info("Creating python virtual env at: {}", virtualenv_path_.string());
+    auto venv_command = "\"" + python_binary_ + "\" -m venv \"" + virtualenv_path_.string() + "\"";
+    auto return_value = std::system(encapsulateCommandInQuotesIfNeeded(venv_command).c_str());
+    if (return_value != 0) {
+      throw PythonScriptException(fmt::format("The following command creating python virtual env failed: '{}'", venv_command));
+    }
+  }
+}
+
+/**
+ * Installs the packages listed in every requirements.txt found under the python processor directory,
+ * running pip in a shell where the virtualenv's activate script was executed first.
+ * Does nothing unless automatic installation is enabled and a virtualenv path is set.
+ *
+ * @throws PythonScriptException when a pip command returns a non-zero value
+ */
+void PythonDependencyInstaller::installDependenciesFromRequirementsFiles() const {
+  if (!isPackageInstallationNeeded()) {
+    return;
+  }
+  auto requirement_file_paths = getRequirementsFilePaths();
+  for (const auto& requirements_file_path : requirement_file_paths) {
+    logger_->log_info("Installing python packages from the following requirements.txt file: {}", requirements_file_path.string());
+    std::string pip_command;
+#if WIN32
+    pip_command.append("\"").append((virtualenv_path_ / "Scripts" / "activate.bat").string()).append("\" && ");
+#else
+    pip_command.append(". \"").append((virtualenv_path_ / "bin" / "activate").string()).append("\" && ");
+#endif
+    // NOTE(review): the binary name is presumably expected to resolve to the virtualenv's interpreter after activation;
+    // an absolute path in nifi.python.env.setup.binary would likely bypass the virtualenv - confirm
+    pip_command.append("\"").append(python_binary_).append("\" -m pip install --no-cache-dir -r \"").append(requirements_file_path.string()).append("\"");
+    auto return_value = std::system(encapsulateCommandInQuotesIfNeeded(pip_command).c_str());
+    if (return_value != 0) {
+      throw PythonScriptException(fmt::format("The following command to install python packages failed: '{}'", pip_command));
+    }
+  }
+}
+
+/**
+ * Compiles and evaluates a python script in the embedded interpreter, using a freshly created dictionary
+ * as both globals and locals. The global interpreter lock is held for the duration of the call.
+ *
+ * @param script python source; a utf-8 coding line is prepended before compilation
+ * @throws PyException when compiling or evaluating the script fails
+ */
+void PythonDependencyInstaller::evalScript(std::string_view script) {
+  GlobalInterpreterLock gil;
+  const auto script_file = minifi::utils::string::join_pack("# -*- coding: utf-8 -*-\n", script);
+  auto compiled_string = OwnedObject(Py_CompileString(script_file.c_str(), "<string>", Py_file_input));
+  if (!compiled_string.get()) {
+    throw PyException();
+  }
+
+  OwnedDict bindings = OwnedDict::create();
+  const auto result = OwnedObject(PyEval_EvalCode(compiled_string.get(), bindings.get(), bindings.get()));
+  if (!result.get()) {
+    throw PyException();
+  }
+}
+
+/**
+ * Appends the virtualenv's site-packages directory to sys.path of the embedded interpreter.
+ * Does nothing when no virtualenv path is configured.
+ *
+ * @throws PythonScriptException when the python directory under <virtualenv>/lib or the site-packages directory is missing
+ * @throws PyException when the sys.path update script fails
+ */
+void PythonDependencyInstaller::addVirtualenvToPath() const {
+  if (virtualenv_path_.empty()) {
+    return;
+  }
+  // constructs the interpreter singleton on first use, so python is initialized before evalScript runs
+  Interpreter::getInterpreter();
+#if WIN32
+  std::filesystem::path site_package_path = virtualenv_path_ / "Lib" / "site-packages";
+#else
+  std::string python_dir_name;
+  auto lib_path = virtualenv_path_ / "lib";
+  // iterate only when the lib dir exists; otherwise directory_iterator would throw a raw filesystem_error
+  // instead of the descriptive exception below
+  if (std::filesystem::is_directory(lib_path)) {
+    for (auto const& dir_entry : std::filesystem::directory_iterator{lib_path}) {
+      if (minifi::utils::string::startsWith(dir_entry.path().filename().string(), "python")) {
+        python_dir_name = dir_entry.path().filename().string();
+        break;
+      }
+    }
+  }
+  if (python_dir_name.empty()) {
+    throw PythonScriptException("Could not find python directory under virtualenv lib dir: " + lib_path.string());
+  }
+  std::filesystem::path site_package_path = virtualenv_path_ / "lib" / python_dir_name / "site-packages";
+#endif
+  if (!std::filesystem::exists(site_package_path)) {
+    throw PythonScriptException("Could not find python site package path: " + site_package_path.string());
+  }
+  evalScript("import sys\nsys.path.append('" + escapeForPythonStringLiteral(site_package_path.string()) + "')");
+}
+
+}  // namespace org::apache::nifi::minifi::extensions::python
diff --git a/extensions/python/PythonDependencyInstaller.h b/extensions/python/PythonDependencyInstaller.h
new file mode 100644
index 000000000..7cb271194
--- /dev/null
+++ b/extensions/python/PythonDependencyInstaller.h
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <filesystem>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <vector>
+
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "properties/Configure.h"
+
+namespace org::apache::nifi::minifi::extensions::python {
+
+/**
+ * Prepares the python environment of the python processors: optionally creates a virtualenv,
+ * makes its site-packages directory visible to the embedded interpreter and installs the
+ * packages listed in the requirements.txt files of the python processor directory.
+ */
+class PythonDependencyInstaller {
+ public:
+  // Reads the python properties from the configuration, creates the virtualenv if one is configured
+  // and appends its site-packages directory to the interpreter's sys.path.
+  explicit PythonDependencyInstaller(const std::shared_ptr<Configure> &configuration);
+
+  // Runs "pip install -r" for every requirements.txt found under the python processor directory;
+  // a no-op unless isPackageInstallationNeeded() holds.
+  void installDependenciesFromRequirementsFiles() const;
+
+ private:
+  // Recursively collects the requirements.txt files under python_processor_dir_.
+  std::vector<std::filesystem::path> getRequirementsFilePaths() const;
+  // Creates the virtualenv with "<python binary> -m venv" when virtualenv_path_ is set and missing or empty.
+  void createVirtualEnvIfSpecified() const;
+  // Compiles and evaluates a python script in the embedded interpreter.
+  static void evalScript(std::string_view script);
+  // Appends the virtualenv's site-packages directory to sys.path.
+  void addVirtualenvToPath() const;
+  // Installation requires both the automatic installation flag and a virtualenv path.
+  bool isPackageInstallationNeeded() const {
+    return install_python_packages_automatically_ && !virtualenv_path_.empty();
+  }
+
+  std::filesystem::path virtualenv_path_;  // from nifi.python.virtualenv.directory, empty when unset
+  std::filesystem::path python_processor_dir_;  // from nifi.python.processor.dir, empty when unset
+  std::string python_binary_;  // from nifi.python.env.setup.binary, or the platform default
+  bool install_python_packages_automatically_ = false;  // from nifi.python.install.packages.automatically
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PythonDependencyInstaller>::getLogger();
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::python
diff --git a/extensions/python/PythonInterpreter.cpp b/extensions/python/PythonInterpreter.cpp
new file mode 100644
index 000000000..267db16fd
--- /dev/null
+++ b/extensions/python/PythonInterpreter.cpp
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PythonInterpreter.h"
+
+#include "PythonBindings.h"
+
+namespace org::apache::nifi::minifi::extensions::python {
+
+// Returns the process-wide interpreter instance; it is a function-local static,
+// so the Interpreter constructor runs on the first call.
+Interpreter* Interpreter::getInterpreter() {
+  static Interpreter interpreter;
+  return &interpreter;
+}
+
+// Calls PyGILState_Ensure() and stores the returned state for the matching release in the destructor.
+GlobalInterpreterLock::GlobalInterpreterLock()
+    : gil_state_(PyGILState_Ensure()) {
+}
+
+// Hands the state obtained in the constructor back to PyGILState_Release().
+GlobalInterpreterLock::~GlobalInterpreterLock() {
+  PyGILState_Release(gil_state_);
+}
+
+namespace {
+// PyEval_InitThreads might be marked deprecated (depending on the version of Python.h)
+// Python <= 3.6: This needs to be called manually after Py_Initialize to initialize threads
+// Python >= 3.7: Noop function since its functionality is included in Py_Initialize
+// Python >= 3.9: Marked as deprecated (still noop)
+// This can be removed if we drop the support for Python 3.6
+void initThreads() {
+// silence the deprecation warning for the two calls below; the matching pop follows them
+#if defined(__clang__)
+  #pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
+#elif defined(__GNUC__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+#elif defined(WIN32)
+  #pragma warning(push)
+#pragma warning(disable: 4996)
+#endif
+  if (!PyEval_ThreadsInitialized())
+    PyEval_InitThreads();
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#elif defined(__GNUC__)
+#pragma GCC diagnostic pop
+#elif defined(WIN32)
+#pragma warning(pop)
+#endif
+}
+
+}  // namespace
+
+// Initializes python, initializes threading, calls PyInit_minifi_native() and finally stores
+// the thread state returned by PyEval_SaveThread() so that the destructor can restore it.
+// NOTE(review): PyInit_minifi_native presumably registers the minifi_native module - confirm in PythonBindings.h
+Interpreter::Interpreter() {
+  Py_Initialize();
+  initThreads();
+  PyInit_minifi_native();
+  saved_thread_state_ = PyEval_SaveThread();  // NOLINT(cppcoreguidelines-prefer-member-initializer)
+}
+
+// Restores the thread state saved by the constructor, then finalizes python.
+Interpreter::~Interpreter() {
+  PyEval_RestoreThread(saved_thread_state_);
+  Py_Finalize();
+}
+
+}  // namespace org::apache::nifi::minifi::extensions::python
diff --git a/extensions/python/PythonInterpreter.h b/extensions/python/PythonInterpreter.h
new file mode 100644
index 000000000..e13c2a9be
--- /dev/null
+++ b/extensions/python/PythonInterpreter.h
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "Python.h"
+
+namespace org::apache::nifi::minifi::extensions::python {
+
+/**
+ * Scoped guard around python's GIL state API: the constructor calls PyGILState_Ensure(),
+ * the destructor passes the stored state to PyGILState_Release().
+ */
+class GlobalInterpreterLock {
+ public:
+  GlobalInterpreterLock();
+  ~GlobalInterpreterLock();
+
+ private:
+  PyGILState_STATE gil_state_;  // value returned by PyGILState_Ensure()
+};
+
+/**
+ * Owner of the embedded python interpreter. Construction and destruction are private;
+ * the single instance is obtained through getInterpreter(). Copying and moving are disabled.
+ */
+class Interpreter {
+  Interpreter();
+  ~Interpreter();
+
+ public:
+  // Returns a pointer to the single function-local static instance.
+  static Interpreter* getInterpreter();
+
+  Interpreter(const Interpreter& other) = delete;
+  Interpreter(Interpreter&& other) = delete;
+  Interpreter& operator=(const Interpreter& other) = delete;
+  Interpreter& operator=(Interpreter&& other) = delete;
+
+ public:
+  // Thread state returned by PyEval_SaveThread() in the constructor, restored in the destructor.
+  PyThreadState* saved_thread_state_ = nullptr;
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::python
diff --git a/extensions/python/PythonObjectFactory.h b/extensions/python/PythonObjectFactory.h
index 729f8ad8a..25f3a825a 100644
--- a/extensions/python/PythonObjectFactory.h
+++ b/extensions/python/PythonObjectFactory.h
@@ -28,10 +28,6 @@
 #include "ExecutePythonProcessor.h"
 #include "utils/StringUtils.h"
 
-#if defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC visibility push(hidden)
-#endif
-
 enum class PythonProcessorType {
   MINIFI_TYPE,
   NIFI_TYPE
@@ -100,7 +96,3 @@ class PythonObjectFactory : public org::apache::nifi::minifi::core::DefautObject
   std::vector<std::filesystem::path> python_paths_;
   PythonProcessorType python_processor_type_;
 };
-
-#if defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC visibility pop
-#endif
diff --git a/extensions/python/PythonScriptEngine.cpp b/extensions/python/PythonScriptEngine.cpp
index 6f215301b..568f5f9b1 100644
--- a/extensions/python/PythonScriptEngine.cpp
+++ b/extensions/python/PythonScriptEngine.cpp
@@ -14,12 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
*/ - #include <string> -#include <filesystem> #include "PythonScriptEngine.h" -#include "PythonBindings.h" +#include "PythonConfigState.h" #include "types/PyProcessSession.h" #include "types/PyProcessContext.h" #include "types/PyProcessor.h" @@ -28,60 +26,6 @@ namespace org::apache::nifi::minifi::extensions::python { -Interpreter* Interpreter::getInterpreter() { - static Interpreter interpreter; - return &interpreter; -} - -GlobalInterpreterLock::GlobalInterpreterLock() - : gil_state_(PyGILState_Ensure()) { -} - -GlobalInterpreterLock::~GlobalInterpreterLock() { - PyGILState_Release(gil_state_); -} - -namespace { -// PyEval_InitThreads might be marked deprecated (depending on the version of Python.h) -// Python <= 3.6: This needs to be called manually after Py_Initialize to initialize threads -// Python >= 3.7: Noop function since its functionality is included in Py_Initialize -// Python >= 3.9: Marked as deprecated (still noop) -// This can be removed if we drop the support for Python 3.6 -void initThreads() { -#if defined(__clang__) - #pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wdeprecated-declarations" -#elif defined(__GNUC__) -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wdeprecated-declarations" -#elif defined(WIN32) - #pragma warning(push) -#pragma warning(disable: 4996) -#endif - if (!PyEval_ThreadsInitialized()) - PyEval_InitThreads(); -#if defined(__clang__) -#pragma clang diagnostic pop -#elif defined(__GNUC__) -#pragma GCC diagnostic pop -#elif defined(WIN32) -#pragma warning(pop) -#endif -} -} // namespace - -Interpreter::Interpreter() { - Py_Initialize(); - initThreads(); - PyInit_minifi_native(); - saved_thread_state_ = PyEval_SaveThread(); // NOLINT(cppcoreguidelines-prefer-member-initializer) -} - -Interpreter::~Interpreter() { - PyEval_RestoreThread(saved_thread_state_); - Py_Finalize(); -} - PythonScriptEngine::PythonScriptEngine() { Interpreter::getInterpreter(); @@ -171,7 +115,7 @@ void 
PythonScriptEngine::initialize(const core::Relationship& success, const cor } void PythonScriptEngine::evalInternal(std::string_view script) { - const auto script_file = "# -*- coding: utf-8 -*-\n" + std::string(script); + const auto script_file = minifi::utils::string::join_pack("# -*- coding: utf-8 -*-\n", script); auto compiled_string = OwnedObject(Py_CompileString(script_file.c_str(), "<string>", Py_file_input)); if (!compiled_string.get()) { throw PyException(); diff --git a/extensions/python/PythonScriptEngine.h b/extensions/python/PythonScriptEngine.h index e92f428c2..a25edf409 100644 --- a/extensions/python/PythonScriptEngine.h +++ b/extensions/python/PythonScriptEngine.h @@ -14,67 +14,31 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #pragma once -#include "PythonBindings.h" -#include "PyException.h" - +#include <cstdlib> #include <mutex> #include <memory> #include <utility> #include <exception> #include <string> #include <vector> +#include <filesystem> #include "core/ProcessSession.h" #include "core/Processor.h" +#include "PythonBindings.h" +#include "PyException.h" #include "PythonProcessor.h" #include "types/PyProcessSession.h" #include "PythonScriptException.h" - -#if defined(__GNUC__) || defined(__GNUG__) -#pragma GCC visibility push(hidden) -#endif +#include "properties/Configuration.h" +#include "PythonInterpreter.h" namespace org::apache::nifi::minifi::extensions::python { -#if defined(__GNUC__) || defined(__GNUG__) -class __attribute__((visibility("default"))) GlobalInterpreterLock { -#else -class GlobalInterpreterLock { -#endif - public: - GlobalInterpreterLock(); - ~GlobalInterpreterLock(); - - private: - PyGILState_STATE gil_state_; -}; - -class Interpreter { - Interpreter(); - ~Interpreter(); - - public: - static Interpreter* getInterpreter(); - - Interpreter(const Interpreter& other) = delete; - Interpreter(Interpreter&& other) = delete; - Interpreter& operator=(const 
Interpreter& other) = delete; - Interpreter& operator=(Interpreter&& other) = delete; - - public: - PyThreadState* saved_thread_state_ = nullptr; -}; - - -#if defined(__GNUC__) || defined(__GNUG__) -class __attribute__((visibility("default"))) PythonScriptEngine { -#else class PythonScriptEngine { -#endif public: PythonScriptEngine(); ~PythonScriptEngine(); @@ -84,8 +48,6 @@ class PythonScriptEngine { PythonScriptEngine& operator=(const PythonScriptEngine& other) = delete; PythonScriptEngine& operator=(PythonScriptEngine&& other) = delete; - static void initialize() {} - void eval(const std::string& script); void evalFile(const std::filesystem::path& file_name); @@ -173,10 +135,11 @@ class PythonScriptEngine { void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session); void initialize(const core::Relationship& success, const core::Relationship& failure, const core::Relationship& original, const std::shared_ptr<core::logging::Logger>& logger); void initializeProcessorObject(const std::string& python_class_name); + private: void evalInternal(std::string_view script); - void evaluateModuleImports(); + OwnedDict bindings_; OwnedObject processor_instance_; std::optional<std::string> processor_class_name_; @@ -184,7 +147,3 @@ class PythonScriptEngine { }; } // namespace org::apache::nifi::minifi::extensions::python - -#if defined(__GNUC__) || defined(__GNUG__) -#pragma GCC visibility pop -#endif diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index ab1c1b81e..cec669155 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -200,6 +200,9 @@ class Configuration : public Properties { static constexpr const char *controller_ssl_context_service = "controller.ssl.context.service"; static constexpr const char *nifi_flow_file_repository_check_health = "nifi.flowfile.repository.check.health"; + static 
constexpr const char *nifi_python_virtualenv_directory = "nifi.python.virtualenv.directory"; + static constexpr const char *nifi_python_env_setup_binary = "nifi.python.env.setup.binary"; + static constexpr const char *nifi_python_install_packages_automatically = "nifi.python.install.packages.automatically"; MINIFIAPI static const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyValidator*>> CONFIGURATION_PROPERTIES; MINIFIAPI static const std::array<const char*, 2> DEFAULT_SENSITIVE_PROPERTIES; diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 093e81747..7a4028eb0 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -152,7 +152,11 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal {Configuration::controller_socket_local_any_interface, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)}, {Configuration::controller_socket_host, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, {Configuration::controller_socket_port, gsl::make_not_null(&core::StandardPropertyTypes::PORT_TYPE)}, - {Configuration::controller_ssl_context_service, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)} + {Configuration::controller_ssl_context_service, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, + {Configuration::nifi_flow_file_repository_check_health, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)}, + {Configuration::nifi_python_virtualenv_directory, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, + {Configuration::nifi_python_env_setup_binary, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, + {Configuration::nifi_python_install_packages_automatically, gsl::make_not_null(&core::StandardPropertyTypes::BOOLEAN_TYPE)} }; const std::array<const char*, 2> Configuration::DEFAULT_SENSITIVE_PROPERTIES = {Configuration::nifi_security_client_pass_phrase, diff --git 
a/libminifi/test/resources/encrypted.minifi.properties b/libminifi/test/resources/encrypted.minifi.properties index f83e03206..2f79928f2 100644 --- a/libminifi/test/resources/encrypted.minifi.properties +++ b/libminifi/test/resources/encrypted.minifi.properties @@ -100,7 +100,6 @@ nifi.nar.deploy.directory=${MINIFI_HOME}/minifi-jni/nardeploy nifi.nar.docs.directory=${MINIFI_HOME}/minifi-jni/nardocs # must be comma separated nifi.jvm.options=-Xmx1G -nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/ nifi.c2.flow.id= nifi.c2.flow.url= @@ -111,3 +110,9 @@ nifi.sensitive.props.additional.keys=c2.agent.identifier # nifi.metrics.publisher.class=PrometheusMetricsPublisher # nifi.metrics.publisher.PrometheusMetricsPublisher.port=9936 # nifi.metrics.publisher.metrics=QueueMetrics,RepositoryMetrics,GetFileMetrics,DeviceInfoNode,FlowInformation + +# Python processor properties +nifi.python.processor.dir=${MINIFI_HOME}/minifi-python/ +nifi.python.virtualenv.directory=${MINIFI_HOME}/minifi-python-env +nifi.python.install.packages.automatically=true +# nifi.python.env.setup.binary=python3
