This is an automated email from the ASF dual-hosted git repository.

lordgamez pushed a commit to branch MINIFICPP-2689
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 0d4a898695e96d8a1b402cb4930d71301370d8b4
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Nov 12 18:42:53 2025 +0100

    MINIFICPP-2689 Move Splunk tests to modular docker tests
    
    - Add more logging to the PutSplunkHTTP processor
    - Fail if the "code" field of the Splunk HEC response is not 0
---
 .../minifi/flow_definition.py                      |   3 +
 .../steps/flow_building_steps.py                   |  11 ++
 docker/RunBehaveTests.sh                           |   3 +-
 docker/test/integration/cluster/ContainerStore.py  |   9 --
 .../test/integration/cluster/DockerTestCluster.py  |  18 ----
 docker/test/integration/cluster/ImageStore.py      |   5 -
 .../integration/cluster/checkers/SplunkChecker.py  |  80 --------------
 .../cluster/containers/SplunkContainer.py          |  44 --------
 .../features/MiNiFi_integration_test_driver.py     |  15 ---
 docker/test/integration/features/steps/steps.py    |  38 -------
 .../minifi/processors/QuerySplunkIndexingStatus.py |  31 ------
 .../integration/resources/splunk-hec/Dockerfile    |   2 -
 .../resources/splunk-hec/conf/default.yml          |   6 --
 extensions/splunk/PutSplunkHTTP.cpp                |  46 ++++++--
 .../splunk/tests/features/environment.py           |  33 +++---
 .../splunk/tests}/features/splunk.feature          |  48 ++++++---
 .../tests/features/steps/splunk_container.py       | 116 +++++++++++++++++++++
 extensions/splunk/tests/features/steps/steps.py    |  46 ++++++++
 18 files changed, 263 insertions(+), 291 deletions(-)

diff --git 
a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py 
b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py
index c616ebf09..e53e8d0e7 100644
--- a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py
+++ b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py
@@ -39,6 +39,9 @@ class FlowDefinition:
     def get_processor(self, processor_name: str) -> Processor | None:
         return next((proc for proc in self.processors if proc.name == 
processor_name), None)
 
+    def get_controller_service(self, controller_service_name: str) -> 
ControllerService | None:
+        return next((controller for controller in self.controller_services if 
controller.name == controller_service_name), None)
+
     def get_parameter_context(self, parameter_context_name: str) -> 
ParameterContext | None:
         return next((parameter_context for parameter_context in 
self.parameter_contexts if
                      parameter_context.name == parameter_context_name), None)
diff --git 
a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py 
b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
index 11e19d104..281bbbd42 100644
--- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
+++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py
@@ -16,6 +16,7 @@
 #
 
 import logging
+import uuid
 from behave import given, step
 
 from minifi_test_framework.containers.directory import Directory
@@ -246,8 +247,18 @@ def step_impl(context: MinifiTestContext, property_name: 
str, processor_name: st
     processor.add_property(property_name, filtering)
 
 
+@given("the \"{property_name}\" properties of the {processor_name_one} and 
{processor_name_two} processors are set to the same random guid")
+def step_impl(context, property_name, processor_name_one, processor_name_two):
+    uuid_str = str(uuid.uuid4())
+    
context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name_one).add_property(property_name,
 uuid_str)
+    
context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name_two).add_property(property_name,
 uuid_str)
+
+
 # TLS
 def add_ssl_context_service_for_minifi(context: MinifiTestContext, cert_name: 
str):
+    ssl_context_service = 
context.get_or_create_default_minifi_container().flow_definition.get_controller_service("SSLContextService")
+    if ssl_context_service is not None:
+        return
     controller_service = ControllerService(class_name="SSLContextService", 
service_name="SSLContextService")
     controller_service.add_property("Client Certificate", 
f"/tmp/resources/{cert_name}.crt")
     controller_service.add_property("Private Key", 
f"/tmp/resources/{cert_name}.key")
diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh
index aaf80a82c..a3b5c1973 100755
--- a/docker/RunBehaveTests.sh
+++ b/docker/RunBehaveTests.sh
@@ -202,4 +202,5 @@ exec \
     "${docker_dir}/../extensions/opc/tests/features" \
     "${docker_dir}/../extensions/kafka/tests/features" \
     "${docker_dir}/../extensions/couchbase/tests/features" \
-    "${docker_dir}/../extensions/elasticsearch/tests/features"
+    "${docker_dir}/../extensions/elasticsearch/tests/features" \
+    "${docker_dir}/../extensions/splunk/tests/features"
diff --git a/docker/test/integration/cluster/ContainerStore.py 
b/docker/test/integration/cluster/ContainerStore.py
index 7c1c0d6c0..873489999 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -25,7 +25,6 @@ from .containers.FakeGcsServerContainer import 
FakeGcsServerContainer
 from .containers.HttpProxyContainer import HttpProxyContainer
 from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer
 from .containers.MqttBrokerContainer import MqttBrokerContainer
-from .containers.SplunkContainer import SplunkContainer
 from .containers.SyslogUdpClientContainer import SyslogUdpClientContainer
 from .containers.SyslogTcpClientContainer import SyslogTcpClientContainer
 from .containers.MinifiAsPodInKubernetesCluster import 
MinifiAsPodInKubernetesCluster
@@ -169,14 +168,6 @@ class ContainerStore:
                                                                   
