This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ce3d1f8 MINIFICPP-1445 - Refactor docker integration test frame
ce3d1f8 is described below
commit ce3d1f8bdc2d2236f471ee960ab2ee05c1fe927f
Author: Adam Hunyadi <[email protected]>
AuthorDate: Mon Jan 4 17:18:10 2021 +0100
MINIFICPP-1445 - Refactor docker integration test frame
Signed-off-by: Arpad Boda <[email protected]>
This closes #968
---
.gitignore | 1 +
docker/test/integration/minifi/__init__.py | 1105 +-------------------
docker/test/integration/minifi/core/Cluster.py | 22 +
docker/test/integration/minifi/core/Connectable.py | 77 ++
.../integration/minifi/core/ControllerService.py | 17 +
.../__init__.py => core/DockerTestCluster.py} | 178 +---
docker/test/integration/minifi/core/InputPort.py | 7 +
.../integration/minifi/core/OutputEventHandler.py | 24 +
docker/test/integration/minifi/core/Processor.py | 45 +
.../integration/minifi/core/RemoteProcessGroup.py | 13 +
.../integration/minifi/core/SSLContextService.py | 16 +
.../minifi/core/SingleNodeDockerCluster.py | 319 ++++++
docker/test/integration/minifi/core/__init__.py | 0
.../Minifi_flow_yaml_serializer.py | 111 ++
.../flow_serialization/Nifi_flow_xml_serializer.py | 324 ++++++
.../minifi/flow_serialization/__init__.py | 0
.../minifi/processors/DeleteS3Object.py | 22 +
.../minifi/processors/GenerateFlowFile.py | 8 +
.../test/integration/minifi/processors/GetFile.py | 8 +
.../integration/minifi/processors/InvokeHTTP.py | 30 +
.../integration/minifi/processors/ListenHTTP.py | 14 +
.../integration/minifi/processors/LogAttribute.py | 7 +
.../integration/minifi/processors/PublishKafka.py | 10 +
.../minifi/processors/PublishKafkaSSL.py | 16 +
.../test/integration/minifi/processors/PutFile.py | 14 +
.../integration/minifi/processors/PutS3Object.py | 20 +
.../test/integration/minifi/processors/__init__.py | 0
.../minifi/validators/EmptyFilesOutPutValidator.py | 27 +
.../minifi/validators/FileOutputValidator.py | 8 +
.../minifi/validators/NoFileOutPutValidator.py | 25 +
.../minifi/validators/OutputValidator.py | 12 +
.../minifi/validators/SegfaultValidator.py | 8 +
.../minifi/validators/SingleFileOutputValidator.py | 44 +
.../test/integration/minifi/validators/__init__.py | 0
docker/test/integration/test_filesystem_ops.py | 2 -
docker/test/integration/test_filter_zero_file.py | 1 -
docker/test/integration/test_hash_content.py | 2 -
docker/test/integration/test_http.py | 2 -
docker/test/integration/test_rdkafka.py | 2 -
docker/test/integration/test_s2s.py | 2 -
docker/test/integration/test_s3.py | 2 -
docker/test/integration/test_zero_file.py | 1 -
docker/test/test_https.py | 2 -
43 files changed, 1298 insertions(+), 1250 deletions(-)
diff --git a/.gitignore b/.gitignore
index 46f1e22..cc40bfd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -57,6 +57,7 @@ libminifi/src/agent/agent_version.cpp
docs/generated
thirdparty/apache-rat/apache-rat*
/compile_commands.json
+__pycache__/
# Ignore source files that have been placed in the docker directory during
build
docker/minificppsource
diff --git a/docker/test/integration/minifi/__init__.py
b/docker/test/integration/minifi/__init__.py
index b64d541..fead6a1 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -30,1076 +30,39 @@ from copy import copy
import time
from collections import OrderedDict
-class Cluster(object):
- """
- Base Cluster class. This is intended to be a generic interface
- to different types of clusters. Clusters could be Kubernetes clusters,
- Docker swarms, or cloud compute/container services.
- """
+from .core.Connectable import Connectable
+from .core.Cluster import Cluster
+from .core.ControllerService import ControllerService
+from .core.InputPort import InputPort
+from .core.Processor import Processor
+from .core.RemoteProcessGroup import RemoteProcessGroup
+from .core.SingleNodeDockerCluster import SingleNodeDockerCluster
+from .core.SSLContextService import SSLContextService
+from .core.DockerTestCluster import DockerTestCluster
+from .core.OutputEventHandler import OutputEventHandler
+
+from .flow_serialization.Minifi_flow_yaml_serializer import
Minifi_flow_yaml_serializer
+from .flow_serialization.Nifi_flow_xml_serializer import
Nifi_flow_xml_serializer
+
+from .processors.GenerateFlowFile import GenerateFlowFile
+from .processors.GetFile import GetFile
+from .processors.InvokeHTTP import InvokeHTTP
+from .processors.ListenHTTP import ListenHTTP
+from .processors.LogAttribute import LogAttribute
+from .processors.PublishKafka import PublishKafka
+from .processors.PublishKafkaSSL import PublishKafkaSSL
+from .processors.PutFile import PutFile
+from .processors.PutS3Object import PutS3Object
+from .processors.DeleteS3Object import DeleteS3Object
+
+from .validators.OutputValidator import OutputValidator
+from .validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
+from .validators.SegfaultValidator import SegfaultValidator
+from .validators.NoFileOutPutValidator import NoFileOutPutValidator
+from .validators.SingleFileOutputValidator import SingleFileOutputValidator
+from .validators.FileOutputValidator import FileOutputValidator
+
+logging.basicConfig(level=logging.DEBUG)
- def deploy_flow(self, flow, name=None, vols=None):
- """
- Deploys a flow to the cluster.
- """
- def __enter__(self):
- """
- Allocate ephemeral cluster resources.
- """
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- """
- Clean up ephemeral cluster resources.
- """
-
-
-class SingleNodeDockerCluster(Cluster):
- """
- A "cluster" which consists of a single docker node. Useful for
- testing or use-cases which do not span multiple compute nodes.
- """
-
- def __init__(self):
- self.minifi_version = os.environ['MINIFI_VERSION']
- self.nifi_version = '1.7.0'
- self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
- self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
- self.network = None
- self.containers = OrderedDict()
- self.images = []
- self.tmp_files = []
-
- # Get docker client
- self.client = docker.from_env()
-
- def deploy_flow(self,
- flow,
- name=None,
- vols=None,
- engine='minifi-cpp'):
- """
- Compiles the flow to a valid config file and overlays it into a new
image.
- """
-
- if vols is None:
- vols = {}
-
- logging.info('Deploying %s flow...%s', engine,name)
-
- if name is None:
- name = engine + '-' + str(uuid.uuid4())
- logging.info('Flow name was not provided; using generated name
\'%s\'', name)
-
- # Create network if necessary
- if self.network is None:
- net_name = 'nifi-' + str(uuid.uuid4())
- logging.info('Creating network: %s', net_name)
- self.network = self.client.networks.create(net_name)
-
- if engine == 'nifi':
- self.deploy_nifi_flow(flow, name, vols)
- elif engine == 'minifi-cpp':
- self.deploy_minifi_cpp_flow(flow, name, vols)
- elif engine == 'kafka-broker':
- self.deploy_kafka_broker(name)
- elif engine == 'http-proxy':
- self.deploy_http_proxy()
- elif engine == 's3-server':
- self.deploy_s3_server()
- else:
- raise Exception('invalid flow engine: \'%s\'' % engine)
-
- def deploy_minifi_cpp_flow(self, flow, name, vols):
-
- # Build configured image
- dockerfile = dedent("""FROM {base_image}
- USER root
- ADD config.yml {minifi_root}/conf/config.yml
- RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
- USER minificpp
- """.format(name=name,hostname=name,
- base_image='apacheminificpp:' + self.minifi_version,
- minifi_root=self.minifi_root))
-
- test_flow_yaml = minifi_flow_yaml(flow)
- logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
-
- conf_file_buffer = BytesIO()
-
- try:
- conf_file_buffer.write(test_flow_yaml.encode('utf-8'))
- conf_file_len = conf_file_buffer.tell()
- conf_file_buffer.seek(0)
-
- context_files = [
- {
- 'name': 'config.yml',
- 'size': conf_file_len,
- 'file_obj': conf_file_buffer
- }
- ]
-
- configured_image = self.build_image(dockerfile, context_files)
-
- finally:
- conf_file_buffer.close()
-
- logging.info('Creating and running docker container for flow...')
-
- container = self.client.containers.run(
- configured_image[0],
- detach=True,
- name=name,
- network=self.network.name,
- volumes=vols)
-
- logging.info('Started container \'%s\'', container.name)
-
- self.containers[container.name] = container
-
- def deploy_nifi_flow(self, flow, name, vols):
- dockerfile = dedent(r"""FROM {base_image}
- USER root
- ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
- RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz
- RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/'
{nifi_root}/conf/nifi.properties
- RUN sed -i -e
's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/'
{nifi_root}/conf/nifi.properties
- USER nifi
- """.format(name=name,
- base_image='apache/nifi:' + self.nifi_version,
- nifi_root=self.nifi_root))
-
- test_flow_xml = nifi_flow_xml(flow, self.nifi_version)
- logging.info('Using generated flow config xml:\n%s', test_flow_xml)
-
- conf_file_buffer = BytesIO()
-
- try:
- with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as
conf_gz_file_buffer:
- conf_gz_file_buffer.write(test_flow_xml.encode())
- conf_file_len = conf_file_buffer.tell()
- conf_file_buffer.seek(0)
-
- context_files = [
- {
- 'name': 'flow.xml.gz',
- 'size': conf_file_len,
- 'file_obj': conf_file_buffer
- }
- ]
-
- configured_image = self.build_image(dockerfile, context_files)
-
- finally:
- conf_file_buffer.close()
-
- logging.info('Creating and running docker container for flow...')
-
- container = self.client.containers.run(
- configured_image[0],
- detach=True,
- name=name,
- hostname=name,
- network=self.network.name,
- volumes=vols)
-
- logging.info('Started container \'%s\'', container.name)
-
- self.containers[container.name] = container
-
- def deploy_kafka_broker(self, name):
- logging.info('Creating and running docker containers for kafka
broker...')
- zookeeper = self.client.containers.run(
- self.client.images.pull("wurstmeister/zookeeper:latest"),
- detach=True,
- name='zookeeper',
- network=self.network.name,
- ports={'2181/tcp': 2181},
- )
- self.containers[zookeeper.name] = zookeeper
-
- test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on
DockerVerify.sh
- broker_image = self.build_image_by_path(test_dir +
"/resources/kafka_broker", 'minifi-kafka')
- broker = self.client.containers.run(
- broker_image[0],
- detach=True,
- name='kafka-broker',
- network=self.network.name,
- ports={'9092/tcp': 9092},
-
environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093",
"KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
- )
- self.containers[broker.name] = broker
-
- dockerfile = dedent("""FROM {base_image}
- USER root
- CMD $KAFKA_HOME/bin/kafka-console-consumer.sh
--bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt
- """.format(base_image='wurstmeister/kafka:2.12-2.5.0'))
- configured_image = self.build_image(dockerfile, [])
- consumer = self.client.containers.run(
- configured_image[0],
- detach=True,
- name='kafka-consumer',
- network=self.network.name,
- )
- self.containers[consumer.name] = consumer
-
- def deploy_http_proxy(self):
- logging.info('Creating and running http-proxy docker container...')
- dockerfile = dedent("""FROM {base_image}
- RUN apt update && apt install -y apache2-utils
- RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username}
{proxy_password}
- RUN echo 'auth_param basic program
/usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users' >
/etc/squid/squid.conf && \
- echo 'auth_param basic realm proxy' >>
/etc/squid/squid.conf && \
- echo 'acl authenticated proxy_auth REQUIRED' >>
/etc/squid/squid.conf && \
- echo 'http_access allow authenticated' >>
/etc/squid/squid.conf && \
- echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
- ENTRYPOINT ["/sbin/entrypoint.sh"]
- """.format(base_image='sameersbn/squid:3.5.27-2',
proxy_username='admin', proxy_password='test101', proxy_port='3128'))
- configured_image = self.build_image(dockerfile, [])
- consumer = self.client.containers.run(
- configured_image[0],
- detach=True,
- name='http-proxy',
- network=self.network.name,
- ports={'3128/tcp': 3128},
- )
- self.containers[consumer.name] = consumer
-
- def deploy_s3_server(self):
- consumer = self.client.containers.run(
- "adobe/s3mock:2.1.28",
- detach=True,
- name='s3-server',
- network=self.network.name,
- ports={'9090/tcp': 9090, '9191/tcp': 9191},
- environment=["initialBuckets=test_bucket"],
- )
- self.containers[consumer.name] = consumer
-
- def build_image(self, dockerfile, context_files):
- conf_dockerfile_buffer = BytesIO()
- docker_context_buffer = BytesIO()
-
- try:
- # Overlay conf onto base nifi image
- conf_dockerfile_buffer.write(dockerfile.encode())
- conf_dockerfile_buffer.seek(0)
-
- with tarfile.open(mode='w', fileobj=docker_context_buffer) as
docker_context:
- dockerfile_info = tarfile.TarInfo('Dockerfile')
- dockerfile_info.size =
conf_dockerfile_buffer.getbuffer().nbytes
- docker_context.addfile(dockerfile_info,
- fileobj=conf_dockerfile_buffer)
-
- for context_file in context_files:
- file_info = tarfile.TarInfo(context_file['name'])
- file_info.size = context_file['size']
- docker_context.addfile(file_info,
- fileobj=context_file['file_obj'])
- docker_context_buffer.seek(0)
-
- logging.info('Creating configured image...')
- configured_image =
self.client.images.build(fileobj=docker_context_buffer,
- custom_context=True,
- rm=True,
- forcerm=True)
- logging.info('Created image with id: %s', configured_image[0].id)
- self.images.append(configured_image)
-
- finally:
- conf_dockerfile_buffer.close()
- docker_context_buffer.close()
-
- return configured_image
-
- def build_image_by_path(self, dir, name=None):
- try:
- logging.info('Creating configured image...')
- configured_image = self.client.images.build(path=dir,
- tag=name,
- rm=True,
- forcerm=True)
- logging.info('Created image with id: %s', configured_image[0].id)
- self.images.append(configured_image)
- return configured_image
- except Exception as e:
- logging.info(e)
- raise
-
- def __enter__(self):
- """
- Allocate ephemeral cluster resources.
- """
- return self
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- """
- Clean up ephemeral cluster resources
- """
-
- # Clean up containers
- for container in self.containers.values():
- logging.info('Cleaning up container: %s', container.name)
- container.remove(v=True, force=True)
-
- # Clean up images
- for image in reversed(self.images):
- logging.info('Cleaning up image: %s', image[0].id)
- self.client.images.remove(image[0].id, force=True)
-
- # Clean up network
- if self.network is not None:
- logging.info('Cleaning up network network: %s', self.network.name)
- self.network.remove()
-
- # Clean up tmp files
- for tmp_file in self.tmp_files:
- os.remove(tmp_file)
-
-
-class Connectable(object):
- def __init__(self,
- name=None,
- auto_terminate=None):
-
- self.uuid = uuid.uuid4()
-
- if name is None:
- self.name = str(self.uuid)
- else:
- self.name = name
-
- if auto_terminate is None:
- self.auto_terminate = []
- else:
- self.auto_terminate = auto_terminate
-
- self.connections = {}
- self.out_proc = self
-
- self.drop_empty_flowfiles = False
-
- def connect(self, connections):
- for rel in connections:
-
- # Ensure that rel is not auto-terminated
- if rel in self.auto_terminate:
- del self.auto_terminate[self.auto_terminate.index(rel)]
-
- # Add to set of output connections for this rel
- if rel not in self.connections:
- self.connections[rel] = []
- self.connections[rel].append(connections[rel])
-
- return self
-
- def __rshift__(self, other):
- """
- Right shift operator to support flow DSL, for example:
-
- GetFile('/input') >> LogAttribute() >> PutFile('/output')
-
- """
-
- connected = copy(self)
- connected.connections = copy(self.connections)
-
- if self.out_proc is self:
- connected.out_proc = connected
- else:
- connected.out_proc = copy(connected.out_proc)
-
- if isinstance(other, tuple):
- if isinstance(other[0], tuple):
- for rel_tuple in other:
- rel = {rel_tuple[0]: rel_tuple[1]}
- connected.out_proc.connect(rel)
- else:
- rel = {other[0]: other[1]}
- connected.out_proc.connect(rel)
- else:
- connected.out_proc.connect({'success': other})
- connected.out_proc = other
-
- return connected
-
- def __invert__(self):
- """
- Invert operation to set empty file filtering on incoming connections
- GetFile('/input') >> ~LogAttribute()
- """
- self.drop_empty_flowfiles = True
-
- return self
-
-
-class Processor(Connectable):
- def __init__(self,
- clazz,
- properties=None,
- schedule=None,
- name=None,
- controller_services=None,
- auto_terminate=None):
-
- super(Processor, self).__init__(name=name,
- auto_terminate=auto_terminate)
-
- if controller_services is None:
- controller_services = []
-
- if schedule is None:
- schedule = {}
-
- if properties is None:
- properties = {}
-
- if name is None:
- pass
-
- self.clazz = clazz
- self.properties = properties
- self.controller_services = controller_services
-
- self.schedule = {
- 'scheduling strategy': 'TIMER_DRIVEN',
- 'scheduling period': '1 sec',
- 'penalization period': '30 sec',
- 'yield period': '1 sec',
- 'run duration nanos': 0
- }
- self.schedule.update(schedule)
-
- def nifi_property_key(self, key):
- """
- Returns the Apache NiFi-equivalent property key for the given key.
This is often, but not always, the same as
- the internal key.
- """
- return key
-
-
-class InvokeHTTP(Processor):
- def __init__(self, url,
- method='GET',
- proxy_host='',
- proxy_port='',
- proxy_username='',
- proxy_password='',
- ssl_context_service=None,
- schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
- properties = {'Remote URL': url,
- 'HTTP Method': method,
- 'Proxy Host': proxy_host,
- 'Proxy Port': proxy_port,
- 'invokehttp-proxy-username': proxy_username,
- 'invokehttp-proxy-password': proxy_password}
-
- controller_services = []
-
- if ssl_context_service is not None:
- properties['SSL Context Service'] = ssl_context_service.name
- controller_services.append(ssl_context_service)
-
- super(InvokeHTTP, self).__init__('InvokeHTTP',
- properties=properties,
-
controller_services=controller_services,
- auto_terminate=['success',
- 'response',
- 'retry',
- 'failure',
- 'no retry'],
- schedule=schedule)
-
-
-class ListenHTTP(Processor):
- def __init__(self, port, cert=None, schedule=None):
- properties = {'Listening Port': port}
-
- if cert is not None:
- properties['SSL Certificate'] = cert
- properties['SSL Verify Peer'] = 'no'
-
- super(ListenHTTP, self).__init__('ListenHTTP',
- properties=properties,
- auto_terminate=['success'],
- schedule=schedule)
-
-
-class LogAttribute(Processor):
- def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
- super(LogAttribute, self).__init__('LogAttribute',
- auto_terminate=['success'],
- schedule=schedule)
-
-
-class GetFile(Processor):
- def __init__(self, input_dir, schedule={'scheduling period': '2 sec'}):
- super(GetFile, self).__init__('GetFile',
- properties={'Input Directory':
input_dir, 'Keep Source File': 'true'},
- schedule=schedule,
- auto_terminate=['success'])
-
-
-class GenerateFlowFile(Processor):
- def __init__(self, file_size, schedule={'scheduling period': '0 sec'}):
- super(GenerateFlowFile, self).__init__('GenerateFlowFile',
- properties={'File Size': file_size},
- schedule=schedule,
- auto_terminate=['success'])
-
-
-class PutFile(Processor):
- def __init__(self, output_dir, schedule={'scheduling strategy':
'EVENT_DRIVEN'}):
- super(PutFile, self).__init__('PutFile',
- properties={'Directory': output_dir,
'Directory Permissions': '777', 'Permissions': '777'},
- auto_terminate=['success', 'failure'],
- schedule=schedule)
-
- def nifi_property_key(self, key):
- if key == 'Directory Permissions':
- return None
- else:
- return key
-
-
-class PublishKafka(Processor):
- def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
- super(PublishKafka, self).__init__('PublishKafka',
- properties={'Client Name':
'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test',
- 'Batch Size': '10',
'Compress Codec': 'none', 'Delivery Guarantee': '1',
- 'Request Timeout': '10
sec', 'Message Timeout': '12 sec'},
- auto_terminate=['success'],
- schedule=schedule)
-
-
-class PublishKafkaSSL(Processor):
- def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
- super(PublishKafkaSSL, self).__init__('PublishKafka',
- properties={'Client Name':
'LMN', 'Known Brokers': 'kafka-broker:9093',
- 'Topic Name':
'test', 'Batch Size': '10',
- 'Compress Codec':
'none', 'Delivery Guarantee': '1',
- 'Request Timeout':
'10 sec', 'Message Timeout': '12 sec',
- 'Security CA':
'/tmp/resources/certs/ca-cert',
- 'Security Cert':
'/tmp/resources/certs/client_LMN_client.pem',
- 'Security Pass
Phrase': 'abcdefgh',
- 'Security Private
Key': '/tmp/resources/certs/client_LMN_client.key',
- 'Security Protocol':
'ssl'},
- auto_terminate=['success'],
- schedule=schedule)
-
-class PutS3Object(Processor):
- def __init__(self,
- proxy_host='',
- proxy_port='',
- proxy_username='',
- proxy_password=''):
- super(PutS3Object, self).__init__('PutS3Object',
- properties={
- 'Object Key': 'test_object_key',
- 'Bucket': 'test_bucket',
- 'Access Key': 'test_access_key',
- 'Secret Key': 'test_secret',
- 'Endpoint Override URL':
"http://s3-server:9090",
- 'Proxy Host': proxy_host,
- 'Proxy Port': proxy_port,
- 'Proxy Username': proxy_username,
- 'Proxy Password': proxy_password,
- },
- auto_terminate=['success'])
-
-class DeleteS3Object(Processor):
- def __init__(self,
- proxy_host='',
- proxy_port='',
- proxy_username='',
- proxy_password=''):
- super(DeleteS3Object, self).__init__('DeleteS3Object',
- properties={
- 'Object Key': 'test_object_key',
- 'Bucket': 'test_bucket',
- 'Access Key': 'test_access_key',
- 'Secret Key': 'test_secret',
- 'Endpoint Override URL':
"http://s3-server:9090",
- 'Proxy Host': proxy_host,
- 'Proxy Port': proxy_port,
- 'Proxy Username': proxy_username,
- 'Proxy Password': proxy_password,
- },
- auto_terminate=['success'])
-
-class InputPort(Connectable):
- def __init__(self, name=None, remote_process_group=None):
- super(InputPort, self).__init__(name=name)
-
- self.remote_process_group = remote_process_group
-
-
-class RemoteProcessGroup(object):
- def __init__(self, url,
- name=None):
-
- self.uuid = uuid.uuid4()
-
- if name is None:
- self.name = str(self.uuid)
- else:
- self.name = name
-
- self.url = url
-
-
-class ControllerService(object):
- def __init__(self, name=None, properties=None):
-
- self.id = str(uuid.uuid4())
-
- if name is None:
- self.name = str(uuid.uuid4())
- logging.info('Controller service name was not provided; using
generated name \'%s\'', self.name)
- else:
- self.name = name
-
- if properties is None:
- properties = {}
-
- self.properties = properties
-
-
-class SSLContextService(ControllerService):
- def __init__(self, name=None, cert=None, key=None, ca_cert=None):
- super(SSLContextService, self).__init__(name=name)
-
- self.service_class = 'SSLContextService'
-
- if cert is not None:
- self.properties['Client Certificate'] = cert
-
- if key is not None:
- self.properties['Private Key'] = key
-
- if ca_cert is not None:
- self.properties['CA Certificate'] = ca_cert
-
-
-def minifi_flow_yaml(connectable, root=None, visited=None):
- if visited is None:
- visited = []
-
- if root is None:
- res = {
- 'Flow Controller': {
- 'name': 'MiNiFi Flow'
- },
- 'Processors': [],
- 'Connections': [],
- 'Remote Processing Groups': [],
- 'Controller Services': []
- }
- else:
- res = root
-
- visited.append(connectable)
-
- if hasattr(connectable, 'name'):
- connectable_name = connectable.name
- else:
- connectable_name = str(connectable.uuid)
-
- if isinstance(connectable, InputPort):
- group = connectable.remote_process_group
- res_group = None
-
- for res_group_candidate in res['Remote Processing Groups']:
- assert isinstance(res_group_candidate, dict)
- if res_group_candidate['id'] == str(group.uuid):
- res_group = res_group_candidate
-
- if res_group is None:
- res_group = {
- 'name': group.name,
- 'id': str(group.uuid),
- 'url': group.url,
- 'timeout': '30 sec',
- 'yield period': '3 sec',
- 'Input Ports': []
- }
-
- res['Remote Processing Groups'].append(res_group)
-
- res_group['Input Ports'].append({
- 'id': str(connectable.uuid),
- 'name': connectable.name,
- 'max concurrent tasks': 1,
- 'Properties': {}
- })
-
- if isinstance(connectable, Processor):
- res['Processors'].append({
- 'name': connectable_name,
- 'id': str(connectable.uuid),
- 'class': 'org.apache.nifi.processors.standard.' +
connectable.clazz,
- 'scheduling strategy': connectable.schedule['scheduling strategy'],
- 'scheduling period': connectable.schedule['scheduling period'],
- 'penalization period': connectable.schedule['penalization period'],
- 'yield period': connectable.schedule['yield period'],
- 'run duration nanos': connectable.schedule['run duration nanos'],
- 'Properties': connectable.properties,
- 'auto-terminated relationships list': connectable.auto_terminate
- })
-
- for svc in connectable.controller_services:
- if svc in visited:
- continue
-
- visited.append(svc)
- res['Controller Services'].append({
- 'name': svc.name,
- 'id': svc.id,
- 'class': svc.service_class,
- 'Properties': svc.properties
- })
-
- for conn_name in connectable.connections:
- conn_procs = connectable.connections[conn_name]
-
- if isinstance(conn_procs, list):
- for proc in conn_procs:
- res['Connections'].append({
- 'name': str(uuid.uuid4()),
- 'source id': str(connectable.uuid),
- 'source relationship name': conn_name,
- 'destination id': str(proc.uuid),
- 'drop empty': ("true" if proc.drop_empty_flowfiles else
"false")
- })
- if proc not in visited:
- minifi_flow_yaml(proc, res, visited)
- else:
- res['Connections'].append({
- 'name': str(uuid.uuid4()),
- 'source id': str(connectable.uuid),
- 'source relationship name': conn_name,
- 'destination id': str(conn_procs.uuid)
- })
- if conn_procs not in visited:
- minifi_flow_yaml(conn_procs, res, visited)
-
- if root is None:
- return yaml.dump(res, default_flow_style=False)
-
-
-def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None):
- if visited is None:
- visited = []
-
- position = Element('position')
- position.set('x', '0.0')
- position.set('y', '0.0')
-
- comment = Element('comment')
- styles = Element('styles')
- bend_points = Element('bendPoints')
- label_index = Element('labelIndex')
- label_index.text = '1'
- z_index = Element('zIndex')
- z_index.text = '0'
-
- if root is None:
- res = Element('flowController')
- max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount')
- max_timer_driven_thread_count.text = '10'
- res.append(max_timer_driven_thread_count)
- max_event_driven_thread_count = Element('maxEventDrivenThreadCount')
- max_event_driven_thread_count.text = '5'
- res.append(max_event_driven_thread_count)
- root_group = Element('rootGroup')
- root_group_id = Element('id')
- root_group_id_text = str(uuid.uuid4())
- root_group_id.text = root_group_id_text
- root_group.append(root_group_id)
- root_group_name = Element('name')
- root_group_name.text = root_group_id_text
- root_group.append(root_group_name)
- res.append(root_group)
- root_group.append(position)
- root_group.append(comment)
- res.append(Element('controllerServices'))
- res.append(Element('reportingTasks'))
- res.set('encoding-version', '1.2')
- else:
- res = root
-
- visited.append(connectable)
-
- if hasattr(connectable, 'name'):
- connectable_name_text = connectable.name
- else:
- connectable_name_text = str(connectable.uuid)
-
- if isinstance(connectable, InputPort):
- input_port = Element('inputPort')
-
- input_port_id = Element('id')
- input_port_id.text = str(connectable.uuid)
- input_port.append(input_port_id)
-
- input_port_name = Element('name')
- input_port_name.text = connectable_name_text
- input_port.append(input_port_name)
-
- input_port.append(position)
- input_port.append(comment)
-
- input_port_scheduled_state = Element('scheduledState')
- input_port_scheduled_state.text = 'RUNNING'
- input_port.append(input_port_scheduled_state)
-
- input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
- input_port_max_concurrent_tasks.text = '1'
- input_port.append(input_port_max_concurrent_tasks)
- next( res.iterfind('rootGroup') ).append(input_port)
-
- if isinstance(connectable, Processor):
- conn_destination = Element('processor')
-
- proc_id = Element('id')
- proc_id.text = str(connectable.uuid)
- conn_destination.append(proc_id)
-
- proc_name = Element('name')
- proc_name.text = connectable_name_text
- conn_destination.append(proc_name)
-
- conn_destination.append(position)
- conn_destination.append(styles)
- conn_destination.append(comment)
-
- proc_class = Element('class')
- proc_class.text = 'org.apache.nifi.processors.standard.' +
connectable.clazz
- conn_destination.append(proc_class)
-
- proc_bundle = Element('bundle')
- proc_bundle_group = Element('group')
- proc_bundle_group.text = 'org.apache.nifi'
- proc_bundle.append(proc_bundle_group)
- proc_bundle_artifact = Element('artifact')
- proc_bundle_artifact.text = 'nifi-standard-nar'
- proc_bundle.append(proc_bundle_artifact)
- proc_bundle_version = Element('version')
- proc_bundle_version.text = nifi_version
- proc_bundle.append(proc_bundle_version)
- conn_destination.append(proc_bundle)
-
- proc_max_concurrent_tasks = Element('maxConcurrentTasks')
- proc_max_concurrent_tasks.text = '1'
- conn_destination.append(proc_max_concurrent_tasks)
-
- proc_scheduling_period = Element('schedulingPeriod')
- proc_scheduling_period.text = connectable.schedule['scheduling period']
- conn_destination.append(proc_scheduling_period)
-
- proc_penalization_period = Element('penalizationPeriod')
- proc_penalization_period.text = connectable.schedule['penalization
period']
- conn_destination.append(proc_penalization_period)
-
- proc_yield_period = Element('yieldPeriod')
- proc_yield_period.text = connectable.schedule['yield period']
- conn_destination.append(proc_yield_period)
-
- proc_bulletin_level = Element('bulletinLevel')
- proc_bulletin_level.text = 'WARN'
- conn_destination.append(proc_bulletin_level)
-
- proc_loss_tolerant = Element('lossTolerant')
- proc_loss_tolerant.text = 'false'
- conn_destination.append(proc_loss_tolerant)
-
- proc_scheduled_state = Element('scheduledState')
- proc_scheduled_state.text = 'RUNNING'
- conn_destination.append(proc_scheduled_state)
-
- proc_scheduling_strategy = Element('schedulingStrategy')
- proc_scheduling_strategy.text = connectable.schedule['scheduling
strategy']
- conn_destination.append(proc_scheduling_strategy)
-
- proc_execution_node = Element('executionNode')
- proc_execution_node.text = 'ALL'
- conn_destination.append(proc_execution_node)
-
- proc_run_duration_nanos = Element('runDurationNanos')
- proc_run_duration_nanos.text = str(connectable.schedule['run duration
nanos'])
- conn_destination.append(proc_run_duration_nanos)
-
- for property_key, property_value in connectable.properties.items():
- proc_property = Element('property')
- proc_property_name = Element('name')
- proc_property_name.text =
connectable.nifi_property_key(property_key)
- if not proc_property_name.text:
- continue
- proc_property.append(proc_property_name)
- proc_property_value = Element('value')
- proc_property_value.text = property_value
- proc_property.append(proc_property_value)
- conn_destination.append(proc_property)
-
- for auto_terminate_rel in connectable.auto_terminate:
- proc_auto_terminated_relationship =
Element('autoTerminatedRelationship')
- proc_auto_terminated_relationship.text = auto_terminate_rel
- conn_destination.append(proc_auto_terminated_relationship)
- next( res.iterfind('rootGroup') ).append(conn_destination)
- """ res.iterfind('rootGroup').next().append(conn_destination) """
-
- for svc in connectable.controller_services:
- if svc in visited:
- continue
-
- visited.append(svc)
- controller_service = Element('controllerService')
-
- controller_service_id = Element('id')
- controller_service_id.text = str(svc.id)
- controller_service.append(controller_service_id)
-
- controller_service_name = Element('name')
- controller_service_name.text = svc.name
- controller_service.append(controller_service_name)
-
- controller_service.append(comment)
-
- controller_service_class = Element('class')
- controller_service_class.text = svc.service_class,
- controller_service.append(controller_service_class)
-
- controller_service_bundle = Element('bundle')
- controller_service_bundle_group = Element('group')
- controller_service_bundle_group.text = svc.group
- controller_service_bundle.append(controller_service_bundle_group)
- controller_service_bundle_artifact = Element('artifact')
- controller_service_bundle_artifact.text = svc.artifact
-
controller_service_bundle.append(controller_service_bundle_artifact)
- controller_service_bundle_version = Element('version')
- controller_service_bundle_version.text = nifi_version
- controller_service_bundle.append(controller_service_bundle_version)
- controller_service.append(controller_service_bundle)
-
- controller_enabled = Element('enabled')
- controller_enabled.text = 'true',
- controller_service.append(controller_enabled)
-
- for property_key, property_value in svc.properties:
- controller_service_property = Element('property')
- controller_service_property_name = Element('name')
- controller_service_property_name.text = property_key
-
controller_service_property.append(controller_service_property_name)
- controller_service_property_value = Element('value')
- controller_service_property_value.text = property_value
-
controller_service_property.append(controller_service_property_value)
- controller_service.append(controller_service_property)
- next( res.iterfind('rootGroup') ).append(controller_service)
- """ res.iterfind('rootGroup').next().append(controller_service)"""
-
- for conn_name in connectable.connections:
- conn_destinations = connectable.connections[conn_name]
-
- if isinstance(conn_destinations, list):
- for conn_destination in conn_destinations:
- connection = nifi_flow_xml_connection(res,
- bend_points,
- conn_name,
- connectable,
- label_index,
- conn_destination,
- z_index)
- next( res.iterfind('rootGroup') ).append(connection)
- """ res.iterfind('rootGroup').next().append(connection) """
-
- if conn_destination not in visited:
- nifi_flow_xml(conn_destination, nifi_version, res, visited)
- else:
- connection = nifi_flow_xml_connection(res,
- bend_points,
- conn_name,
- connectable,
- label_index,
- conn_destinations,
- z_index)
- next( res.iterfind('rootGroup') ).append(connection)
- """ res.iterfind('rootGroup').next().append(connection) """
-
- if conn_destinations not in visited:
- nifi_flow_xml(conn_destinations, nifi_version, res, visited)
-
- if root is None:
- return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>'
- + "\n"
- + elementTree.tostring(res, encoding='utf-8').decode('utf-8'))
-
-
-def nifi_flow_xml_connection(res, bend_points, conn_name, connectable,
label_index, destination, z_index):
- connection = Element('connection')
-
- connection_id = Element('id')
- connection_id.text = str(uuid.uuid4())
- connection.append(connection_id)
-
- connection_name = Element('name')
- connection.append(connection_name)
-
- connection.append(bend_points)
- connection.append(label_index)
- connection.append(z_index)
-
- connection_source_id = Element('sourceId')
- connection_source_id.text = str(connectable.uuid)
- connection.append(connection_source_id)
-
- connection_source_group_id = Element('sourceGroupId')
- connection_source_group_id.text = next( res.iterfind('rootGroup/id') ).text
- """connection_source_group_id.text =
res.iterfind('rootGroup/id').next().text"""
- connection.append(connection_source_group_id)
-
- connection_source_type = Element('sourceType')
- if isinstance(connectable, Processor):
- connection_source_type.text = 'PROCESSOR'
- elif isinstance(connectable, InputPort):
- connection_source_type.text = 'INPUT_PORT'
- else:
- raise Exception('Unexpected source type: %s' % type(connectable))
- connection.append(connection_source_type)
-
- connection_destination_id = Element('destinationId')
- connection_destination_id.text = str(destination.uuid)
- connection.append(connection_destination_id)
-
- connection_destination_group_id = Element('destinationGroupId')
- connection_destination_group_id.text =
next(res.iterfind('rootGroup/id')).text
- """ connection_destination_group_id.text =
res.iterfind('rootGroup/id').next().text """
- connection.append(connection_destination_group_id)
-
- connection_destination_type = Element('destinationType')
- if isinstance(destination, Processor):
- connection_destination_type.text = 'PROCESSOR'
- elif isinstance(destination, InputPort):
- connection_destination_type.text = 'INPUT_PORT'
- else:
- raise Exception('Unexpected destination type: %s' % type(destination))
- connection.append(connection_destination_type)
-
- connection_relationship = Element('relationship')
- if not isinstance(connectable, InputPort):
- connection_relationship.text = conn_name
- connection.append(connection_relationship)
-
- connection_max_work_queue_size = Element('maxWorkQueueSize')
- connection_max_work_queue_size.text = '10000'
- connection.append(connection_max_work_queue_size)
-
- connection_max_work_queue_data_size = Element('maxWorkQueueDataSize')
- connection_max_work_queue_data_size.text = '1 GB'
- connection.append(connection_max_work_queue_data_size)
-
- connection_flow_file_expiration = Element('flowFileExpiration')
- connection_flow_file_expiration.text = '0 sec'
- connection.append(connection_flow_file_expiration)
-
- return connection
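The re-exports added above keep the package's public surface intact, so test scripts can keep importing everything from the top-level minifi package. A minimal usage sketch (directories and flow are illustrative, and MINIFI_VERSION must be set in the environment, as the cluster constructor requires):

    from minifi import GetFile, LogAttribute, PutFile, SingleNodeDockerCluster

    # Build a flow with the DSL re-exported above, then deploy it into a
    # single-node Docker cluster.
    flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')

    with SingleNodeDockerCluster() as cluster:
        cluster.deploy_flow(flow, name='example-flow', engine='minifi-cpp')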
diff --git a/docker/test/integration/minifi/core/Cluster.py
b/docker/test/integration/minifi/core/Cluster.py
new file mode 100644
index 0000000..0818f37
--- /dev/null
+++ b/docker/test/integration/minifi/core/Cluster.py
@@ -0,0 +1,22 @@
+class Cluster(object):
+ """
+ Base Cluster class. This is intended to be a generic interface
+ to different types of clusters. Clusters could be Kubernetes clusters,
+ Docker swarms, or cloud compute/container services.
+ """
+
+ def deploy_flow(self, flow, name=None, vols=None):
+ """
+ Deploys a flow to the cluster.
+ """
+
+ def __enter__(self):
+ """
+ Allocate ephemeral cluster resources.
+ """
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """
+ Clean up ephemeral cluster resources.
+ """
diff --git a/docker/test/integration/minifi/core/Connectable.py
b/docker/test/integration/minifi/core/Connectable.py
new file mode 100644
index 0000000..c3472a7
--- /dev/null
+++ b/docker/test/integration/minifi/core/Connectable.py
@@ -0,0 +1,77 @@
+import uuid
+from copy import copy
+
+class Connectable(object):
+ def __init__(self,
+ name=None,
+ auto_terminate=None):
+
+ self.uuid = uuid.uuid4()
+
+ if name is None:
+ self.name = str(self.uuid)
+ else:
+ self.name = name
+
+ if auto_terminate is None:
+ self.auto_terminate = []
+ else:
+ self.auto_terminate = auto_terminate
+
+ self.connections = {}
+ self.out_proc = self
+
+ self.drop_empty_flowfiles = False
+
+ def connect(self, connections):
+ for rel in connections:
+
+ # Ensure that rel is not auto-terminated
+ if rel in self.auto_terminate:
+ del self.auto_terminate[self.auto_terminate.index(rel)]
+
+ # Add to set of output connections for this rel
+ if rel not in self.connections:
+ self.connections[rel] = []
+ self.connections[rel].append(connections[rel])
+
+ return self
+
+ def __rshift__(self, other):
+ """
+ Right shift operator to support flow DSL, for example:
+
+ GetFile('/input') >> LogAttribute() >> PutFile('/output')
+
+ """
+
+ connected = copy(self)
+ connected.connections = copy(self.connections)
+
+ if self.out_proc is self:
+ connected.out_proc = connected
+ else:
+ connected.out_proc = copy(connected.out_proc)
+
+ if isinstance(other, tuple):
+ if isinstance(other[0], tuple):
+ for rel_tuple in other:
+ rel = {rel_tuple[0]: rel_tuple[1]}
+ connected.out_proc.connect(rel)
+ else:
+ rel = {other[0]: other[1]}
+ connected.out_proc.connect(rel)
+ else:
+ connected.out_proc.connect({'success': other})
+ connected.out_proc = other
+
+ return connected
+
+ def __invert__(self):
+ """
+ Invert operation to set empty file filtering on incoming connections
+ GetFile('/input') >> ~LogAttribute()
+ """
+ self.drop_empty_flowfiles = True
+
+ return self
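The operator overloads above carry the flow DSL: __rshift__ chains processors over the implicit 'success' relationship, or over explicitly named relationships when tuples are passed, and __invert__ marks a processor so that empty flow files on its incoming connection are dropped. A small sketch (processor classes are the ones defined in this package; URLs and paths are illustrative):

    from minifi import GetFile, InvokeHTTP, LogAttribute, PutFile

    # Chain over the implicit 'success' relationship, then fan out two named
    # relationships using the tuple form accepted by __rshift__.
    flow = (GetFile('/tmp/input')
            >> InvokeHTTP('http://server:8080/contentListener', method='POST')
            >> (('response', PutFile('/tmp/responses')),
                ('failure', PutFile('/tmp/failed'))))

    # '~' requests empty-flow-file filtering on the incoming connection.
    filtering_flow = GetFile('/tmp/input') >> ~LogAttribute()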
diff --git a/docker/test/integration/minifi/core/ControllerService.py
b/docker/test/integration/minifi/core/ControllerService.py
new file mode 100644
index 0000000..d8b4e17
--- /dev/null
+++ b/docker/test/integration/minifi/core/ControllerService.py
@@ -0,0 +1,17 @@
+import uuid
+
+class ControllerService(object):
+ def __init__(self, name=None, properties=None):
+
+ self.id = str(uuid.uuid4())
+
+ if name is None:
+ self.name = str(uuid.uuid4())
+ logging.info('Controller service name was not provided; using
generated name \'%s\'', self.name)
+ else:
+ self.name = name
+
+ if properties is None:
+ properties = {}
+
+ self.properties = properties
diff --git a/docker/test/integration/minifi/test/__init__.py
b/docker/test/integration/minifi/core/DockerTestCluster.py
similarity index 66%
rename from docker/test/integration/minifi/test/__init__.py
rename to docker/test/integration/minifi/core/DockerTestCluster.py
index bf64472..1742f83 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -1,45 +1,20 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
+import json
import logging
+import os
import shutil
-import uuid
-import tarfile
import subprocess
import sys
import time
-import subprocess
-import json
-from io import BytesIO
-from threading import Event
+import uuid
-import os
-from os import listdir
from os.path import join
+from threading import Event
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
-from minifi import SingleNodeDockerCluster
-
-logging.basicConfig(level=logging.DEBUG)
-
-def put_file_contents(contents, file_abs_path):
- logging.info('Writing %d bytes of content to file: %s', len(contents),
file_abs_path)
- with open(file_abs_path, 'wb') as test_input_file:
- test_input_file.write(contents)
-
+from .OutputEventHandler import OutputEventHandler
+from .SingleNodeDockerCluster import SingleNodeDockerCluster
+from ..validators.FileOutputValidator import FileOutputValidator
class DockerTestCluster(SingleNodeDockerCluster):
def __init__(self, output_validator):
@@ -134,7 +109,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
self.test_data = contents
file_name = str(uuid.uuid4())
file_abs_path = join(self.tmp_test_input_dir, file_name)
- put_file_contents(contents.encode('utf-8'), file_abs_path)
+ self.put_file_contents(contents.encode('utf-8'), file_abs_path)
def put_test_resource(self, file_name, contents):
"""
@@ -143,7 +118,7 @@ class DockerTestCluster(SingleNodeDockerCluster):
"""
file_abs_path = join(self.tmp_test_resources_dir, file_name)
- put_file_contents(contents, file_abs_path)
+ self.put_file_contents(contents, file_abs_path)
def restart_observer_if_needed(self):
if self.observer.is_alive():
@@ -238,6 +213,12 @@ class DockerTestCluster(SingleNodeDockerCluster):
check_count += 1
time.sleep(1)
return False
+
+ def put_file_contents(self, contents, file_abs_path):
+ logging.info('Writing %d bytes of content to file: %s', len(contents),
file_abs_path)
+ with open(file_abs_path, 'wb') as test_input_file:
+ test_input_file.write(contents)
+
def __exit__(self, exc_type, exc_val, exc_tb):
"""
@@ -252,132 +233,3 @@ class DockerTestCluster(SingleNodeDockerCluster):
shutil.rmtree(self.tmp_test_resources_dir)
super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
-
-
-class OutputEventHandler(FileSystemEventHandler):
- def __init__(self, validator, done_event):
- self.validator = validator
- self.done_event = done_event
-
- def on_created(self, event):
- logging.info('Output file created: ' + event.src_path)
- self.check(event)
-
- def on_modified(self, event):
- logging.info('Output file modified: ' + event.src_path)
- self.check(event)
-
- def check(self, event):
- if self.validator.validate():
- logging.info('Output file is valid')
- self.done_event.set()
- else:
- logging.info('Output file is invalid')
-
-
-class OutputValidator(object):
- """
- Base output validator class. Validators must implement
- method validate, which returns a boolean.
- """
-
- def validate(self):
- """
- Return True if output is valid; False otherwise.
- """
- raise NotImplementedError("validate function needs to be implemented
for validators")
-
-
-
-class FileOutputValidator(OutputValidator):
- def set_output_dir(self, output_dir):
- self.output_dir = output_dir
-
- def validate(self, dir=''):
- pass
-
-class SingleFileOutputValidator(FileOutputValidator):
- """
- Validates the content of a single file in the given directory.
- """
-
- def __init__(self, expected_content, subdir=''):
- self.valid = False
- self.expected_content = expected_content
- self.subdir = subdir
-
- def validate(self):
- self.valid = False
- full_dir = os.path.join(self.output_dir, self.subdir)
- logging.info("Output folder: %s", full_dir)
-
- if not os.path.isdir(full_dir):
- return self.valid
-
- listing = listdir(full_dir)
- if listing:
- for l in listing:
- logging.info("name:: %s", l)
- out_file_name = listing[0]
- full_path = join(full_dir, out_file_name)
- if not os.path.isfile(full_path):
- return self.valid
-
- with open(full_path, 'r') as out_file:
- contents = out_file.read()
- logging.info("dir %s -- name %s", full_dir, out_file_name)
- logging.info("expected %s -- content %s",
self.expected_content, contents)
-
- if self.expected_content in contents:
- self.valid = True
-
- return self.valid
-
-
-class EmptyFilesOutPutValidator(FileOutputValidator):
- """
- Validates if all the files in the target directory are empty and at least
one exists
- """
- def __init__(self):
- self.valid = False
-
- def validate(self, dir=''):
-
- if self.valid:
- return True
-
- full_dir = self.output_dir + dir
- logging.info("Output folder: %s", full_dir)
- listing = listdir(full_dir)
- if listing:
- self.valid = all(os.path.getsize(os.path.join(full_dir,x)) == 0
for x in listing)
-
- return self.valid
-
-class NoFileOutPutValidator(FileOutputValidator):
- """
- Validates if no flowfiles were transferred
- """
- def __init__(self):
- self.valid = False
-
- def validate(self, dir=''):
-
- if self.valid:
- return True
-
- full_dir = self.output_dir + dir
- logging.info("Output folder: %s", full_dir)
- listing = listdir(full_dir)
-
- self.valid = not bool(listing)
-
- return self.valid
-
-
-class SegfaultValidator(OutputValidator):
- """
- Validate that a file was received.
- """
- def validate(self):
- return True
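Together with the validators that now live under minifi/validators, the moved DockerTestCluster is used roughly as follows; a sketch only, with hypothetical content and directories, and with the volume mappings and polling helpers of the real test scripts omitted:

    from minifi import DockerTestCluster, SingleFileOutputValidator, GetFile, PutFile

    validator = SingleFileOutputValidator('example payload')

    with DockerTestCluster(validator) as cluster:
        cluster.put_test_data('example payload')
        flow = GetFile('/tmp/input') >> PutFile('/tmp/output')
        cluster.deploy_flow(flow, name='minifi-put-file')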
diff --git a/docker/test/integration/minifi/core/InputPort.py
b/docker/test/integration/minifi/core/InputPort.py
new file mode 100644
index 0000000..1f00b7f
--- /dev/null
+++ b/docker/test/integration/minifi/core/InputPort.py
@@ -0,0 +1,7 @@
+from .Connectable import Connectable
+
+class InputPort(Connectable):
+ def __init__(self, name=None, remote_process_group=None):
+ super(InputPort, self).__init__(name=name)
+
+ self.remote_process_group = remote_process_group
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py
b/docker/test/integration/minifi/core/OutputEventHandler.py
new file mode 100644
index 0000000..3d4c984
--- /dev/null
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -0,0 +1,24 @@
+import logging
+
+from watchdog.events import FileSystemEventHandler
+
+class OutputEventHandler(FileSystemEventHandler):
+ def __init__(self, validator, done_event):
+ self.validator = validator
+ self.done_event = done_event
+
+ def on_created(self, event):
+ logging.info('Output file created: ' + event.src_path)
+ self.check(event)
+
+ def on_modified(self, event):
+ logging.info('Output file modified: ' + event.src_path)
+ self.check(event)
+
+ def check(self, event):
+ if self.validator.validate():
+ logging.info('Output file is valid')
+ self.done_event.set()
+ else:
+ logging.info('Output file is invalid')
+
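DockerTestCluster wires this handler to a watchdog Observer and a threading Event (see its imports above) so a test can block until a validator accepts the output directory. A rough standalone sketch, with illustrative paths and timeout:

    from threading import Event
    from watchdog.observers import Observer

    from minifi.core.OutputEventHandler import OutputEventHandler
    from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator

    done = Event()
    validator = SingleFileOutputValidator('expected content')
    validator.set_output_dir('/tmp/output')

    observer = Observer()
    observer.schedule(OutputEventHandler(validator, done), '/tmp/output')
    observer.start()

    # ... produce output, e.g. by deploying a flow ...
    assert done.wait(timeout=60)
    observer.stop()
    observer.join()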
diff --git a/docker/test/integration/minifi/core/Processor.py
b/docker/test/integration/minifi/core/Processor.py
new file mode 100644
index 0000000..e7e41e5
--- /dev/null
+++ b/docker/test/integration/minifi/core/Processor.py
@@ -0,0 +1,45 @@
+from .Connectable import Connectable
+
+class Processor(Connectable):
+ def __init__(self,
+ clazz,
+ properties=None,
+ schedule=None,
+ name=None,
+ controller_services=None,
+ auto_terminate=None):
+
+ super(Processor, self).__init__(name=name,
+ auto_terminate=auto_terminate)
+
+ if controller_services is None:
+ controller_services = []
+
+ if schedule is None:
+ schedule = {}
+
+ if properties is None:
+ properties = {}
+
+ if name is None:
+ pass
+
+ self.clazz = clazz
+ self.properties = properties
+ self.controller_services = controller_services
+
+ self.schedule = {
+ 'scheduling strategy': 'TIMER_DRIVEN',
+ 'scheduling period': '1 sec',
+ 'penalization period': '30 sec',
+ 'yield period': '1 sec',
+ 'run duration nanos': 0
+ }
+ self.schedule.update(schedule)
+
+ def nifi_property_key(self, key):
+ """
+ Returns the Apache NiFi-equivalent property key for the given key.
This is often, but not always, the same as
+ the internal key.
+ """
+ return key
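New processor wrappers follow the same pattern as the ones under minifi/processors: subclass Processor and pass the processor class name plus default properties and scheduling. A hypothetical example (this processor and its property name are illustrative, not part of this change):

    from minifi.core.Processor import Processor

    class TailFile(Processor):
        def __init__(self, file_to_tail, schedule={'scheduling period': '2 sec'}):
            super(TailFile, self).__init__('TailFile',
                                           properties={'File to Tail': file_to_tail},
                                           schedule=schedule,
                                           auto_terminate=['success'])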
diff --git a/docker/test/integration/minifi/core/RemoteProcessGroup.py
b/docker/test/integration/minifi/core/RemoteProcessGroup.py
new file mode 100644
index 0000000..6901fb6
--- /dev/null
+++ b/docker/test/integration/minifi/core/RemoteProcessGroup.py
@@ -0,0 +1,13 @@
+import uuid
+
+class RemoteProcessGroup(object):
+ def __init__(self, url,
+ name=None):
+ self.uuid = uuid.uuid4()
+
+ if name is None:
+ self.name = str(self.uuid)
+ else:
+ self.name = name
+
+ self.url = url
diff --git a/docker/test/integration/minifi/core/SSLContextService.py
b/docker/test/integration/minifi/core/SSLContextService.py
new file mode 100644
index 0000000..5866508
--- /dev/null
+++ b/docker/test/integration/minifi/core/SSLContextService.py
@@ -0,0 +1,16 @@
+from .ControllerService import ControllerService
+
+class SSLContextService(ControllerService):
+ def __init__(self, name=None, cert=None, key=None, ca_cert=None):
+ super(SSLContextService, self).__init__(name=name)
+
+ self.service_class = 'SSLContextService'
+
+ if cert is not None:
+ self.properties['Client Certificate'] = cert
+
+ if key is not None:
+ self.properties['Private Key'] = key
+
+ if ca_cert is not None:
+ self.properties['CA Certificate'] = ca_cert
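As before the refactor, an SSLContextService is attached to a processor by passing it in as a controller service; InvokeHTTP, for example, sets its 'SSL Context Service' property from the service name. A sketch with illustrative certificate paths and URL:

    from minifi import InvokeHTTP, SSLContextService

    ssl_context = SSLContextService(cert='/tmp/resources/certs/client.pem',
                                    key='/tmp/resources/certs/client.key',
                                    ca_cert='/tmp/resources/certs/ca-cert')

    invoke = InvokeHTTP('https://server:4430/contentListener',
                        method='POST',
                        ssl_context_service=ssl_context)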
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
new file mode 100644
index 0000000..f48c288
--- /dev/null
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -0,0 +1,319 @@
+import gzip
+import docker
+import logging
+import os
+import tarfile
+import uuid
+
+from collections import OrderedDict
+from io import BytesIO
+from textwrap import dedent
+
+from .Cluster import Cluster
+from ..flow_serialization.Minifi_flow_yaml_serializer import
Minifi_flow_yaml_serializer
+from ..flow_serialization.Nifi_flow_xml_serializer import
Nifi_flow_xml_serializer
+
+class SingleNodeDockerCluster(Cluster):
+ """
+ A "cluster" which consists of a single docker node. Useful for
+ testing or use-cases which do not span multiple compute nodes.
+ """
+
+ def __init__(self):
+ self.minifi_version = os.environ['MINIFI_VERSION']
+ self.nifi_version = '1.7.0'
+ self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
+ self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
+ self.network = None
+ self.containers = OrderedDict()
+ self.images = []
+ self.tmp_files = []
+
+ # Get docker client
+ self.client = docker.from_env()
+
+ def deploy_flow(self,
+ flow,
+ name=None,
+ vols=None,
+ engine='minifi-cpp'):
+ """
+ Compiles the flow to a valid config file and overlays it into a new
image.
+ """
+
+ if vols is None:
+ vols = {}
+
+ logging.info('Deploying %s flow...%s', engine,name)
+
+ if name is None:
+ name = engine + '-' + str(uuid.uuid4())
+ logging.info('Flow name was not provided; using generated name
\'%s\'', name)
+
+ # Create network if necessary
+ if self.network is None:
+ net_name = 'nifi-' + str(uuid.uuid4())
+ logging.info('Creating network: %s', net_name)
+ self.network = self.client.networks.create(net_name)
+
+ if engine == 'nifi':
+ self.deploy_nifi_flow(flow, name, vols)
+ elif engine == 'minifi-cpp':
+ self.deploy_minifi_cpp_flow(flow, name, vols)
+ elif engine == 'kafka-broker':
+ self.deploy_kafka_broker(name)
+ elif engine == 'http-proxy':
+ self.deploy_http_proxy()
+ elif engine == 's3-server':
+ self.deploy_s3_server()
+ else:
+ raise Exception('invalid flow engine: \'%s\'' % engine)
+
+ def deploy_minifi_cpp_flow(self, flow, name, vols):
+
+ # Build configured image
+ dockerfile = dedent("""FROM {base_image}
+ USER root
+ ADD config.yml {minifi_root}/conf/config.yml
+ RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
+ USER minificpp
+ """.format(name=name,hostname=name,
+ base_image='apacheminificpp:' + self.minifi_version,
+ minifi_root=self.minifi_root))
+
+ serializer = Minifi_flow_yaml_serializer()
+ test_flow_yaml = serializer.serialize(flow)
+ logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
+
+ conf_file_buffer = BytesIO()
+
+ try:
+ conf_file_buffer.write(test_flow_yaml.encode('utf-8'))
+ conf_file_len = conf_file_buffer.tell()
+ conf_file_buffer.seek(0)
+
+ context_files = [
+ {
+ 'name': 'config.yml',
+ 'size': conf_file_len,
+ 'file_obj': conf_file_buffer
+ }
+ ]
+
+ configured_image = self.build_image(dockerfile, context_files)
+
+ finally:
+ conf_file_buffer.close()
+
+ logging.info('Creating and running docker container for flow...')
+
+ container = self.client.containers.run(
+ configured_image[0],
+ detach=True,
+ name=name,
+ network=self.network.name,
+ volumes=vols)
+
+ logging.info('Started container \'%s\'', container.name)
+
+ self.containers[container.name] = container
+
+ def deploy_nifi_flow(self, flow, name, vols):
+ dockerfile = dedent(r"""FROM {base_image}
+ USER root
+ ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
+ RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz
+ RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/'
{nifi_root}/conf/nifi.properties
+ RUN sed -i -e
's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/'
{nifi_root}/conf/nifi.properties
+ USER nifi
+ """.format(name=name,
+ base_image='apache/nifi:' + self.nifi_version,
+ nifi_root=self.nifi_root))
+
+ serializer = Nifi_flow_xml_serializer()
+ test_flow_xml = serializer.serialize(flow, self.nifi_version)
+ logging.info('Using generated flow config xml:\n%s', test_flow_xml)
+
+ conf_file_buffer = BytesIO()
+
+ try:
+ with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as
conf_gz_file_buffer:
+ conf_gz_file_buffer.write(test_flow_xml.encode())
+ conf_file_len = conf_file_buffer.tell()
+ conf_file_buffer.seek(0)
+
+ context_files = [
+ {
+ 'name': 'flow.xml.gz',
+ 'size': conf_file_len,
+ 'file_obj': conf_file_buffer
+ }
+ ]
+
+ configured_image = self.build_image(dockerfile, context_files)
+
+ finally:
+ conf_file_buffer.close()
+
+ logging.info('Creating and running docker container for flow...')
+
+ container = self.client.containers.run(
+ configured_image[0],
+ detach=True,
+ name=name,
+ hostname=name,
+ network=self.network.name,
+ volumes=vols)
+
+ logging.info('Started container \'%s\'', container.name)
+
+ self.containers[container.name] = container
+
+ def deploy_kafka_broker(self, name):
+ logging.info('Creating and running docker containers for kafka
broker...')
+ zookeeper = self.client.containers.run(
+ self.client.images.pull("wurstmeister/zookeeper:latest"),
+ detach=True,
+ name='zookeeper',
+ network=self.network.name,
+ ports={'2181/tcp': 2181},
+ )
+ self.containers[zookeeper.name] = zookeeper
+
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
+        broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
+ broker = self.client.containers.run(
+ broker_image[0],
+ detach=True,
+ name='kafka-broker',
+ network=self.network.name,
+ ports={'9092/tcp': 9092},
+            environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
+ )
+ self.containers[broker.name] = broker
+
+ dockerfile = dedent("""FROM {base_image}
+ USER root
+            CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt
+ """.format(base_image='wurstmeister/kafka:2.12-2.5.0'))
+ configured_image = self.build_image(dockerfile, [])
+ consumer = self.client.containers.run(
+ configured_image[0],
+ detach=True,
+ name='kafka-consumer',
+ network=self.network.name,
+ )
+ self.containers[consumer.name] = consumer
+
+ def deploy_http_proxy(self):
+ logging.info('Creating and running http-proxy docker container...')
+ dockerfile = dedent("""FROM {base_image}
+ RUN apt update && apt install -y apache2-utils
+            RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
+            RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users' > /etc/squid/squid.conf && \
+                echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
+                echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \
+                echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \
+                echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
+            ENTRYPOINT ["/sbin/entrypoint.sh"]
+            """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
+ configured_image = self.build_image(dockerfile, [])
+ consumer = self.client.containers.run(
+ configured_image[0],
+ detach=True,
+ name='http-proxy',
+ network=self.network.name,
+ ports={'3128/tcp': 3128},
+ )
+ self.containers[consumer.name] = consumer
+
+ def deploy_s3_server(self):
+ consumer = self.client.containers.run(
+ "adobe/s3mock:2.1.28",
+ detach=True,
+ name='s3-server',
+ network=self.network.name,
+ ports={'9090/tcp': 9090, '9191/tcp': 9191},
+ environment=["initialBuckets=test_bucket"],
+ )
+ self.containers[consumer.name] = consumer
+
+ def build_image(self, dockerfile, context_files):
+ conf_dockerfile_buffer = BytesIO()
+ docker_context_buffer = BytesIO()
+
+ try:
+ # Overlay conf onto base nifi image
+ conf_dockerfile_buffer.write(dockerfile.encode())
+ conf_dockerfile_buffer.seek(0)
+
+            with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context:
+                dockerfile_info = tarfile.TarInfo('Dockerfile')
+                dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes
+ docker_context.addfile(dockerfile_info,
+ fileobj=conf_dockerfile_buffer)
+
+ for context_file in context_files:
+ file_info = tarfile.TarInfo(context_file['name'])
+ file_info.size = context_file['size']
+ docker_context.addfile(file_info,
+ fileobj=context_file['file_obj'])
+ docker_context_buffer.seek(0)
+
+ logging.info('Creating configured image...')
+            configured_image = self.client.images.build(fileobj=docker_context_buffer,
+ custom_context=True,
+ rm=True,
+ forcerm=True)
+ logging.info('Created image with id: %s', configured_image[0].id)
+ self.images.append(configured_image)
+
+ finally:
+ conf_dockerfile_buffer.close()
+ docker_context_buffer.close()
+
+ return configured_image
+
+ def build_image_by_path(self, dir, name=None):
+ try:
+ logging.info('Creating configured image...')
+ configured_image = self.client.images.build(path=dir,
+ tag=name,
+ rm=True,
+ forcerm=True)
+ logging.info('Created image with id: %s', configured_image[0].id)
+ self.images.append(configured_image)
+ return configured_image
+ except Exception as e:
+ logging.info(e)
+ raise
+
+ def __enter__(self):
+ """
+ Allocate ephemeral cluster resources.
+ """
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ """
+ Clean up ephemeral cluster resources
+ """
+
+ # Clean up containers
+ for container in self.containers.values():
+ logging.info('Cleaning up container: %s', container.name)
+ container.remove(v=True, force=True)
+
+ # Clean up images
+ for image in reversed(self.images):
+ logging.info('Cleaning up image: %s', image[0].id)
+ self.client.images.remove(image[0].id, force=True)
+
+ # Clean up network
+ if self.network is not None:
+            logging.info('Cleaning up network: %s', self.network.name)
+ self.network.remove()
+
+ # Clean up tmp files
+ for tmp_file in self.tmp_files:
+ os.remove(tmp_file)
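
A minimal usage sketch for the cluster class above (illustrative only: the public deploy entry point that dispatches on the `engine` argument is defined outside this hunk, so its name, the constructor arguments, and the `connections` wiring shown here are assumptions):

    # Hypothetical example, not part of this patch.
    from minifi.core.SingleNodeDockerCluster import SingleNodeDockerCluster
    from minifi.processors.GetFile import GetFile
    from minifi.processors.PutFile import PutFile

    get_file = GetFile('/tmp/input')
    get_file.connections['success'] = PutFile('/tmp/output')  # assumed wiring

    # __enter__/__exit__ make the cluster a context manager: containers, images
    # and the network are removed even if the test body raises.
    with SingleNodeDockerCluster() as cluster:
        cluster.deploy_flow(get_file, name='minifi-test', vols={}, engine='minifi-cpp')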
diff --git a/docker/test/integration/minifi/core/__init__.py
b/docker/test/integration/minifi/core/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git
a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
new file mode 100644
index 0000000..c7a9e0d
--- /dev/null
+++
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
@@ -0,0 +1,111 @@
+import uuid
+import yaml
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+
+class Minifi_flow_yaml_serializer:
+ def serialize(self, connectable, root=None, visited=None):
+ if visited is None:
+ visited = []
+
+ if root is None:
+ res = {
+ 'Flow Controller': {
+ 'name': 'MiNiFi Flow'
+ },
+ 'Processors': [],
+ 'Connections': [],
+ 'Remote Processing Groups': [],
+ 'Controller Services': []
+ }
+ else:
+ res = root
+
+ visited.append(connectable)
+
+ if hasattr(connectable, 'name'):
+ connectable_name = connectable.name
+ else:
+ connectable_name = str(connectable.uuid)
+
+ if isinstance(connectable, InputPort):
+ group = connectable.remote_process_group
+ res_group = None
+
+ for res_group_candidate in res['Remote Processing Groups']:
+ assert isinstance(res_group_candidate, dict)
+ if res_group_candidate['id'] == str(group.uuid):
+ res_group = res_group_candidate
+
+ if res_group is None:
+ res_group = {
+ 'name': group.name,
+ 'id': str(group.uuid),
+ 'url': group.url,
+ 'timeout': '30 sec',
+ 'yield period': '3 sec',
+ 'Input Ports': []
+ }
+
+ res['Remote Processing Groups'].append(res_group)
+
+ res_group['Input Ports'].append({
+ 'id': str(connectable.uuid),
+ 'name': connectable.name,
+ 'max concurrent tasks': 1,
+ 'Properties': {}
+ })
+
+ if isinstance(connectable, Processor):
+ res['Processors'].append({
+ 'name': connectable_name,
+ 'id': str(connectable.uuid),
+                'class': 'org.apache.nifi.processors.standard.' + connectable.clazz,
+                'scheduling strategy': connectable.schedule['scheduling strategy'],
+                'scheduling period': connectable.schedule['scheduling period'],
+                'penalization period': connectable.schedule['penalization period'],
+                'yield period': connectable.schedule['yield period'],
+                'run duration nanos': connectable.schedule['run duration nanos'],
+                'Properties': connectable.properties,
+                'auto-terminated relationships list': connectable.auto_terminate
+ })
+
+ for svc in connectable.controller_services:
+ if svc in visited:
+ continue
+
+ visited.append(svc)
+ res['Controller Services'].append({
+ 'name': svc.name,
+ 'id': svc.id,
+ 'class': svc.service_class,
+ 'Properties': svc.properties
+ })
+
+ for conn_name in connectable.connections:
+ conn_procs = connectable.connections[conn_name]
+
+ if isinstance(conn_procs, list):
+ for proc in conn_procs:
+ res['Connections'].append({
+ 'name': str(uuid.uuid4()),
+ 'source id': str(connectable.uuid),
+ 'source relationship name': conn_name,
+ 'destination id': str(proc.uuid),
+ 'drop empty': ("true" if proc.drop_empty_flowfiles
else "false")
+ })
+ if proc not in visited:
+ self.serialize(proc, res, visited)
+ else:
+ res['Connections'].append({
+ 'name': str(uuid.uuid4()),
+ 'source id': str(connectable.uuid),
+ 'source relationship name': conn_name,
+ 'destination id': str(conn_procs.uuid)
+ })
+ if conn_procs not in visited:
+ self.serialize(conn_procs, res, visited)
+
+ if root is None:
+ return yaml.dump(res, default_flow_style=False)
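
A sketch of how this serializer might be invoked in isolation (the `connections` dict wiring is an assumption based on how the serializer reads it above; the processor constructors are the ones added later in this patch):

    # Illustrative only.
    from minifi.flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
    from minifi.processors.GenerateFlowFile import GenerateFlowFile
    from minifi.processors.LogAttribute import LogAttribute

    generate = GenerateFlowFile('1 kB')
    generate.connections['success'] = LogAttribute()  # assumed wiring

    # Walks the flow graph recursively and returns the MiNiFi config.yml text.
    print(Minifi_flow_yaml_serializer().serialize(generate))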
diff --git
a/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
new file mode 100644
index 0000000..22fa098
--- /dev/null
+++
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
@@ -0,0 +1,324 @@
+import uuid
+
+import xml.etree.cElementTree as elementTree
+from xml.etree.cElementTree import Element
+
+from ..core.Processor import Processor
+from ..core.InputPort import InputPort
+
+class Nifi_flow_xml_serializer:
+    def serialize(self, connectable, nifi_version=None, root=None, visited=None):
+ if visited is None:
+ visited = []
+
+ position = Element('position')
+ position.set('x', '0.0')
+ position.set('y', '0.0')
+
+ comment = Element('comment')
+ styles = Element('styles')
+ bend_points = Element('bendPoints')
+ label_index = Element('labelIndex')
+ label_index.text = '1'
+ z_index = Element('zIndex')
+ z_index.text = '0'
+
+ if root is None:
+ res = Element('flowController')
+            max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount')
+            max_timer_driven_thread_count.text = '10'
+            res.append(max_timer_driven_thread_count)
+            max_event_driven_thread_count = Element('maxEventDrivenThreadCount')
+ max_event_driven_thread_count.text = '5'
+ res.append(max_event_driven_thread_count)
+ root_group = Element('rootGroup')
+ root_group_id = Element('id')
+ root_group_id_text = str(uuid.uuid4())
+ root_group_id.text = root_group_id_text
+ root_group.append(root_group_id)
+ root_group_name = Element('name')
+ root_group_name.text = root_group_id_text
+ root_group.append(root_group_name)
+ res.append(root_group)
+ root_group.append(position)
+ root_group.append(comment)
+ res.append(Element('controllerServices'))
+ res.append(Element('reportingTasks'))
+ res.set('encoding-version', '1.2')
+ else:
+ res = root
+
+ visited.append(connectable)
+
+ if hasattr(connectable, 'name'):
+ connectable_name_text = connectable.name
+ else:
+ connectable_name_text = str(connectable.uuid)
+
+ if isinstance(connectable, InputPort):
+ input_port = Element('inputPort')
+
+ input_port_id = Element('id')
+ input_port_id.text = str(connectable.uuid)
+ input_port.append(input_port_id)
+
+ input_port_name = Element('name')
+ input_port_name.text = connectable_name_text
+ input_port.append(input_port_name)
+
+ input_port.append(position)
+ input_port.append(comment)
+
+ input_port_scheduled_state = Element('scheduledState')
+ input_port_scheduled_state.text = 'RUNNING'
+ input_port.append(input_port_scheduled_state)
+
+ input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
+ input_port_max_concurrent_tasks.text = '1'
+ input_port.append(input_port_max_concurrent_tasks)
+ next( res.iterfind('rootGroup') ).append(input_port)
+
+ if isinstance(connectable, Processor):
+ conn_destination = Element('processor')
+
+ proc_id = Element('id')
+ proc_id.text = str(connectable.uuid)
+ conn_destination.append(proc_id)
+
+ proc_name = Element('name')
+ proc_name.text = connectable_name_text
+ conn_destination.append(proc_name)
+
+ conn_destination.append(position)
+ conn_destination.append(styles)
+ conn_destination.append(comment)
+
+ proc_class = Element('class')
+            proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz
+ conn_destination.append(proc_class)
+
+ proc_bundle = Element('bundle')
+ proc_bundle_group = Element('group')
+ proc_bundle_group.text = 'org.apache.nifi'
+ proc_bundle.append(proc_bundle_group)
+ proc_bundle_artifact = Element('artifact')
+ proc_bundle_artifact.text = 'nifi-standard-nar'
+ proc_bundle.append(proc_bundle_artifact)
+ proc_bundle_version = Element('version')
+ proc_bundle_version.text = nifi_version
+ proc_bundle.append(proc_bundle_version)
+ conn_destination.append(proc_bundle)
+
+ proc_max_concurrent_tasks = Element('maxConcurrentTasks')
+ proc_max_concurrent_tasks.text = '1'
+ conn_destination.append(proc_max_concurrent_tasks)
+
+ proc_scheduling_period = Element('schedulingPeriod')
+            proc_scheduling_period.text = connectable.schedule['scheduling period']
+            conn_destination.append(proc_scheduling_period)
+
+            proc_penalization_period = Element('penalizationPeriod')
+            proc_penalization_period.text = connectable.schedule['penalization period']
+ conn_destination.append(proc_penalization_period)
+
+ proc_yield_period = Element('yieldPeriod')
+ proc_yield_period.text = connectable.schedule['yield period']
+ conn_destination.append(proc_yield_period)
+
+ proc_bulletin_level = Element('bulletinLevel')
+ proc_bulletin_level.text = 'WARN'
+ conn_destination.append(proc_bulletin_level)
+
+ proc_loss_tolerant = Element('lossTolerant')
+ proc_loss_tolerant.text = 'false'
+ conn_destination.append(proc_loss_tolerant)
+
+ proc_scheduled_state = Element('scheduledState')
+ proc_scheduled_state.text = 'RUNNING'
+ conn_destination.append(proc_scheduled_state)
+
+ proc_scheduling_strategy = Element('schedulingStrategy')
+            proc_scheduling_strategy.text = connectable.schedule['scheduling strategy']
+ conn_destination.append(proc_scheduling_strategy)
+
+ proc_execution_node = Element('executionNode')
+ proc_execution_node.text = 'ALL'
+ conn_destination.append(proc_execution_node)
+
+ proc_run_duration_nanos = Element('runDurationNanos')
+            proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos'])
+ conn_destination.append(proc_run_duration_nanos)
+
+ for property_key, property_value in connectable.properties.items():
+ proc_property = Element('property')
+ proc_property_name = Element('name')
+                proc_property_name.text = connectable.nifi_property_key(property_key)
+ if not proc_property_name.text:
+ continue
+ proc_property.append(proc_property_name)
+ proc_property_value = Element('value')
+ proc_property_value.text = property_value
+ proc_property.append(proc_property_value)
+ conn_destination.append(proc_property)
+
+ for auto_terminate_rel in connectable.auto_terminate:
+                proc_auto_terminated_relationship = Element('autoTerminatedRelationship')
+ proc_auto_terminated_relationship.text = auto_terminate_rel
+ conn_destination.append(proc_auto_terminated_relationship)
+ next( res.iterfind('rootGroup') ).append(conn_destination)
+ """ res.iterfind('rootGroup').next().append(conn_destination) """
+
+ for svc in connectable.controller_services:
+ if svc in visited:
+ continue
+
+ visited.append(svc)
+ controller_service = Element('controllerService')
+
+ controller_service_id = Element('id')
+ controller_service_id.text = str(svc.id)
+ controller_service.append(controller_service_id)
+
+ controller_service_name = Element('name')
+ controller_service_name.text = svc.name
+ controller_service.append(controller_service_name)
+
+ controller_service.append(comment)
+
+ controller_service_class = Element('class')
+            controller_service_class.text = svc.service_class
+            controller_service.append(controller_service_class)
+
+            controller_service_bundle = Element('bundle')
+            controller_service_bundle_group = Element('group')
+            controller_service_bundle_group.text = svc.group
+            controller_service_bundle.append(controller_service_bundle_group)
+            controller_service_bundle_artifact = Element('artifact')
+            controller_service_bundle_artifact.text = svc.artifact
+            controller_service_bundle.append(controller_service_bundle_artifact)
+            controller_service_bundle_version = Element('version')
+            controller_service_bundle_version.text = nifi_version
+            controller_service_bundle.append(controller_service_bundle_version)
+            controller_service.append(controller_service_bundle)
+
+            controller_enabled = Element('enabled')
+            controller_enabled.text = 'true'
+            controller_service.append(controller_enabled)
+
+            for property_key, property_value in svc.properties.items():
+                controller_service_property = Element('property')
+                controller_service_property_name = Element('name')
+                controller_service_property_name.text = property_key
+                controller_service_property.append(controller_service_property_name)
+                controller_service_property_value = Element('value')
+                controller_service_property_value.text = property_value
+                controller_service_property.append(controller_service_property_value)
+                controller_service.append(controller_service_property)
+            next( res.iterfind('rootGroup') ).append(controller_service)
+            """ res.iterfind('rootGroup').next().append(controller_service)"""
+
+ for conn_name in connectable.connections:
+ conn_destinations = connectable.connections[conn_name]
+
+ if isinstance(conn_destinations, list):
+ for conn_destination in conn_destinations:
+                    connection = self.build_nifi_flow_xml_connection_element(res,
+ bend_points,
+ conn_name,
+ connectable,
+ label_index,
+ conn_destination,
+ z_index)
+ next( res.iterfind('rootGroup') ).append(connection)
+ """ res.iterfind('rootGroup').next().append(connection) """
+
+ if conn_destination not in visited:
+                        self.serialize(conn_destination, nifi_version, res, visited)
+ else:
+ connection = self.build_nifi_flow_xml_connection_element(res,
+ bend_points,
+ conn_name,
+ connectable,
+ label_index,
+ conn_destinations,
+ z_index)
+ next( res.iterfind('rootGroup') ).append(connection)
+ """ res.iterfind('rootGroup').next().append(connection) """
+
+ if conn_destinations not in visited:
+                    self.serialize(conn_destinations, nifi_version, res, visited)
+
+ if root is None:
+ return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>'
+ + "\n"
+                    + elementTree.tostring(res, encoding='utf-8').decode('utf-8'))
+
+    def build_nifi_flow_xml_connection_element(self, res, bend_points, conn_name, connectable, label_index, destination, z_index):
+ connection = Element('connection')
+
+ connection_id = Element('id')
+ connection_id.text = str(uuid.uuid4())
+ connection.append(connection_id)
+
+ connection_name = Element('name')
+ connection.append(connection_name)
+
+ connection.append(bend_points)
+ connection.append(label_index)
+ connection.append(z_index)
+
+ connection_source_id = Element('sourceId')
+ connection_source_id.text = str(connectable.uuid)
+ connection.append(connection_source_id)
+
+ connection_source_group_id = Element('sourceGroupId')
+        connection_source_group_id.text = next( res.iterfind('rootGroup/id') ).text
+        """connection_source_group_id.text = res.iterfind('rootGroup/id').next().text"""
+ connection.append(connection_source_group_id)
+
+ connection_source_type = Element('sourceType')
+ if isinstance(connectable, Processor):
+ connection_source_type.text = 'PROCESSOR'
+ elif isinstance(connectable, InputPort):
+ connection_source_type.text = 'INPUT_PORT'
+ else:
+ raise Exception('Unexpected source type: %s' % type(connectable))
+ connection.append(connection_source_type)
+
+ connection_destination_id = Element('destinationId')
+ connection_destination_id.text = str(destination.uuid)
+ connection.append(connection_destination_id)
+
+ connection_destination_group_id = Element('destinationGroupId')
+        connection_destination_group_id.text = next(res.iterfind('rootGroup/id')).text
+        """ connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text """
+ connection.append(connection_destination_group_id)
+
+ connection_destination_type = Element('destinationType')
+ if isinstance(destination, Processor):
+ connection_destination_type.text = 'PROCESSOR'
+ elif isinstance(destination, InputPort):
+ connection_destination_type.text = 'INPUT_PORT'
+ else:
+            raise Exception('Unexpected destination type: %s' % type(destination))
+ connection.append(connection_destination_type)
+
+ connection_relationship = Element('relationship')
+ if not isinstance(connectable, InputPort):
+ connection_relationship.text = conn_name
+ connection.append(connection_relationship)
+
+ connection_max_work_queue_size = Element('maxWorkQueueSize')
+ connection_max_work_queue_size.text = '10000'
+ connection.append(connection_max_work_queue_size)
+
+ connection_max_work_queue_data_size = Element('maxWorkQueueDataSize')
+ connection_max_work_queue_data_size.text = '1 GB'
+ connection.append(connection_max_work_queue_data_size)
+
+ connection_flow_file_expiration = Element('flowFileExpiration')
+ connection_flow_file_expiration.text = '0 sec'
+ connection.append(connection_flow_file_expiration)
+
+ return connection
+
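The XML serializer is consumed by deploy_nifi_flow above, which gzips the result into flow.xml.gz. A standalone sketch under the same assumptions as before (the `connections` wiring and the NiFi version string are placeholders):

    # Illustrative only.
    import gzip

    from minifi.flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer
    from minifi.processors.GenerateFlowFile import GenerateFlowFile
    from minifi.processors.LogAttribute import LogAttribute

    generate = GenerateFlowFile('1 kB')
    generate.connections['success'] = LogAttribute()  # assumed wiring

    flow_xml = Nifi_flow_xml_serializer().serialize(generate, '1.7.0')  # placeholder version

    # Same packaging step that deploy_nifi_flow performs in-memory.
    with gzip.open('flow.xml.gz', 'wb') as gz:
        gz.write(flow_xml.encode('utf-8'))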
diff --git a/docker/test/integration/minifi/flow_serialization/__init__.py
b/docker/test/integration/minifi/flow_serialization/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/minifi/processors/DeleteS3Object.py
b/docker/test/integration/minifi/processors/DeleteS3Object.py
new file mode 100644
index 0000000..003a9cd
--- /dev/null
+++ b/docker/test/integration/minifi/processors/DeleteS3Object.py
@@ -0,0 +1,22 @@
+from ..core.Processor import Processor
+
+class DeleteS3Object(Processor):
+ def __init__(self,
+ proxy_host = '',
+ proxy_port = '',
+ proxy_username = '',
+ proxy_password = ''):
+ super(DeleteS3Object, self).__init__('DeleteS3Object',
+ properties = {
+ 'Object Key': 'test_object_key',
+ 'Bucket': 'test_bucket',
+ 'Access Key': 'test_access_key',
+ 'Secret Key': 'test_secret',
+ 'Endpoint Override URL': "http://s3-server:9090",
+ 'Proxy Host': proxy_host,
+ 'Proxy Port': proxy_port,
+ 'Proxy Username': proxy_username,
+ 'Proxy Password': proxy_password,
+ },
+ auto_terminate=['success'])
+
diff --git a/docker/test/integration/minifi/processors/GenerateFlowFile.py
b/docker/test/integration/minifi/processors/GenerateFlowFile.py
new file mode 100644
index 0000000..93d42ca
--- /dev/null
+++ b/docker/test/integration/minifi/processors/GenerateFlowFile.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+class GenerateFlowFile(Processor):
+ def __init__(self, file_size, schedule={'scheduling period': '0 sec'}):
+ super(GenerateFlowFile, self).__init__('GenerateFlowFile',
+ properties={'File Size': file_size},
+ schedule=schedule,
+ auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/GetFile.py
b/docker/test/integration/minifi/processors/GetFile.py
new file mode 100644
index 0000000..29b8180
--- /dev/null
+++ b/docker/test/integration/minifi/processors/GetFile.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+class GetFile(Processor):
+ def __init__(self, input_dir, schedule={'scheduling period': '2 sec'}):
+ super(GetFile, self).__init__('GetFile',
+            properties={'Input Directory': input_dir, 'Keep Source File': 'true'},
+ schedule=schedule,
+ auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/InvokeHTTP.py
b/docker/test/integration/minifi/processors/InvokeHTTP.py
new file mode 100644
index 0000000..135c8d5
--- /dev/null
+++ b/docker/test/integration/minifi/processors/InvokeHTTP.py
@@ -0,0 +1,30 @@
+from ..core.Processor import Processor
+
+class InvokeHTTP(Processor):
+ def __init__(self, url,
+ method='GET',
+ proxy_host='',
+ proxy_port='',
+ proxy_username='',
+ proxy_password='',
+ ssl_context_service=None,
+ schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+ properties = {
+ 'Remote URL': url,
+ 'HTTP Method': method,
+ 'Proxy Host': proxy_host,
+ 'Proxy Port': proxy_port,
+ 'invokehttp-proxy-username': proxy_username,
+ 'invokehttp-proxy-password': proxy_password }
+
+ controller_services = []
+
+ if ssl_context_service is not None:
+ properties['SSL Context Service'] = ssl_context_service.name
+ controller_services.append(ssl_context_service)
+
+ super(InvokeHTTP, self).__init__('InvokeHTTP',
+ properties = properties,
+ controller_services = controller_services,
+            auto_terminate = ['success', 'response', 'retry', 'failure', 'no retry'],
+ schedule = schedule)
diff --git a/docker/test/integration/minifi/processors/ListenHTTP.py
b/docker/test/integration/minifi/processors/ListenHTTP.py
new file mode 100644
index 0000000..6f5bca1
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ListenHTTP.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+class ListenHTTP(Processor):
+ def __init__(self, port, cert=None, schedule=None):
+ properties = {'Listening Port': port}
+
+ if cert is not None:
+ properties['SSL Certificate'] = cert
+ properties['SSL Verify Peer'] = 'no'
+
+ super(ListenHTTP, self).__init__('ListenHTTP',
+ properties=properties,
+ auto_terminate=['success'],
+ schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/LogAttribute.py
b/docker/test/integration/minifi/processors/LogAttribute.py
new file mode 100644
index 0000000..1be4c38
--- /dev/null
+++ b/docker/test/integration/minifi/processors/LogAttribute.py
@@ -0,0 +1,7 @@
+from ..core.Processor import Processor
+
+class LogAttribute(Processor):
+ def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+ super(LogAttribute, self).__init__('LogAttribute',
+ auto_terminate=['success'],
+ schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishKafka.py
b/docker/test/integration/minifi/processors/PublishKafka.py
new file mode 100644
index 0000000..698cd74
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PublishKafka.py
@@ -0,0 +1,10 @@
+from ..core.Processor import Processor
+
+class PublishKafka(Processor):
+ def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+ super(PublishKafka, self).__init__('PublishKafka',
+            properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test',
+                        'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                        'Request Timeout': '10 sec', 'Message Timeout': '12 sec'},
+ auto_terminate=['success'],
+ schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PublishKafkaSSL.py
b/docker/test/integration/minifi/processors/PublishKafkaSSL.py
new file mode 100644
index 0000000..82c33f6
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PublishKafkaSSL.py
@@ -0,0 +1,16 @@
+from ..core.Processor import Processor
+
+class PublishKafkaSSL(Processor):
+ def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+ super(PublishKafkaSSL, self).__init__('PublishKafka',
+            properties={'Client Name': 'LMN', 'Known Brokers': 'kafka-broker:9093',
+ 'Topic Name': 'test', 'Batch Size': '10',
+ 'Compress Codec': 'none', 'Delivery Guarantee': '1',
+ 'Request Timeout': '10 sec', 'Message Timeout': '12 sec',
+ 'Security CA': '/tmp/resources/certs/ca-cert',
+ 'Security Cert': '/tmp/resources/certs/client_LMN_client.pem',
+ 'Security Pass Phrase': 'abcdefgh',
+            'Security Private Key': '/tmp/resources/certs/client_LMN_client.key',
+ 'Security Protocol': 'ssl'},
+ auto_terminate=['success'],
+ schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PutFile.py
b/docker/test/integration/minifi/processors/PutFile.py
new file mode 100644
index 0000000..047d32d
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutFile.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+class PutFile(Processor):
+    def __init__(self, output_dir, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PutFile, self).__init__('PutFile',
+            properties={'Directory': output_dir, 'Directory Permissions': '777', 'Permissions': '777'},
+ auto_terminate=['success', 'failure'],
+ schedule=schedule)
+
+ def nifi_property_key(self, key):
+ if key == 'Directory Permissions':
+ return None
+ else:
+ return key
diff --git a/docker/test/integration/minifi/processors/PutS3Object.py
b/docker/test/integration/minifi/processors/PutS3Object.py
new file mode 100644
index 0000000..74fb4a8
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutS3Object.py
@@ -0,0 +1,20 @@
+from ..core.Processor import Processor
+
+class PutS3Object(Processor):
+ def __init__(self,
+ proxy_host='',
+ proxy_port='',
+ proxy_username='',
+ proxy_password=''):
+ super(PutS3Object, self).__init__('PutS3Object',
+ properties = {
+ 'Object Key': 'test_object_key',
+ 'Bucket': 'test_bucket',
+ 'Access Key': 'test_access_key',
+ 'Secret Key': 'test_secret',
+ 'Endpoint Override URL': "http://s3-server:9090",
+ 'Proxy Host': proxy_host,
+ 'Proxy Port': proxy_port,
+ 'Proxy Username': proxy_username,
+ 'Proxy Password': proxy_password },
+ auto_terminate = ['success'])
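New processor wrappers follow the same pattern: subclass Processor and pass the class name, default properties, schedule and auto-terminated relationships to the base constructor. A hypothetical wrapper, not part of this patch (the processor and property names are assumptions for illustration):

    from ..core.Processor import Processor

    class TailFile(Processor):  # hypothetical example
        def __init__(self, file_to_tail, schedule={'scheduling period': '2 sec'}):
            super(TailFile, self).__init__('TailFile',
                properties={'File to Tail': file_to_tail},
                schedule=schedule,
                auto_terminate=['success'])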
diff --git a/docker/test/integration/minifi/processors/__init__.py
b/docker/test/integration/minifi/processors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git
a/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
new file mode 100644
index 0000000..689cf7d
--- /dev/null
+++ b/docker/test/integration/minifi/validators/EmptyFilesOutPutValidator.py
@@ -0,0 +1,27 @@
+import logging
+import os
+
+from os import listdir
+
+from .FileOutputValidator import FileOutputValidator
+
+class EmptyFilesOutPutValidator(FileOutputValidator):
+
+ """
+    Validates that all files in the target directory are empty and that at least one exists.
+ """
+ def __init__(self):
+ self.valid = False
+
+ def validate(self, dir=''):
+
+ if self.valid:
+ return True
+
+ full_dir = self.output_dir + dir
+ logging.info("Output folder: %s", full_dir)
+ listing = listdir(full_dir)
+ if listing:
+            self.valid = all(os.path.getsize(os.path.join(full_dir, x)) == 0 for x in listing)
+
+ return self.valid
diff --git a/docker/test/integration/minifi/validators/FileOutputValidator.py
b/docker/test/integration/minifi/validators/FileOutputValidator.py
new file mode 100644
index 0000000..d558c43
--- /dev/null
+++ b/docker/test/integration/minifi/validators/FileOutputValidator.py
@@ -0,0 +1,8 @@
+from .OutputValidator import OutputValidator
+
+class FileOutputValidator(OutputValidator):
+ def set_output_dir(self, output_dir):
+ self.output_dir = output_dir
+
+ def validate(self, dir=''):
+ pass
diff --git a/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
new file mode 100644
index 0000000..f60a008
--- /dev/null
+++ b/docker/test/integration/minifi/validators/NoFileOutPutValidator.py
@@ -0,0 +1,25 @@
+import logging
+
+from os import listdir
+
+from .FileOutputValidator import FileOutputValidator
+
+class NoFileOutPutValidator(FileOutputValidator):
+ """
+    Validates that no flowfiles were transferred.
+ """
+ def __init__(self):
+ self.valid = False
+
+ def validate(self, dir=''):
+
+ if self.valid:
+ return True
+
+ full_dir = self.output_dir + dir
+ logging.info("Output folder: %s", full_dir)
+ listing = listdir(full_dir)
+
+ self.valid = not bool(listing)
+
+ return self.valid
diff --git a/docker/test/integration/minifi/validators/OutputValidator.py
b/docker/test/integration/minifi/validators/OutputValidator.py
new file mode 100644
index 0000000..c05d5fa
--- /dev/null
+++ b/docker/test/integration/minifi/validators/OutputValidator.py
@@ -0,0 +1,12 @@
+class OutputValidator(object):
+ """
+ Base output validator class. Validators must implement
+ method validate, which returns a boolean.
+ """
+
+ def validate(self):
+ """
+ Return True if output is valid; False otherwise.
+ """
+ raise NotImplementedError("validate function needs to be implemented
for validators")
+
diff --git a/docker/test/integration/minifi/validators/SegfaultValidator.py
b/docker/test/integration/minifi/validators/SegfaultValidator.py
new file mode 100644
index 0000000..ee0227d
--- /dev/null
+++ b/docker/test/integration/minifi/validators/SegfaultValidator.py
@@ -0,0 +1,8 @@
+from .OutputValidator import OutputValidator
+
+class SegfaultValidator(OutputValidator):
+ """
+    Always passes; used by crash (segfault) tests, which only check that the agent stays alive.
+ """
+ def validate(self):
+ return True
diff --git
a/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
new file mode 100644
index 0000000..7466b41
--- /dev/null
+++ b/docker/test/integration/minifi/validators/SingleFileOutputValidator.py
@@ -0,0 +1,44 @@
+import logging
+import os
+
+from os import listdir
+from os.path import join
+
+from .FileOutputValidator import FileOutputValidator
+
+class SingleFileOutputValidator(FileOutputValidator):
+ """
+ Validates the content of a single file in the given directory.
+ """
+
+ def __init__(self, expected_content, subdir=''):
+ self.valid = False
+ self.expected_content = expected_content
+ self.subdir = subdir
+
+ def validate(self):
+ self.valid = False
+ full_dir = os.path.join(self.output_dir, self.subdir)
+ logging.info("Output folder: %s", full_dir)
+
+ if not os.path.isdir(full_dir):
+ return self.valid
+
+ listing = listdir(full_dir)
+ if listing:
+ for l in listing:
+ logging.info("name:: %s", l)
+ out_file_name = listing[0]
+ full_path = join(full_dir, out_file_name)
+ if not os.path.isfile(full_path):
+ return self.valid
+
+ with open(full_path, 'r') as out_file:
+ contents = out_file.read()
+ logging.info("dir %s -- name %s", full_dir, out_file_name)
+ logging.info("expected %s -- content %s",
self.expected_content, contents)
+
+ if self.expected_content in contents:
+ self.valid = True
+
+ return self.valid
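
The validators plug into the test framework through the set_output_dir()/validate() contract shown above; how the framework attaches them to a cluster is outside this hunk, so the following only sketches that contract:

    # Illustrative only.
    from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator

    validator = SingleFileOutputValidator('test payload')
    validator.set_output_dir('/tmp/output')

    # True once some file under /tmp/output contains the expected content.
    assert validator.validate()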
diff --git a/docker/test/integration/minifi/validators/__init__.py
b/docker/test/integration/minifi/validators/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/docker/test/integration/test_filesystem_ops.py
b/docker/test/integration/test_filesystem_ops.py
index 3ae4ff5..5b48e6e 100644
--- a/docker/test/integration/test_filesystem_ops.py
+++ b/docker/test/integration/test_filesystem_ops.py
@@ -14,8 +14,6 @@
# limitations under the License.
from minifi import *
-from minifi.test import *
-
def test_get_put():
"""
diff --git a/docker/test/integration/test_filter_zero_file.py
b/docker/test/integration/test_filter_zero_file.py
index b56b23f..918582b 100644
--- a/docker/test/integration/test_filter_zero_file.py
+++ b/docker/test/integration/test_filter_zero_file.py
@@ -14,7 +14,6 @@
# limitations under the License.
from minifi import *
-from minifi.test import *
def test_filter_zero_file():
"""
diff --git a/docker/test/integration/test_hash_content.py
b/docker/test/integration/test_hash_content.py
index 1482014..e60d3a3 100644
--- a/docker/test/integration/test_hash_content.py
+++ b/docker/test/integration/test_hash_content.py
@@ -14,8 +14,6 @@
# limitations under the License.
from minifi import *
-from minifi.test import *
-
def test_hash_invoke():
"""
diff --git a/docker/test/integration/test_http.py
b/docker/test/integration/test_http.py
index facc45c..f9431d4 100644
--- a/docker/test/integration/test_http.py
+++ b/docker/test/integration/test_http.py
@@ -14,8 +14,6 @@
# limitations under the License.
from minifi import *
-from minifi.test import *
-
def test_invoke_listen():
"""
diff --git a/docker/test/integration/test_rdkafka.py
b/docker/test/integration/test_rdkafka.py
index 980e66f..bea36db 100644
--- a/docker/test/integration/test_rdkafka.py
+++ b/docker/test/integration/test_rdkafka.py
@@ -14,8 +14,6 @@
# limitations under the License.
from minifi import *
-from minifi.test import *
-
def test_publish_kafka():
"""
diff --git a/docker/test/integration/test_s2s.py
b/docker/test/integration/test_s2s.py
index 3125139..ff1a312 100644
--- a/docker/test/integration/test_s2s.py
+++ b/docker/test/integration/test_s2s.py
@@ -14,8 +14,6 @@
# limitations under the License.
from minifi import *
-from minifi.test import *
-
def test_minifi_to_nifi():
"""
diff --git a/docker/test/integration/test_s3.py
b/docker/test/integration/test_s3.py
index d961003..75b361a 100644
--- a/docker/test/integration/test_s3.py
+++ b/docker/test/integration/test_s3.py
@@ -14,8 +14,6 @@
# limitations under the License.
from minifi import *
-from minifi.test import *
-
def test_put_s3_object():
"""
diff --git a/docker/test/integration/test_zero_file.py
b/docker/test/integration/test_zero_file.py
index 3223329..1a0cf05 100644
--- a/docker/test/integration/test_zero_file.py
+++ b/docker/test/integration/test_zero_file.py
@@ -14,7 +14,6 @@
# limitations under the License.
from minifi import *
-from minifi.test import *
def test_zero_file():
"""
diff --git a/docker/test/test_https.py b/docker/test/test_https.py
index 79a565f..2ea1bed 100644
--- a/docker/test/test_https.py
+++ b/docker/test/test_https.py
@@ -18,8 +18,6 @@ import time
from M2Crypto import X509, EVP, RSA, ASN1
from minifi import *
-from minifi.test import *
-
def callback():
pass