hunyadi-dev commented on a change in pull request #1053:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1053#discussion_r626393007



##########
File path: docker/test/integration/steps/steps.py
##########
@@ -120,26 +132,75 @@ def step_impl(context):
     publish_kafka.set_name("PublishKafka")
     context.test.add_node(publish_kafka)
 
+@given("a kafka producer workflow publishing files placed in \"{directory}\" 
to a broker exactly once")
+def step_impl(context, directory):
+    context.execute_steps("""
+        given a GetFile processor with the \"Input Directory\" property set to 
\"{directory}\"
+        and the \"Keep Source File\" of the GetFile processor is set to 
\"false\"
+        and a PublishKafka processor set up to communicate with a kafka broker 
instance
+        and the "success" relationship of the GetFile processor is connected 
to the PublishKafka""".
+        format(directory=directory))
+
+@given("a ConsumeKafka processor set up in a \"{cluster_name}\" flow")
+def step_impl(context, cluster_name):
+    consume_kafka = ConsumeKafka()
+    consume_kafka.set_name("ConsumeKafka")
+    context.test.add_node(consume_kafka)
+    logging.info("Acquiring " + cluster_name)
+    cluster = context.test.acquire_cluster(cluster_name)
+    # Assume that the first node declared is primary unless specified otherwise
+    if cluster.get_flow() is None:
+        cluster.set_name(cluster_name)
+        cluster.set_flow(consume_kafka)
 
 @given("the \"{property_name}\" of the {processor_name} processor is set to 
\"{property_value}\"")
 def step_impl(context, property_name, processor_name, property_value):
     processor = context.test.get_node_by_name(processor_name)
-    processor.set_property(property_name, property_value)
+    if property_value == "(not set)":
+        processor.unset_property(property_name)
+    else:
+        processor.set_property(property_name, property_value)
+
+@given("the \"{property_name}\" of the {processor_name} processor is set to 
match {key_attribute_encoding} encoded kafka message key \"{message_key}\"")
+def step_impl(context, property_name, processor_name, key_attribute_encoding, 
message_key):
+    encoded_key = ""
+    if(key_attribute_encoding.lower() == "hex"):
+        # Hex is presented upper-case to be in sync with NiFi
+        encoded_key = binascii.hexlify(message_key.encode("utf-8")).upper()
+    elif(key_attribute_encoding.lower() == "(not set)"):
+        encoded_key = message_key.encode("utf-8")
+    else:
+        encoded_key = message_key.encode(key_attribute_encoding)
+    logging.info("%s processor is set up to match encoded key \"%s\"", 
processor_name, encoded_key)
+    filtering = "${kafka.key:equals('" + encoded_key.decode("utf-8") + "')}"
+    logging.info("Filter: \"%s\"", filtering)
+    processor = context.test.get_node_by_name(processor_name)
+    processor.set_property(property_name, filtering)
 
+@given("the \"{property_name}\" of the {processor_name} processor is set to 
match the attribute \"{attribute_key}\" to \"{attribute_value}\"")
+def step_impl(context, property_name, processor_name, attribute_key, 
attribute_value):
+    processor = context.test.get_node_by_name(processor_name)
+    if attribute_value == "(not set)":

Review comment:
       I understand, but example/context tables are much easier to add this way, because it keeps the property name consistent:
   ```gherkin
   Examples:
       | message 1            | message 2           | topic names              | topic name format |
       | Ulysses              | James Joyce         | ConsumeKafkaTest         | (not set)         |
       | The Great Gatsby     | F. Scott Fitzgerald | ConsumeKafkaTest         | Names             |
       ...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to