network=self.network,
                                                                   
image_store=self.image_store,
                                                                   
command=command))
-        elif engine == 'splunk':
-            return self.containers.setdefault(container_name,
-                                              
SplunkContainer(feature_context=feature_context,
-                                                              
name=container_name,
-                                                              vols=self.vols,
-                                                              
network=self.network,
-                                                              
image_store=self.image_store,
-                                                              command=command))
         elif engine == "syslog-udp-client":
             return self.containers.setdefault(container_name,
                                               SyslogUdpClientContainer(
diff --git a/docker/test/integration/cluster/DockerTestCluster.py 
b/docker/test/integration/cluster/DockerTestCluster.py
index df94c893b..6450a78e9 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -31,7 +31,6 @@ from .checkers.AzureChecker import AzureChecker
 from .checkers.GcsChecker import GcsChecker
 from .checkers.PostgresChecker import PostgresChecker
 from .checkers.PrometheusChecker import PrometheusChecker
-from .checkers.SplunkChecker import SplunkChecker
 from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
 from .checkers.ModbusChecker import ModbusChecker
 from .checkers.MqttHelper import MqttHelper
@@ -48,7 +47,6 @@ class DockerTestCluster:
         self.azure_checker = AzureChecker(self.container_communicator)
         self.gcs_checker = GcsChecker(self.container_communicator)
         self.postgres_checker = PostgresChecker(self.container_communicator)
-        self.splunk_checker = SplunkChecker(self.container_communicator)
         self.prometheus_checker = PrometheusChecker()
         self.grafana_loki_checker = GrafanaLokiChecker()
         self.minifi_controller_executor = 
MinifiControllerExecutor(self.container_communicator)
@@ -236,22 +234,6 @@ class DockerTestCluster:
     def check_azure_blob_storage_is_empty(self, timeout_seconds):
         return 
self.azure_checker.check_azure_blob_storage_is_empty(timeout_seconds)
 
-    def check_splunk_event(self, container_name, query):
-        container_name = 
self.container_store.get_container_name_with_postfix(container_name)
-        return self.splunk_checker.check_splunk_event(container_name, query)
-
-    def check_splunk_event_with_attributes(self, container_name, query, 
attributes):
-        container_name = 
self.container_store.get_container_name_with_postfix(container_name)
-        return 
self.splunk_checker.check_splunk_event_with_attributes(container_name, query, 
attributes)
-
-    def enable_splunk_hec_indexer(self, container_name, hec_name):
-        container_name = 
self.container_store.get_container_name_with_postfix(container_name)
-        return self.splunk_checker.enable_splunk_hec_indexer(container_name, 
hec_name)
-
-    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, 
splunk_key_pem, root_ca_cert_pem):
-        container_name = 
self.container_store.get_container_name_with_postfix(container_name)
-        return self.splunk_checker.enable_splunk_hec_ssl(container_name, 
splunk_cert_pem, splunk_key_pem, root_ca_cert_pem)
-
     def check_google_cloud_storage(self, gcs_container_name, content):
         gcs_container_name = 
self.container_store.get_container_name_with_postfix(gcs_container_name)
         return self.gcs_checker.check_google_cloud_storage(gcs_container_name, 
content)
diff --git a/docker/test/integration/cluster/ImageStore.py 
b/docker/test/integration/cluster/ImageStore.py
index 88d06cff9..4e52f7deb 100644
--- a/docker/test/integration/cluster/ImageStore.py
+++ b/docker/test/integration/cluster/ImageStore.py
@@ -65,8 +65,6 @@ class ImageStore:
             image = self.__build_postgresql_server_image()
         elif container_engine == "mqtt-broker":
             image = self.__build_mqtt_broker_image()
-        elif container_engine == "splunk":
-            image = self.__build_splunk_image()
         elif container_engine == "kinesis-server":
             image = self.__build_kinesis_image()
         elif container_engine == "reverse-proxy":
@@ -295,9 +293,6 @@ class ImageStore:
     def __build_kinesis_image(self):
         return self.__build_image_by_path(self.test_dir + 
"/resources/kinesis-mock", 'kinesis-server')
 
-    def __build_splunk_image(self):
-        return self.__build_image_by_path(self.test_dir + 
"/resources/splunk-hec", 'minifi-splunk')
-
     def __build_reverse_proxy_image(self):
         return self.__build_image_by_path(self.test_dir + 
"/resources/reverse-proxy", 'reverse-proxy')
 
diff --git a/docker/test/integration/cluster/checkers/SplunkChecker.py 
b/docker/test/integration/cluster/checkers/SplunkChecker.py
deleted file mode 100644
index 8cc2e2758..000000000
--- a/docker/test/integration/cluster/checkers/SplunkChecker.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import json
-from utils import retry_check
-
-
-class SplunkChecker:
-    def __init__(self, container_communicator):
-        self.container_communicator = container_communicator
-
-    @retry_check()
-    def check_splunk_event(self, container_name, query):
-        (code, output) = 
self.container_communicator.execute_command(container_name, ["sudo", 
"/opt/splunk/bin/splunk", "search", query, "-auth", "admin:splunkadmin"])
-        if code != 0:
-            return False
-        return query in output.decode("utf-8")
-
-    @retry_check()
-    def check_splunk_event_with_attributes(self, container_name, query, 
attributes):
-        (code, output) = 
self.container_communicator.execute_command(container_name, ["sudo", 
"/opt/splunk/bin/splunk", "search", query, "-output", "json", "-auth", 
"admin:splunkadmin"])
-        if code != 0:
-            return False
-        result_lines = output.splitlines()
-        for result_line in result_lines:
-            try:
-                result_line_json = json.loads(result_line)
-            except json.decoder.JSONDecodeError:
-                continue
-            if "result" not in result_line_json:
-                continue
-            if "host" in attributes:
-                if result_line_json["result"]["host"] != attributes["host"]:
-                    continue
-            if "source" in attributes:
-                if result_line_json["result"]["source"] != 
attributes["source"]:
-                    continue
-            if "sourcetype" in attributes:
-                if result_line_json["result"]["sourcetype"] != 
attributes["sourcetype"]:
-                    continue
-            if "index" in attributes:
-                if result_line_json["result"]["index"] != attributes["index"]:
-                    continue
-            return True
-        return False
-
-    def enable_splunk_hec_indexer(self, container_name, hec_name):
-        (code, _) = 
self.container_communicator.execute_command(container_name, ["sudo",
-                                                                               
  "/opt/splunk/bin/splunk", "http-event-collector",
-                                                                               
  "update", hec_name,
-                                                                               
  "-uri", "https://localhost:8089";,
-                                                                               
  "-use-ack", "1",
-                                                                               
  "-disabled", "0",
-                                                                               
  "-auth", "admin:splunkadmin"])
-        return code == 0
-
-    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, 
splunk_key_pem, root_ca_cert_pem):
-        assert 
self.container_communicator.write_content_to_container(splunk_cert_pem.decode() 
+ splunk_key_pem.decode() + root_ca_cert_pem.decode(), container_name, 
'/opt/splunk/etc/auth/splunk_cert.pem')
-        assert 
self.container_communicator.write_content_to_container(root_ca_cert_pem.decode(),
 container_name, '/opt/splunk/etc/auth/root_ca.pem')
