adamdebreceni commented on a change in pull request #1053:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1053#discussion_r625019088



##########
File path: docker/test/integration/steps/steps.py
##########
@@ -269,11 +361,117 @@ def step_impl(context, content, file_name, path, 
seconds):
     time.sleep(seconds)
     context.test.add_test_data(path, content, file_name)
 
# Kafka
def delivery_report(err, msg):
    """Kafka producer delivery callback: log whether a message made it to the broker."""
    if err is None:
        logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    else:
        logging.info('Message delivery failed: {}'.format(err))
@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic")
def step_impl(context, content, topic_name):
    """Publish a single message with the given content to the named Kafka topic."""
    producer_config = {"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()}
    producer = Producer(producer_config)
    producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
    # Wait up to 10 seconds for delivery (and for delivery_report to fire).
    producer.flush(10)
+
# Used for testing transactional message consumption
def _produce_all(producer, topic_name, messages):
    """Produce every comma-separated message in *messages* to *topic_name*."""
    for content in messages.split(", "):
        producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)

@when("the publisher performs a {transaction_type} transaction publishing to the \"{topic_name}\" topic these messages: {messages}")
def step_impl(context, transaction_type, topic_name, messages):
    """Publish messages inside Kafka transaction(s), then commit, abort, or
    deliberately leave the transaction open depending on transaction_type.

    :param transaction_type: one of SINGLE_COMMITTED_TRANSACTION,
        TWO_SEPARATE_TRANSACTIONS, NON_COMMITTED_TRANSACTION,
        CANCELLED_TRANSACTION
    :param messages: comma-plus-space separated message payloads
    :raises ValueError: for an unrecognized transaction_type
    """
    producer = Producer({"bootstrap.servers": "localhost:29092", "transactional.id": "1001"})
    producer.init_transactions()
    logging.info("Transaction type: %s", transaction_type)
    logging.info("Messages: %s", messages.split(", "))
    if transaction_type == "SINGLE_COMMITTED_TRANSACTION":
        producer.begin_transaction()
        _produce_all(producer, topic_name, messages)
        producer.commit_transaction()
        producer.flush(10)
    elif transaction_type == "TWO_SEPARATE_TRANSACTIONS":
        # One committed transaction per message.
        for content in messages.split(", "):
            producer.begin_transaction()
            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
            producer.commit_transaction()
        producer.flush(10)
    elif transaction_type == "NON_COMMITTED_TRANSACTION":
        # Intentionally neither committed nor aborted, so the scenario can
        # check how uncommitted messages are consumed.
        producer.begin_transaction()
        _produce_all(producer, topic_name, messages)
        producer.flush(10)
    elif transaction_type == "CANCELLED_TRANSACTION":
        producer.begin_transaction()
        _produce_all(producer, topic_name, messages)
        producer.flush(10)
        producer.abort_transaction()
    else:
        # ValueError is an Exception subclass, so existing broad handlers still catch it;
        # naming the value makes scenario failures diagnosable.
        raise ValueError("Unknown transaction type: {}".format(transaction_type))
+
@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with key \"{message_key}\"")
def step_impl(context, content, topic_name, message_key):
    """Publish a single keyed message to the named Kafka topic."""
    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
    # produce() is asynchronous: the delivery_report callback is triggered from
    # poll()/flush() once the message has been delivered or failed permanently.
    payload = content.encode("utf-8")
    producer.produce(topic_name, payload, callback=delivery_report, key=message_key.encode("utf-8"))
    # Block (up to 10 s) until outstanding deliveries and their callbacks complete.
    producer.flush(10)
+
@when("{number_of_messages} kafka messages are sent to the topic \"{topic_name}\"")
def step_impl(context, number_of_messages, topic_name):
    """Publish the requested number of messages, each carrying a fresh random UUID payload."""
    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
    message_count = int(number_of_messages)
    for _ in range(message_count):
        producer.produce(topic_name, str(uuid.uuid4()).encode("utf-8"))
    producer.flush(10)
+
def _parse_headers(semicolon_separated_headers):
    """Parse "key1: value1; key2: value2" into a list of (str, bytes) header tuples."""
    headers = []
    for header in semicolon_separated_headers.split(";"):
        # Split on the FIRST ':' only, so header values may themselves contain ':'
        # (the original split(":") silently dropped everything after a second colon).
        key, _, value = header.partition(":")
        headers.append((key.strip(), value.strip().encode("utf-8")))
    return headers

@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with headers \"{semicolon_separated_headers}\"")
def step_impl(context, content, topic_name, semicolon_separated_headers):
    """Publish a message carrying the given headers.

    Confluent kafka does not support multiple headers with same key, another API
    (kafka-python's KafkaProducer) must be used here.
    """
    producer = KafkaProducer(bootstrap_servers='localhost:29092')
    future = producer.send(topic_name, content.encode("utf-8"), headers=_parse_headers(semicolon_separated_headers))
    # Block until the send completes so a broker error fails the step
    # (the return value itself is not needed).
    future.get(timeout=60)
 
 @then("a flowfile with the content \"{content}\" is placed in the monitored 
directory in less than {duration}")
 def step_impl(context, content, duration):
-    context.test.check_for_file_with_content_generated(content, 
timeparse(duration))
+    context.test.check_for_single_file_with_content_generated(content, 
timeparse(duration))
 
@then("at least one flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
    """Check that at least one flowfile with the given content is generated within the time limit."""
    timeout_seconds = timeparse(duration)
    context.test.check_for_at_least_one_file_with_content_generated(content, timeout_seconds)
+
@then("{num_flowfiles} flowfiles are placed in the monitored directory in less than {duration}")
def step_impl(context, num_flowfiles, duration):
    """Check that exactly num_flowfiles files are generated within the time limit."""
    # behave passes step parameters as strings, so convert before comparing:
    # the original "num_flowfiles == 0" compared str to int and was never true.
    expected_count = int(num_flowfiles)
    if expected_count == 0:
        context.execute_steps("""no files are placed in the monitored directory in {duration} of running time""".
            format(duration=duration))
        return
    context.test.check_for_num_files_generated(expected_count, timeparse(duration))
+
@then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
def step_impl(context, content_1, content_2, duration):
    """Check that two flowfiles, one with each of the given contents, appear within the time limit."""
    wait_start_time = time.perf_counter()
    # First wait until two output files exist at all, then verify each content.
    context.test.wait_for_multiple_output_files(timeparse(duration), 2)
    context.execute_steps("""
        then a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}""".
        format(content=content_1, duration=duration))
    # Second check gets only the time remaining from the original budget, and
    # must look for content_2 — the original mistakenly reused content_1 here.
    remaining_seconds = timeparse(duration) - (time.perf_counter() - wait_start_time)
    context.execute_steps("""
        then a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}""".
        format(content=content_2, duration=str(remaining_seconds) + " seconds"))

Review comment:
       shouldn't this be `content_2`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to