Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 556794b15 -> e14125e53
MINIFICPP-653: Check if empty content, if so don't produce log message that can segfault client MINIFICPP-653: Remove test MINIFICPP-653: Remove REM'd code and reindent MINIFICPP-653: Clean up python code This closes #427. Approved by arpadboda on GH. Signed-off-by: Marc Parisi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e14125e5 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e14125e5 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e14125e5 Branch: refs/heads/master Commit: e14125e53597dd3281c673b63bba5dca9f0f427c Parents: 556794b Author: Marc Parisi <[email protected]> Authored: Tue Oct 23 11:51:19 2018 -0400 Committer: Marc Parisi <[email protected]> Committed: Wed Oct 31 15:19:41 2018 -0400 ---------------------------------------------------------------------- docker/DockerVerify.sh | 12 +-- docker/Dockerfile | 2 +- docker/test/integration/README.md | 3 + docker/test/integration/minifi/__init__.py | 67 +++++++----- docker/test/integration/minifi/test/__init__.py | 33 ++++-- docker/test/integration/test_https.py | 103 ------------------- docker/test/integration/test_zero_file.py | 38 +++++++ docker/test/test_https.py | 102 ++++++++++++++++++ libminifi/src/controllers/SSLContextService.cpp | 2 +- libminifi/src/sitetosite/SiteToSiteClient.cpp | 5 +- 10 files changed, 216 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/docker/DockerVerify.sh ---------------------------------------------------------------------- diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh index 0707a12..92e706c 100755 --- a/docker/DockerVerify.sh +++ b/docker/DockerVerify.sh @@ -20,14 +20,14 @@ set -e docker_dir="$( cd ${0%/*} && pwd )" # Create virutal environment for testing -if [[ ! -d ./test-env-py2 ]]; then - echo "Creating virtual environment in ./test-env-py2" 1>&2 - virtualenv --python=python2 ./test-env-py2 +if [[ ! -d ./test-env-py3 ]]; then + echo "Creating virtual environment in ./test-env-py3" 1>&2 + virtualenv --python=python3 ./test-env-py3 fi echo "Activating virtual environment..." 1>&2 -. ./test-env-py2/bin/activate -pip install --upgrade pip setuptools +. ./test-env-py3/bin/activate +pip install --trusted-host pypi.python.org --upgrade pip setuptools # Install test dependencies echo "Installing test dependencies..." 1>&2 @@ -47,7 +47,7 @@ pip install --upgrade \ m2crypto \ watchdog -export MINIFI_VERSION=0.4.0 +export MINIFI_VERSION=0.6.0 export PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration" exec pytest -s -v "${docker_dir}"/test/integration http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/docker/Dockerfile ---------------------------------------------------------------------- diff --git a/docker/Dockerfile b/docker/Dockerfile index d1d608b..e66b568 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -67,7 +67,7 @@ ENV MINIFI_HOME $MINIFI_BASE_DIR/nifi-minifi-cpp-$MINIFI_VERSION RUN cd $MINIFI_BASE_DIR \ && mkdir build \ && cd build \ - && cmake .. \ + && cmake -DSKIP_TESTS=true .. \ && make -j8 package \ && tar -xzvf $MINIFI_BASE_DIR/build/nifi-minifi-cpp-$MINIFI_VERSION-bin.tar.gz -C $MINIFI_BASE_DIR http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/docker/test/integration/README.md ---------------------------------------------------------------------- diff --git a/docker/test/integration/README.md b/docker/test/integration/README.md index c4fb9f4..1cb6625 100644 --- a/docker/test/integration/README.md +++ b/docker/test/integration/README.md @@ -19,6 +19,9 @@ Apache MiNiFi includes a suite of docker-based system integration tests. These tests are designed to test the integration between distinct MiNiFi instances as well as other systems which are available in docker, such as Apache NiFi. +* Currently test_https.py does not work due to the upgrade to NiFi 1.7. This will be resolved as + soon as possible. + ## Test Execution Lifecycle Each test involves the following stages as part of its execution lifecycle: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/docker/test/integration/minifi/__init__.py ---------------------------------------------------------------------- diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py index ec71e74..7b6752f 100644 --- a/docker/test/integration/minifi/__init__.py +++ b/docker/test/integration/minifi/__init__.py @@ -18,7 +18,7 @@ import tarfile import uuid import xml.etree.cElementTree as elementTree from xml.etree.cElementTree import Element -from StringIO import StringIO +from io import StringIO from io import BytesIO from textwrap import dedent @@ -60,7 +60,7 @@ class SingleNodeDockerCluster(Cluster): def __init__(self): self.minifi_version = os.environ['MINIFI_VERSION'] - self.nifi_version = '1.5.0' + self.nifi_version = '1.7.0' self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version self.network = None @@ -83,7 +83,7 @@ class SingleNodeDockerCluster(Cluster): if vols is None: vols = {} - logging.info('Deploying %s flow...', engine) + logging.info('Deploying %s flow...%s', engine,name) if name is None: name = engine + '-' + str(uuid.uuid4()) @@ -110,17 +110,17 @@ class SingleNodeDockerCluster(Cluster): ADD config.yml {minifi_root}/conf/config.yml RUN chown minificpp:minificpp {minifi_root}/conf/config.yml USER minificpp - """.format(name=name, + """.format(name=name,hostname=name, base_image='apacheminificpp:' + self.minifi_version, minifi_root=self.minifi_root)) test_flow_yaml = minifi_flow_yaml(flow) logging.info('Using generated flow config yml:\n%s', test_flow_yaml) - conf_file_buffer = StringIO() + conf_file_buffer = BytesIO() try: - conf_file_buffer.write(test_flow_yaml) + conf_file_buffer.write(test_flow_yaml.encode('utf-8')) conf_file_len = conf_file_buffer.tell() conf_file_buffer.seek(0) @@ -140,7 +140,7 @@ class SingleNodeDockerCluster(Cluster): logging.info('Creating and running docker container for flow...') container = self.client.containers.run( - configured_image, + configured_image[0], detach=True, name=name, network=self.network.name, @@ -169,7 +169,7 @@ class SingleNodeDockerCluster(Cluster): try: with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer: - conf_gz_file_buffer.write(test_flow_xml) + conf_gz_file_buffer.write(test_flow_xml.encode()) conf_file_len = conf_file_buffer.tell() conf_file_buffer.seek(0) @@ -189,9 +189,10 @@ class SingleNodeDockerCluster(Cluster): logging.info('Creating and running docker container for flow...') container = self.client.containers.run( - configured_image, + configured_image[0], detach=True, name=name, + hostname=name, network=self.network.name, volumes=vols) @@ -200,17 +201,17 @@ class SingleNodeDockerCluster(Cluster): self.containers.append(container) def build_image(self, dockerfile, context_files): - conf_dockerfile_buffer = StringIO() + conf_dockerfile_buffer = BytesIO() docker_context_buffer = BytesIO() try: # Overlay conf onto base nifi image - conf_dockerfile_buffer.write(dockerfile) + conf_dockerfile_buffer.write(dockerfile.encode()) conf_dockerfile_buffer.seek(0) with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context: dockerfile_info = tarfile.TarInfo('Dockerfile') - dockerfile_info.size = conf_dockerfile_buffer.len + dockerfile_info.size = len(conf_dockerfile_buffer.getvalue()) docker_context.addfile(dockerfile_info, fileobj=conf_dockerfile_buffer) @@ -252,8 +253,8 @@ class SingleNodeDockerCluster(Cluster): # Clean up images for image in self.images: - logging.info('Cleaning up image: %s', image.id) - self.client.images.remove(image.id, force=True) + logging.info('Cleaning up image: %s', image[0].id) + self.client.images.remove(image[0].id, force=True) # Clean up network if self.network is not None: @@ -403,6 +404,7 @@ class ListenHTTP(Processor): if cert is not None: properties['SSL Certificate'] = cert + properties['SSL Verify Peer'] = 'no' super(ListenHTTP, self).__init__('ListenHTTP', properties=properties, @@ -422,6 +424,14 @@ class GetFile(Processor): schedule={'scheduling period': '0 sec'}, auto_terminate=['success']) +class GenerateFlowFile(Processor): + def __init__(self, file_size): + super(GenerateFlowFile, self).__init__('GenerateFlowFile', + properties={'File Size': file_size}, + schedule={'scheduling period': '0 sec'}, + auto_terminate=['success']) + + class PutFile(Processor): def __init__(self, output_dir): @@ -664,8 +674,7 @@ def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): input_port_max_concurrent_tasks = Element('maxConcurrentTasks') input_port_max_concurrent_tasks.text = '1' input_port.append(input_port_max_concurrent_tasks) - - res.iterfind('rootGroup').next().append(input_port) + next( res.iterfind('rootGroup') ).append(input_port) if isinstance(connectable, Processor): conn_destination = Element('processor') @@ -738,7 +747,7 @@ def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos']) conn_destination.append(proc_run_duration_nanos) - for property_key, property_value in connectable.properties.iteritems(): + for property_key, property_value in connectable.properties.items(): proc_property = Element('property') proc_property_name = Element('name') proc_property_name.text = connectable.nifi_property_key(property_key) @@ -752,8 +761,8 @@ def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): proc_auto_terminated_relationship = Element('autoTerminatedRelationship') proc_auto_terminated_relationship.text = auto_terminate_rel conn_destination.append(proc_auto_terminated_relationship) - - res.iterfind('rootGroup').next().append(conn_destination) + next( res.iterfind('rootGroup') ).append(conn_destination) + """ res.iterfind('rootGroup').next().append(conn_destination) """ for svc in connectable.controller_services: if svc in visited: @@ -801,8 +810,8 @@ def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): controller_service_property_value.text = property_value controller_service_property.append(controller_service_property_value) controller_service.append(controller_service_property) - - res.iterfind('rootGroup').next().append(controller_service) + next( res.iterfind('rootGroup') ).append(controller_service) + """ res.iterfind('rootGroup').next().append(controller_service)""" for conn_name in connectable.connections: conn_destinations = connectable.connections[conn_name] @@ -816,8 +825,8 @@ def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): label_index, conn_destination, z_index) - - res.iterfind('rootGroup').next().append(connection) + next( res.iterfind('rootGroup') ).append(connection) + """ res.iterfind('rootGroup').next().append(connection) """ if conn_destination not in visited: nifi_flow_xml(conn_destination, nifi_version, res, visited) @@ -829,8 +838,8 @@ def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): label_index, conn_destinations, z_index) - - res.iterfind('rootGroup').next().append(connection) + next( res.iterfind('rootGroup') ).append(connection) + """ res.iterfind('rootGroup').next().append(connection) """ if conn_destinations not in visited: nifi_flow_xml(conn_destinations, nifi_version, res, visited) @@ -838,7 +847,7 @@ def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): if root is None: return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>' + "\n" - + elementTree.tostring(res, encoding='utf-8')) + + elementTree.tostring(res, encoding='utf-8').decode('utf-8')) def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_index, destination, z_index): @@ -860,7 +869,8 @@ def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_ind connection.append(connection_source_id) connection_source_group_id = Element('sourceGroupId') - connection_source_group_id.text = res.iterfind('rootGroup/id').next().text + connection_source_group_id.text = next( res.iterfind('rootGroup/id') ).text + """connection_source_group_id.text = res.iterfind('rootGroup/id').next().text""" connection.append(connection_source_group_id) connection_source_type = Element('sourceType') @@ -877,7 +887,8 @@ def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_ind connection.append(connection_destination_id) connection_destination_group_id = Element('destinationGroupId') - connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text + connection_destination_group_id.text = next(res.iterfind('rootGroup/id')).text + """ connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text """ connection.append(connection_destination_group_id) connection_destination_type = Element('destinationType') http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/docker/test/integration/minifi/test/__init__.py ---------------------------------------------------------------------- diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py index dbb3888..149382f 100644 --- a/docker/test/integration/minifi/test/__init__.py +++ b/docker/test/integration/minifi/test/__init__.py @@ -28,10 +28,9 @@ from minifi import SingleNodeDockerCluster logging.basicConfig(level=logging.DEBUG) - def put_file_contents(contents, file_abs_path): logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path) - with open(file_abs_path, 'w') as test_input_file: + with open(file_abs_path, 'wb') as test_input_file: test_input_file.write(contents) @@ -41,20 +40,23 @@ class DockerTestCluster(SingleNodeDockerCluster): # Create test input/output directories test_cluster_id = str(uuid.uuid4()) + self.segfault = False + self.tmp_test_output_dir = '/tmp/.nifi-test-output.' + test_cluster_id self.tmp_test_input_dir = '/tmp/.nifi-test-input.' + test_cluster_id self.tmp_test_resources_dir = '/tmp/.nifi-test-resources.' + test_cluster_id logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir) - os.makedirs(self.tmp_test_input_dir, mode=0777) + os.makedirs(self.tmp_test_input_dir) logging.info('Creating tmp test output dir: %s', self.tmp_test_output_dir) - os.makedirs(self.tmp_test_output_dir, mode=0777) + os.makedirs(self.tmp_test_output_dir) logging.info('Creating tmp test resource dir: %s', self.tmp_test_resources_dir) - os.makedirs(self.tmp_test_resources_dir, mode=0777) + os.makedirs(self.tmp_test_resources_dir) # Point output validator to ephemeral output dir self.output_validator = output_validator - output_validator.set_output_dir(self.tmp_test_output_dir) + if isinstance(output_validator, SingleFileOutputValidator): + output_validator.set_output_dir(self.tmp_test_output_dir) # Start observing output dir self.done_event = Event() @@ -95,7 +97,7 @@ class DockerTestCluster(SingleNodeDockerCluster): file_name = str(uuid.uuid4()) file_abs_path = join(self.tmp_test_input_dir, file_name) - put_file_contents(contents, file_abs_path) + put_file_contents(contents.encode('utf-8'), file_abs_path) def put_test_resource(self, file_name, contents): """ @@ -117,12 +119,15 @@ class DockerTestCluster(SingleNodeDockerCluster): for container in self.containers: container = self.client.containers.get(container.id) logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs()) + if b'Segmentation fault' in container.logs(): + self.segfault=True if container.status == 'running': - minifi_app_logs = container.exec_run('/bin/sh -c \'test -f ' + self.minifi_root + '/minifi-app.log ' + minifi_app_logs = container.exec_run('/bin/sh -c \'test -f ' + self.minifi_root + '/logs/minifi-app.log ' '&& cat ' + - self.minifi_root + '/minifi-app.log\'') + self.minifi_root + '/logs/minifi-app.log\'') if len(minifi_app_logs) > 0: logging.info('MiNiFi app logs for container \'%s\':\n%s', container.name, minifi_app_logs) + nifi_app_logs = container.exec_run('/bin/sh -c \'test -f ' + self.nifi_root + '/logs/nifi-app.log ' '&& cat ' + self.nifi_root + '/logs/nifi-app.log\'') @@ -142,7 +147,7 @@ class DockerTestCluster(SingleNodeDockerCluster): self.wait_for_output(timeout) self.log_nifi_output() - return self.output_validator.validate() + return self.output_validator.validate() and not self.segfault def __exit__(self, exc_type, exc_val, exc_tb): """ @@ -222,3 +227,11 @@ class SingleFileOutputValidator(OutputValidator): return True return False + + +class SegfaultValidator(OutputValidator): + """ + Validate that a file was received. + """ + def validate(self): + return True http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/docker/test/integration/test_https.py ---------------------------------------------------------------------- diff --git a/docker/test/integration/test_https.py b/docker/test/integration/test_https.py deleted file mode 100644 index 9912370..0000000 --- a/docker/test/integration/test_https.py +++ /dev/null @@ -1,103 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the \"License\"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an \"AS IS\" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import time - -from M2Crypto import X509, EVP, RSA, ASN1 - -from minifi import * -from minifi.test import * - - -def callback(): - pass - - -def test_invoke_listen_https_one_way(): - """ - Verify sending using InvokeHTTP to a receiver using ListenHTTP (with TLS). - """ - - cert, key = gen_cert() - - # TODO define SSLContextService class & generate config yml for services - crt_file = '/tmp/resources/test-crt.pem' - - invoke_flow = (GetFile('/tmp/input') - >> LogAttribute() - >> InvokeHTTP('https://minifi-listen:4430/contentListener', - method='POST', - ssl_context_service=SSLContextService(ca_cert=crt_file))) - - listen_flow = (ListenHTTP(4430, cert=crt_file) - >> LogAttribute() - >> PutFile('/tmp/output')) - - with DockerTestCluster(SingleFileOutputValidator('test')) as cluster: - cluster.put_test_resource('test-crt.pem', cert.as_pem() + key.as_pem(None, callback)) - cluster.put_test_data('test') - cluster.deploy_flow(listen_flow, name='minifi-listen') - cluster.deploy_flow(invoke_flow, name='minifi-invoke') - - assert cluster.check_output() - - -def gen_cert(): - """ - Generate TLS certificate request for testing - """ - - req, key = gen_req() - pub_key = req.get_pubkey() - subject = req.get_subject() - cert = X509.X509() - # noinspection PyTypeChecker - cert.set_serial_number(1) - cert.set_version(2) - cert.set_subject(subject) - t = long(time.time()) - now = ASN1.ASN1_UTCTIME() - now.set_time(t) - now_plus_year = ASN1.ASN1_UTCTIME() - now_plus_year.set_time(t + 60 * 60 * 24 * 365) - cert.set_not_before(now) - cert.set_not_after(now_plus_year) - issuer = X509.X509_Name() - issuer.C = 'US' - issuer.CN = 'minifi-listen' - cert.set_issuer(issuer) - cert.set_pubkey(pub_key) - cert.sign(key, 'sha256') - - return cert, key - - -def gen_req(): - """ - Generate TLS certificate request for testing - """ - - logging.info('Generating test certificate request') - key = EVP.PKey() - req = X509.Request() - rsa = RSA.gen_key(1024, 65537, callback) - key.assign_rsa(rsa) - req.set_pubkey(key) - name = req.get_subject() - name.C = 'US' - name.CN = 'minifi-listen' - req.sign(key, 'sha256') - - return req, key http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/docker/test/integration/test_zero_file.py ---------------------------------------------------------------------- diff --git a/docker/test/integration/test_zero_file.py b/docker/test/integration/test_zero_file.py new file mode 100644 index 0000000..23a0e99 --- /dev/null +++ b/docker/test/integration/test_zero_file.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from minifi import * +from minifi.test import * + + +def test_minifi_to_nifi(): + """ + Verify sending data from a MiNiFi - C++ to NiFi using S2S protocol. + """ + + port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi')) + + recv_flow = (port + >> LogAttribute() + >> PutFile('/tmp/output')) + + send_flow = (GenerateFlowFile('0B') + >> LogAttribute() + >> port) + + with DockerTestCluster(SegfaultValidator()) as cluster: + cluster.deploy_flow(recv_flow, name='nifi', engine='nifi') + cluster.deploy_flow(send_flow) + assert cluster.check_output(60) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/docker/test/test_https.py ---------------------------------------------------------------------- diff --git a/docker/test/test_https.py b/docker/test/test_https.py new file mode 100644 index 0000000..79a565f --- /dev/null +++ b/docker/test/test_https.py @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the \"License\"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an \"AS IS\" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from M2Crypto import X509, EVP, RSA, ASN1 + +from minifi import * +from minifi.test import * + + +def callback(): + pass + + +def test_invoke_listen_https_one_way(): + """ + Verify sending using InvokeHTTP to a receiver using ListenHTTP (with TLS). + """ + + cert, key = gen_cert() + + # TODO define SSLContextService class & generate config yml for services + crt_file = '/tmp/resources/test-crt.pem' + + invoke_flow = (GetFile('/tmp/input') + >> InvokeHTTP('https://minifi-listen:4430/contentListener', + method='POST', + ssl_context_service=SSLContextService(cert=crt_file, ca_cert=crt_file))) + + listen_flow = (ListenHTTP(4430, cert=crt_file) + >> LogAttribute() + >> PutFile('/tmp/output')) + + with DockerTestCluster(SingleFileOutputValidator('test')) as cluster: + cluster.put_test_resource('test-crt.pem', cert.as_pem() + key.as_pem(None, callback)) + cluster.put_test_data('test') + cluster.deploy_flow(listen_flow, name='minifi-listen') + cluster.deploy_flow(invoke_flow, name='minifi-invoke') + + assert cluster.check_output() + + +def gen_cert(): + """ + Generate TLS certificate request for testing + """ + + req, key = gen_req() + pub_key = req.get_pubkey() + subject = req.get_subject() + cert = X509.X509() + # noinspection PyTypeChecker + cert.set_serial_number(1) + cert.set_version(2) + cert.set_subject(subject) + t = int(time.time()) + now = ASN1.ASN1_UTCTIME() + now.set_time(t) + now_plus_year = ASN1.ASN1_UTCTIME() + now_plus_year.set_time(t + 60 * 60 * 24 * 365) + cert.set_not_before(now) + cert.set_not_after(now_plus_year) + issuer = X509.X509_Name() + issuer.C = 'US' + issuer.CN = 'minifi-listen' + cert.set_issuer(issuer) + cert.set_pubkey(pub_key) + cert.sign(key, 'sha256') + + return cert, key + + +def gen_req(): + """ + Generate TLS certificate request for testing + """ + + logging.info('Generating test certificate request') + key = EVP.PKey() + req = X509.Request() + rsa = RSA.gen_key(1024, 65537, callback) + key.assign_rsa(rsa) + req.set_pubkey(key) + name = req.get_subject() + name.C = 'US' + name.CN = 'minifi-listen' + req.sign(key, 'sha256') + + return req, key http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/libminifi/src/controllers/SSLContextService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp index 8d0a997..9cfb2f4 100644 --- a/libminifi/src/controllers/SSLContextService.cpp +++ b/libminifi/src/controllers/SSLContextService.cpp @@ -182,7 +182,7 @@ void SSLContextService::onEnable() { passphrase_file_ = test_passphrase; passphrase_.assign((std::istreambuf_iterator<char>(passphrase_file_test)), std::istreambuf_iterator<char>()); } else { - valid_ = false; + // not an invalid file since we support a passphrase of unencrypted text } passphrase_file_test.close(); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e14125e5/libminifi/src/sitetosite/SiteToSiteClient.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/sitetosite/SiteToSiteClient.cpp b/libminifi/src/sitetosite/SiteToSiteClient.cpp index 2b103a7..0eafb2b 100644 --- a/libminifi/src/sitetosite/SiteToSiteClient.cpp +++ b/libminifi/src/sitetosite/SiteToSiteClient.cpp @@ -447,8 +447,9 @@ int16_t SiteToSiteClient::send(std::string transactionID, DataPacket *packet, co int ret; std::shared_ptr<Transaction> transaction = NULL; - if (flowFile && !flowFile->getResourceClaim()->exists()) { - logger_->log_debug("Claim %s does not exist for FlowFile %s", flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr()); + if (flowFile && (flowFile->getResourceClaim() == nullptr || !flowFile->getResourceClaim()->exists())) { + auto path = flowFile->getResourceClaim() != nullptr ? flowFile->getResourceClaim()->getContentFullPath() : "nullclaim"; + logger_->log_debug("Claim %s does not exist for FlowFile %s", path, flowFile->getUUIDStr()); return -2; } if (peer_state_ != READY) {