-        (code, _) = 
self.container_communicator.execute_command(container_name, ["sudo",
-                                                                               
  "/opt/splunk/bin/splunk", "http-event-collector",
-                                                                               
  "update",
-                                                                               
  "-uri", "https://localhost:8089";,
-                                                                               
  "-enable-ssl", "1",
-                                                                               
  "-server-cert", "/opt/splunk/etc/auth/splunk_cert.pem",
-                                                                               
  "-ca-cert-file", "/opt/splunk/etc/auth/root_ca.pem",
-                                                                               
  "-require-client-cert", "1",
-                                                                               
  "-auth", "admin:splunkadmin"])
-        return code == 0
diff --git a/docker/test/integration/cluster/containers/SplunkContainer.py 
b/docker/test/integration/cluster/containers/SplunkContainer.py
deleted file mode 100644
index 9a5734276..000000000
--- a/docker/test/integration/cluster/containers/SplunkContainer.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import logging
-from .Container import Container
-
-
-class SplunkContainer(Container):
-    def __init__(self, feature_context, name, vols, network, image_store, 
command=None):
-        super().__init__(feature_context, name, 'splunk', vols, network, 
image_store, command)
-
-    def get_startup_finished_log_entry(self):
-        return "Ansible playbook complete, will begin streaming 
splunkd_stderr.log"
-
-    def deploy(self):
-        if not self.set_deployed():
-            return
-
-        logging.info('Creating and running Splunk docker container...')
-        self.client.containers.run(
-            self.image_store.get_image(self.get_engine()),
-            detach=True,
-            name=self.name,
-            network=self.network.name,
-            environment=[
-                "SPLUNK_LICENSE_URI=Free",
-                "SPLUNK_START_ARGS=--accept-license",
-                "SPLUNK_PASSWORD=splunkadmin"
-            ],
-            entrypoint=self.command)
-        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py 
b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 7a42e2320..eb15dded3 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -64,12 +64,6 @@ class MiNiFi_integration_test:
     def acquire_transient_minifi(self, context, name, engine='minifi-cpp'):
         return self.cluster.acquire_transient_minifi(context=context, 
name=name, engine=engine)
 
-    def start_splunk(self, context):
-        self.cluster.acquire_container(context=context, name='splunk', 
engine='splunk')
-        self.cluster.deploy_container(name='splunk')
-        assert self.cluster.wait_for_container_startup_to_finish('splunk') or 
self.cluster.log_app_output()
-        assert self.cluster.enable_splunk_hec_indexer('splunk', 
'splunk_hec_token') or self.cluster.log_app_output()
-
     def start_minifi_c2_server(self, context):
         self.cluster.acquire_container(context=context, 
name="minifi-c2-server", engine="minifi-c2-server")
         self.cluster.deploy_container('minifi-c2-server')
@@ -303,12 +297,6 @@ class MiNiFi_integration_test:
     def check_azure_storage_server_data(self, azure_container_name, 
object_data):
         assert 
self.cluster.check_azure_storage_server_data(azure_container_name, object_data) 
or self.cluster.log_app_output()
 
