adamdebreceni commented on a change in pull request #1053:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1053#discussion_r625028604



##########
File path: docker/test/integration/steps/steps.py
##########
@@ -120,26 +132,75 @@ def step_impl(context):
     publish_kafka.set_name("PublishKafka")
     context.test.add_node(publish_kafka)
 
+@given("a kafka producer workflow publishing files placed in \"{directory}\" 
to a broker exactly once")
+def step_impl(context, directory):
+    context.execute_steps("""
+        given a GetFile processor with the \"Input Directory\" property set to 
\"{directory}\"
+        and the \"Keep Source File\" of the GetFile processor is set to 
\"false\"
+        and a PublishKafka processor set up to communicate with a kafka broker 
instance
+        and the "success" relationship of the GetFile processor is connected 
to the PublishKafka""".
+        format(directory=directory))
+
+@given("a ConsumeKafka processor set up in a \"{cluster_name}\" flow")
+def step_impl(context, cluster_name):
+    consume_kafka = ConsumeKafka()
+    consume_kafka.set_name("ConsumeKafka")
+    context.test.add_node(consume_kafka)
+    logging.info("Acquiring " + cluster_name)
+    cluster = context.test.acquire_cluster(cluster_name)
+    # Assume that the first node declared is primary unless specified otherwise
+    if cluster.get_flow() is None:
+        cluster.set_name(cluster_name)
+        cluster.set_flow(consume_kafka)
 
 @given("the \"{property_name}\" of the {processor_name} processor is set to 
\"{property_value}\"")
 def step_impl(context, property_name, processor_name, property_value):
     processor = context.test.get_node_by_name(processor_name)
-    processor.set_property(property_name, property_value)
+    if property_value == "(not set)":
+        processor.unset_property(property_name)
+    else:
+        processor.set_property(property_name, property_value)
+
+@given("the \"{property_name}\" of the {processor_name} processor is set to 
match {key_attribute_encoding} encoded kafka message key \"{message_key}\"")
+def step_impl(context, property_name, processor_name, key_attribute_encoding, 
message_key):
+    encoded_key = ""
+    if(key_attribute_encoding.lower() == "hex"):
+        # Hex is presented upper-case to be in sync with NiFi
+        encoded_key = binascii.hexlify(message_key.encode("utf-8")).upper()
+    elif(key_attribute_encoding.lower() == "(not set)"):
+        encoded_key = message_key.encode("utf-8")
+    else:
+        encoded_key = message_key.encode(key_attribute_encoding)
+    logging.info("%s processor is set up to match encoded key \"%s\"", 
processor_name, encoded_key)
+    filtering = "${kafka.key:equals('" + encoded_key.decode("utf-8") + "')}"
+    logging.info("Filter: \"%s\"", filtering)
+    processor = context.test.get_node_by_name(processor_name)
+    processor.set_property(property_name, filtering)
 
+@given("the \"{property_name}\" of the {processor_name} processor is set to 
match the attribute \"{attribute_key}\" to \"{attribute_value}\"")
+def step_impl(context, property_name, processor_name, attribute_key, 
attribute_value):
+    processor = context.test.get_node_by_name(processor_name)
+    if attribute_value == "(not set)":

Review comment:
       matching on an attribute with value `(not set)` would make me think that 
it checks for the non-existence of said attribute, moving this behavior to when 
`attribute_key` is `(not set)` might be better in line with the intention here 
(and raising when`attribute_key` is `(not set)` while `attribute_value` is not)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to