-    def check_splunk_event(self, splunk_container_name, query):
-        assert self.cluster.check_splunk_event(splunk_container_name, query) 
or self.cluster.log_app_output()
-
-    def check_splunk_event_with_attributes(self, splunk_container_name, query, 
attributes):
-        assert 
self.cluster.check_splunk_event_with_attributes(splunk_container_name, query, 
attributes) or self.cluster.log_app_output()
-
     def check_google_cloud_storage(self, gcs_container_name, content):
         assert self.cluster.check_google_cloud_storage(gcs_container_name, 
content) or self.cluster.log_app_output()
 
@@ -384,9 +372,6 @@ class MiNiFi_integration_test:
     def enable_prometheus_with_ssl_in_minifi(self):
         self.cluster.enable_prometheus_with_ssl_in_minifi()
 
-    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, 
splunk_key_pem, root_ca_cert_pem):
-        self.cluster.enable_splunk_hec_ssl(container_name, splunk_cert_pem, 
splunk_key_pem, root_ca_cert_pem)
-
     def enable_sql_in_minifi(self):
         self.cluster.enable_sql_in_minifi()
 
diff --git a/docker/test/integration/features/steps/steps.py 
b/docker/test/integration/features/steps/steps.py
index e57e4e987..c0305746f 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -15,7 +15,6 @@
 
 from filesystem_validation.FileSystemObserver import FileSystemObserver
 from minifi.core.RemoteProcessGroup import RemoteProcessGroup
-from ssl_utils.SSL_cert_utils import make_server_cert
 from minifi.core.Funnel import Funnel
 
 from minifi.controllers.SSLContextService import SSLContextService
@@ -36,7 +35,6 @@ import logging
 import time
 import uuid
 import humanfriendly
-import OpenSSL.crypto
 
 import os
 
@@ -127,7 +125,6 @@ def step_impl(context, processor_type, 
minifi_container_name):
 @given("a {processor_type} processor set up to communicate with the same s3 
server")
 @given("a {processor_type} processor set up to communicate with an Azure blob 
storage")
 @given("a {processor_type} processor set up to communicate with an MQTT broker 
instance")
-@given("a {processor_type} processor set up to communicate with the Splunk HEC 
instance")
 @given("a {processor_type} processor set up to communicate with the kinesis 
server")
 def step_impl(context, processor_type):
     __create_processor(context, processor_type, processor_type, None, None, 
"minifi-cpp-flow")
@@ -557,35 +554,12 @@ def setUpSslContextServiceForRPG(context, rpg_name: str):
     rpg.add_property("SSL Context Service", ssl_context_service.name)
 
 
-# splunk hec
-@given("a Splunk HEC is set up and running")
-def step_impl(context):
-    context.test.start_splunk(context)
-
-
 # TCP client
 @given('a TCP client is set up to send a test TCP message to minifi')
 def step_impl(context):
     context.test.acquire_container(context=context, name="tcp-client", 
engine="tcp-client")
 
 
-@given("SSL is enabled for the Splunk HEC and the SSL context service is set 
up for PutSplunkHTTP and QuerySplunkIndexingStatus")
-def step_impl(context):
-    minifi_crt_file = '/tmp/resources/minifi_client.crt'
-    minifi_key_file = '/tmp/resources/minifi_client.key'
-    root_ca_crt_file = '/tmp/resources/root_ca.crt'
-    ssl_context_service = SSLContextService(name='SSLContextService', 
cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file)
-
-    splunk_cert, splunk_key = 
make_server_cert(context.test.get_container_name_with_postfix("splunk"), 
context.root_ca_cert, context.root_ca_key)
-    put_splunk_http = context.test.get_node_by_name("PutSplunkHTTP")
-    put_splunk_http.controller_services.append(ssl_context_service)
-    put_splunk_http.set_property("SSL Context Service", 
ssl_context_service.name)
-    query_splunk_indexing_status = 
context.test.get_node_by_name("QuerySplunkIndexingStatus")
-    
query_splunk_indexing_status.controller_services.append(ssl_context_service)
-    query_splunk_indexing_status.set_property("SSL Context Service", 
ssl_context_service.name)
-    context.test.enable_splunk_hec_ssl('splunk', 
OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, splunk_cert), 
OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, splunk_key), 
OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, 
context.root_ca_cert))
-
-
 @given(u'the {processor_one} processor is set up with a 
GCPCredentialsControllerService to communicate with the Google Cloud storage 
server')
 def step_impl(context, processor_one):
     gcp_controller_service = 
GCPCredentialsControllerService(credentials_location="Use Anonymous 
credentials")
@@ -911,18 +885,6 @@ def step_impl(context):
     context.test.check_empty_gcs_bucket("fake-gcs-server")
 
 
-# Splunk
-@then('an event is registered in Splunk HEC with the content \"{content}\"')
-def step_imp(context, content):
-    context.test.check_splunk_event("splunk", content)
-
-
-@then('an event is registered in Splunk HEC with the content \"{content}\" 
with \"{source}\" set as source and \"{source_type}\" set as sourcetype and 
\"{host}\" set as host')
-def step_imp(context, content, source, source_type, host):
-    attr = {"source": source, "sourcetype": source_type, "host": host}
-    context.test.check_splunk_event_with_attributes("splunk", content, attr)
-
-
 # Prometheus
 @given("a Prometheus server is set up")
 def step_impl(context):
diff --git 
a/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py 
b/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py
deleted file mode 100644
index e4638cec8..000000000
--- a/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py
+++ /dev/null
@@ -1,31 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from ..core.Processor import Processor
-
-
-class QuerySplunkIndexingStatus(Processor):
-    def __init__(self, context, schedule={'scheduling strategy': 
'EVENT_DRIVEN', 'penalization period': '1 sec'}):
-        super(QuerySplunkIndexingStatus, self).__init__(
-            context=context,
-            clazz='QuerySplunkIndexingStatus',
-            properties={
-                'Hostname': 'splunk',
-                'Port': '8088',
-                'Token': 'Splunk 176fae97-f59d-4f08-939a-aa6a543f2485'  # 
Token of the default splunk_hec_token HTTP Event Collector in the Splunk 
container image
-            },
-            auto_terminate=['acknowledged', 'unacknowledged', 'undetermined', 
'failure'],
-            schedule=schedule)
diff --git a/docker/test/integration/resources/splunk-hec/Dockerfile 
b/docker/test/integration/resources/splunk-hec/Dockerfile
deleted file mode 100644
index a56160096..000000000
--- a/docker/test/integration/resources/splunk-hec/Dockerfile
+++ /dev/null
@@ -1,2 +0,0 @@
-FROM splunk/splunk:9.2.1-patch2
-ADD conf/default.yml /tmp/defaults/default.yml
diff --git a/docker/test/integration/resources/splunk-hec/conf/default.yml 
b/docker/test/integration/resources/splunk-hec/conf/default.yml
deleted file mode 100644
index a59ce2ac5..000000000
--- a/docker/test/integration/resources/splunk-hec/conf/default.yml
+++ /dev/null
@@ -1,6 +0,0 @@
-splunk:
-  hec:
-    enable: True
-    ssl: False
-    port: 8088
-    token: 176fae97-f59d-4f08-939a-aa6a543f2485
diff --git a/extensions/splunk/PutSplunkHTTP.cpp 
b/extensions/splunk/PutSplunkHTTP.cpp
index 953966e4e..3e59934ff 100644
--- a/extensions/splunk/PutSplunkHTTP.cpp
+++ b/extensions/splunk/PutSplunkHTTP.cpp
@@ -66,31 +66,52 @@ std::optional<std::string> 
getContentType(core::ProcessContext& context, const c
   return context.getProperty(PutSplunkHTTP::ContentType) | utils::toOptional() 
| utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");});
 }
 
-bool setAttributesFromClientResponse(core::FlowFile& flow_file, 
http::HTTPClient& client) {
+bool setAttributesFromClientResponse(core::FlowFile& flow_file, 
http::HTTPClient& client, const std::shared_ptr<core::logging::Logger>& logger) 
{
   rapidjson::Document response_json;
   rapidjson::ParseResult parse_result = 
response_json.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
   bool result = true;
-  if (parse_result.IsError())
+  if (parse_result.IsError()) {
+    logger->log_error("Failed to parse Splunk HEC response JSON");
     return false;
+  }
 
-  if (response_json.HasMember("code") && response_json["code"].IsInt())
-    flow_file.setAttribute(SPLUNK_RESPONSE_CODE, 
std::to_string(response_json["code"].GetInt()));
-  else
+  if (response_json.HasMember("code") && response_json["code"].IsInt()) {
+    auto code = response_json["code"].GetInt();
+    flow_file.setAttribute(SPLUNK_RESPONSE_CODE, std::to_string(code));
+    if (code != 0) {
+      logger->log_error("Splunk HEC returned error code: {}", code);
+      result = false;
+    }
+  } else {
+    logger->log_error("Splunk HEC response JSON does not contain a valid 
'code' field");
     result = false;
+  }
 
-  if (response_json.HasMember("ackId") && response_json["ackId"].IsUint64())
+  if (response_json.HasMember("ackId") && response_json["ackId"].IsUint64()) {
     flow_file.setAttribute(SPLUNK_ACK_ID, 
std::to_string(response_json["ackId"].GetUint64()));
-  else
+  } else {
+    logger->log_error("Splunk HEC response JSON does not contain a valid 
'ackId' field");
     result = false;
+  }
 
   return result;
 }
 
-bool enrichFlowFileWithAttributes(core::FlowFile& flow_file, http::HTTPClient& 
client) {
+bool enrichFlowFileWithAttributes(core::FlowFile& flow_file, http::HTTPClient& 
client, const std::shared_ptr<core::logging::Logger>& logger) {
   flow_file.setAttribute(SPLUNK_STATUS_CODE, 
std::to_string(client.getResponseCode()));
   flow_file.setAttribute(SPLUNK_RESPONSE_TIME, 
std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()));
 
-  return setAttributesFromClientResponse(flow_file, client) && 
client.getResponseCode() == 200;
+  auto result = true;
+  if (client.getResponseCode() != 200) {
+    logger->log_error("Received failure response code from Splunk HEC: {}", 
client.getResponseCode());
+    result = false;
+  }
+
+  if (!setAttributesFromClientResponse(flow_file, client, logger)) {
+    return false;
+  }
+
+  return result;
 }
 
 void setFlowFileAsPayload(core::ProcessSession& session,
@@ -142,8 +163,11 @@ void PutSplunkHTTP::onTrigger(core::ProcessContext& 
context, core::ProcessSessio
   setFlowFileAsPayload(session, context, *client, flow_file);
 
   bool success = false;
-  if (client->submit())
-    success = enrichFlowFileWithAttributes(*flow_file, *client);
+  if (client->submit()) {
+    success = enrichFlowFileWithAttributes(*flow_file, *client, logger_);
+  } else {
+    logger_->log_error("Failed to submit HTTP request to Splunk HEC");
+  }
 
   session.transfer(flow_file, success ? Success : Failure);
 }
diff --git a/docker/test/integration/minifi/processors/PutSplunkHTTP.py 
b/extensions/splunk/tests/features/environment.py
similarity index 55%
rename from docker/test/integration/minifi/processors/PutSplunkHTTP.py
rename to extensions/splunk/tests/features/environment.py
index 1bf26e59d..347b35688 100644
--- a/docker/test/integration/minifi/processors/PutSplunkHTTP.py
+++ b/extensions/splunk/tests/features/environment.py
@@ -13,19 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import platform
 
-from ..core.Processor import Processor
-
-
-class PutSplunkHTTP(Processor):
-    def __init__(self, context, schedule={'scheduling strategy': 
'EVENT_DRIVEN'}):
-        super(PutSplunkHTTP, self).__init__(
-            context=context,
-            clazz='PutSplunkHTTP',
-            properties={
-                'Hostname': 'splunk',
-                'Port': '8088',
-                'Token': 'Splunk 176fae97-f59d-4f08-939a-aa6a543f2485'  # 
Token of the default splunk_hec_token HTTP Event Collector in the Splunk 
container image
-            },
-            auto_terminate=['success', 'failure'],
-            schedule=schedule)
+from minifi_test_framework.core.hooks import common_before_scenario
+from minifi_test_framework.core.hooks import common_after_scenario
+
+
+def before_feature(context, feature):
+    if "x86_x64_only" in feature.tags:
+        is_x86 = platform.machine() in ("i386", "AMD64", "x86_64")
+        if not is_x86:
+            feature.skip("This feature is only x86/x64 compatible")
+
+
+def before_scenario(context, scenario):
+    common_before_scenario(context, scenario)
+
+
+def after_scenario(context, scenario):
+    common_after_scenario(context, scenario)
diff --git a/docker/test/integration/features/splunk.feature 
b/extensions/splunk/tests/features/splunk.feature
similarity index 67%
rename from docker/test/integration/features/splunk.feature
rename to extensions/splunk/tests/features/splunk.feature
index a6f99285e..ee488ed08 100644
--- a/docker/test/integration/features/splunk.feature
+++ b/extensions/splunk/tests/features/splunk.feature
@@ -17,51 +17,67 @@
 @ENABLE_SPLUNK
 Feature: Sending data to Splunk HEC using PutSplunkHTTP
 
-  Background:
-    Given the content of "/tmp/output" is monitored
-
   Scenario: A MiNiFi instance transfers data to a Splunk HEC
     Given a Splunk HEC is set up and running
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "foobar" is present in "/tmp/input"
-    And a PutSplunkHTTP processor set up to communicate with the Splunk HEC 
instance
-    And a QuerySplunkIndexingStatus processor set up to communicate with the 
Splunk HEC Instance
+    And a PutSplunkHTTP processor
+    And PutSplunkHTTP is EVENT_DRIVEN
+    And a QuerySplunkIndexingStatus processor
+    And QuerySplunkIndexingStatus is EVENT_DRIVEN
     And the "Splunk Request Channel" properties of the PutSplunkHTTP and 
QuerySplunkIndexingStatus processors are set to the same random guid
     And the "Source" property of the PutSplunkHTTP processor is set to 
"my-source"
     And the "Source Type" property of the PutSplunkHTTP processor is set to 
"my-source-type"
     And the "Host" property of the PutSplunkHTTP processor is set to "my-host"
+    And the "Hostname" property of the PutSplunkHTTP processor is set to 
"http://splunk-${scenario_id}"
+    And the "Port" property of the PutSplunkHTTP processor is set to "8088"
+    And the "Token" property of the PutSplunkHTTP processor is set to "Splunk 
176fae97-f59d-4f08-939a-aa6a543f2485"
+    And the "Hostname" property of the QuerySplunkIndexingStatus processor is 
set to "http://splunk-${scenario_id}"
+    And the "Port" property of the QuerySplunkIndexingStatus processor is set 
to "8088"
+    And the "Token" property of the QuerySplunkIndexingStatus processor is set 
to "Splunk 176fae97-f59d-4f08-939a-aa6a543f2485"
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
+
     And the "success" relationship of the GetFile processor is connected to 
the PutSplunkHTTP
     And the "success" relationship of the PutSplunkHTTP processor is connected 
to the QuerySplunkIndexingStatus
     And the "undetermined" relationship of the QuerySplunkIndexingStatus 
processor is connected to the QuerySplunkIndexingStatus
     And the "acknowledged" relationship of the QuerySplunkIndexingStatus 
processor is connected to the PutFile
-    And the "Hostname" property of the PutSplunkHTTP processor is set to 
"http://splunk-${feature_id}"
-    And the "Hostname" property of the QuerySplunkIndexingStatus processor is 
set to "http://splunk-${feature_id}"
+    And PutFile's success relationship is auto-terminated
 
-    When both instances start up
-    Then a flowfile with the content "foobar" is placed in the monitored 
directory in less than 20 seconds
+    When the MiNiFi instance starts up
+    Then there is a single file with "foobar" content in the "/tmp/output" 
directory in less than 120 seconds
     And an event is registered in Splunk HEC with the content "foobar" with 
"my-source" set as source and "my-source-type" set as sourcetype and "my-host" 
set as host
 
-
   Scenario: A MiNiFi instance transfers data to a Splunk HEC with SSL enabled
     Given a Splunk HEC is set up and running
     And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "foobar" is present in "/tmp/input"
-    And a PutSplunkHTTP processor set up to communicate with the Splunk HEC 
instance
-    And a QuerySplunkIndexingStatus processor set up to communicate with the 
Splunk HEC Instance
+    And a PutSplunkHTTP processor
+    And PutSplunkHTTP is EVENT_DRIVEN
+    And a QuerySplunkIndexingStatus processor
+    And QuerySplunkIndexingStatus is EVENT_DRIVEN
     And the "Splunk Request Channel" properties of the PutSplunkHTTP and 
QuerySplunkIndexingStatus processors are set to the same random guid
     And the "Source" property of the PutSplunkHTTP processor is set to 
"my-source"
     And the "Source Type" property of the PutSplunkHTTP processor is set to 
"my-source-type"
     And the "Host" property of the PutSplunkHTTP processor is set to "my-host"
+    And the "Hostname" property of the PutSplunkHTTP processor is set to 
"https://splunk-${scenario_id}"
+    And the "Port" property of the PutSplunkHTTP processor is set to "8088"
+    And the "Token" property of the PutSplunkHTTP processor is set to "Splunk 
176fae97-f59d-4f08-939a-aa6a543f2485"
+    And the "Hostname" property of the QuerySplunkIndexingStatus processor is 
set to "https://splunk-${scenario_id}"
+    And the "Port" property of the QuerySplunkIndexingStatus processor is set 
to "8088"
+    And the "Token" property of the QuerySplunkIndexingStatus processor is set 
to "Splunk 176fae97-f59d-4f08-939a-aa6a543f2485"
+    And an ssl context service is set up for PutSplunkHTTP
+    And an ssl context service is set up for QuerySplunkIndexingStatus
     And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And PutFile is EVENT_DRIVEN
+
     And the "success" relationship of the GetFile processor is connected to 
the PutSplunkHTTP
     And the "success" relationship of the PutSplunkHTTP processor is connected 
to the QuerySplunkIndexingStatus
     And the "undetermined" relationship of the QuerySplunkIndexingStatus 
processor is connected to the QuerySplunkIndexingStatus
     And the "acknowledged" relationship of the QuerySplunkIndexingStatus 
processor is connected to the PutFile
+    And PutFile's success relationship is auto-terminated
     And SSL is enabled for the Splunk HEC and the SSL context service is set 
up for PutSplunkHTTP and QuerySplunkIndexingStatus
-    And the "Hostname" property of the PutSplunkHTTP processor is set to 
"https://splunk-${feature_id}"
-    And the "Hostname" property of the QuerySplunkIndexingStatus processor is 
set to "https://splunk-${feature_id}"
 
-    When both instances start up
-    Then a flowfile with the content "foobar" is placed in the monitored 
directory in less than 20 seconds
+    When the MiNiFi instance starts up
+    Then there is a single file with "foobar" content in the "/tmp/output" 
directory in less than 120 seconds
     And an event is registered in Splunk HEC with the content "foobar" with 
"my-source" set as source and "my-source-type" set as sourcetype and "my-host" 
set as host
diff --git a/extensions/splunk/tests/features/steps/splunk_container.py 
b/extensions/splunk/tests/features/steps/splunk_container.py
new file mode 100644
index 000000000..9b0692904
--- /dev/null
+++ b/extensions/splunk/tests/features/steps/splunk_container.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from OpenSSL import crypto
+from minifi_test_framework.containers.container import Container
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+from minifi_test_framework.core.helpers import wait_for_condition, retry_check
+from minifi_test_framework.containers.file import File
+from minifi_test_framework.core.ssl_utils import make_server_cert
+
+
+class SplunkContainer(Container):
+    def __init__(self, test_context: MinifiTestContext):
+        super().__init__("splunk/splunk:9.2.1-patch2", 
f"splunk-{test_context.scenario_id}", test_context.network)
+        self.user = None
+
+        self.environment = ["SPLUNK_LICENSE_URI=Free",
+                            "SPLUNK_START_ARGS=--accept-license",
+                            "SPLUNK_PASSWORD=splunkadmin"]
+
+        splunk_config_content = """
+splunk:
+  hec:
+    enable: True
+    ssl: False
+    port: 8088
+    token: 176fae97-f59d-4f08-939a-aa6a543f2485
+"""
+        self.files.append(File("/tmp/defaults/default.yml", 
splunk_config_content, mode="rw", permissions=0o644))
+
+        splunk_cert, splunk_key = make_server_cert(self.container_name, 
test_context.root_ca_cert, test_context.root_ca_key)
+        splunk_cert_content = crypto.dump_certificate(crypto.FILETYPE_PEM, 
splunk_cert)
+        splunkt_key_content = crypto.dump_privatekey(crypto.FILETYPE_PEM, 
splunk_key)
+        root_ca_content = crypto.dump_certificate(crypto.FILETYPE_PEM, 
test_context.root_ca_cert)
+        self.files.append(File("/opt/splunk/etc/auth/splunk_cert.pem", 
splunk_cert_content.decode() + splunkt_key_content.decode() + 
root_ca_content.decode(), permissions=0o644))
+        self.files.append(File("/opt/splunk/etc/auth/root_ca.pem", 
root_ca_content.decode(), permissions=0o644))
+
+    def deploy(self):
+        super().deploy()
+        finished_str = "Ansible playbook complete, will begin streaming 
splunkd_stderr.log"
+        return wait_for_condition(
+            condition=lambda: finished_str in self.get_logs(),
+            timeout_seconds=300,
+            bail_condition=lambda: False,
+            context=None)
+
+    @retry_check()
+    def check_splunk_event(self, query: str) -> bool:
+        (code, output) = self.exec_run(["sudo", "/opt/splunk/bin/splunk", 
"search", query, "-auth", "admin:splunkadmin"])
+        if code != 0:
+            return False
+        return query in output.decode("utf-8")
+
+    @retry_check()
+    def check_splunk_event_with_attributes(self, query: str, attributes: 
dict[str, str]) -> bool:
+        (code, output) = self.exec_run(["sudo", "/opt/splunk/bin/splunk", 
"search", query, "-output", "json", "-auth", "admin:splunkadmin"])
+        if code != 0:
+            return False
+        result_lines = output.splitlines()
+        for result_line in result_lines:
+            try:
+                result_line_json = json.loads(result_line)
+            except json.decoder.JSONDecodeError:
+                continue
+            if "result" not in result_line_json:
+                continue
+            if "host" in attributes:
+                if result_line_json["result"]["host"] != attributes["host"]:
+                    continue
+            if "source" in attributes:
+                if result_line_json["result"]["source"] != 
attributes["source"]:
+                    continue
+            if "sourcetype" in attributes:
+                if result_line_json["result"]["sourcetype"] != 
attributes["sourcetype"]:
+                    continue
+            if "index" in attributes:
+                if result_line_json["result"]["index"] != attributes["index"]:
+                    continue
+            return True
+        return False
+
+    def enable_splunk_hec_indexer(self, hec_name: str):
+        (code, _) = self.exec_run(["sudo",
+                                   "/opt/splunk/bin/splunk", 
"http-event-collector",
+                                   "update", hec_name,
+                                   "-uri", "https://localhost:8089";,
+                                   "-use-ack", "1",
+                                   "-disabled", "0",
+                                   "-auth", "admin:splunkadmin"])
+        return code == 0
+
+    def enable_splunk_hec_ssl(self):
+        (code, _) = self.exec_run(["sudo",
+                                   "/opt/splunk/bin/splunk", 
"http-event-collector",
+                                   "update",
+                                   "-uri", "https://localhost:8089";,
+                                   "-enable-ssl", "1",
+                                   "-server-cert", 
"/opt/splunk/etc/auth/splunk_cert.pem",
+                                   "-ca-cert-file", 
"/opt/splunk/etc/auth/root_ca.pem",
+                                   "-require-client-cert", "1",
+                                   "-auth", "admin:splunkadmin"])
+        return code == 0
diff --git a/extensions/splunk/tests/features/steps/steps.py 
b/extensions/splunk/tests/features/steps/steps.py
new file mode 100644
index 000000000..16a8d66e9
--- /dev/null
+++ b/extensions/splunk/tests/features/steps/steps.py
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from behave import step, then
+
+from minifi_test_framework.steps import checking_steps        # noqa: F401
+from minifi_test_framework.steps import configuration_steps   # noqa: F401
+from minifi_test_framework.steps import core_steps            # noqa: F401
+from minifi_test_framework.steps import flow_building_steps   # noqa: F401
+from minifi_test_framework.core.helpers import log_due_to_failure
+from minifi_test_framework.core.minifi_test_context import MinifiTestContext
+from splunk_container import SplunkContainer
+
+
+@step("a Splunk HEC is set up and running")
+def step_impl(context: MinifiTestContext):
+    context.containers["splunk"] = SplunkContainer(context)
+    assert context.containers["splunk"].deploy()
+    assert 
context.containers["splunk"].enable_splunk_hec_indexer('splunk_hec_token') or 
context.containers["splunk"].log_app_output()
+
+
+@then('an event is registered in Splunk HEC with the content \"{content}\"')
+def step_imp(context, content):
+    assert context.containers["splunk"].check_splunk_event(content) or 
log_due_to_failure(context)
+
+
+@then('an event is registered in Splunk HEC with the content \"{content}\" 
with \"{source}\" set as source and \"{source_type}\" set as sourcetype and 
\"{host}\" set as host')
+def step_imp(context, content, source, source_type, host):
+    attr = {"source": source, "sourcetype": source_type, "host": host}
+    assert 
context.containers["splunk"].check_splunk_event_with_attributes(content, attr) 
or log_due_to_failure(context)
+
+
+@step("SSL is enabled for the Splunk HEC and the SSL context service is set up 
for PutSplunkHTTP and QuerySplunkIndexingStatus")
+def step_impl(context):
+    assert context.containers["splunk"].enable_splunk_hec_ssl() or 
context.containers["splunk"].log_app_output()


Reply via email to