This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit c73c904893a71944b41f67b2e3c848dde0af10a2 Author: Adam Markovics <[email protected]> AuthorDate: Mon Aug 22 22:02:04 2022 +0200 MINIFICPP-1912 Fix occasionally failing MQTT tests Closes #1395 Signed-off-by: Marton Szasz <[email protected]> --- docker/test/integration/features/mqtt.feature | 43 +++++++++------------- .../integration/minifi/processors/ConsumeMQTT.py | 3 +- .../integration/minifi/processors/PublishMQTT.py | 3 +- 3 files changed, 22 insertions(+), 27 deletions(-) diff --git a/docker/test/integration/features/mqtt.feature b/docker/test/integration/features/mqtt.feature index d83605b57..1f5204875 100644 --- a/docker/test/integration/features/mqtt.feature +++ b/docker/test/integration/features/mqtt.feature @@ -75,10 +75,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT When both instances start up - Then a file with the content "test" is placed in "/tmp/input" + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + And the MQTT broker has a log line matching "New client connected from .* as publisher-client" + And a file with the content "test" is placed in "/tmp/input" And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*testtopic.*\(4 bytes\)" - And the MQTT broker has a log line matching "Received SUBSCRIBE from" Scenario Outline: Subscription to topics with wildcards Given a GetFile processor with the "Input Directory" property set to "/tmp/input" @@ -95,10 +96,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT When both instances start up - Then a file with the content "test" is placed in "/tmp/input" + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + And the MQTT broker has a log line matching "New client connected from .* as publisher-client" + And a file with the content "test" is placed in "/tmp/input" And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*test/my/topic.*\(4 bytes\)" - And the MQTT broker has a log line matching "Received SUBSCRIBE from" Examples: Topic wildcard patterns | topic wildcard pattern | @@ -110,14 +112,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow And a PublishMQTT processor in the "publisher-client" flow And the "Quality of Service" property of the PublishMQTT processor is set to "1" - And the "Client ID" property of the PublishMQTT processor is set to "publisher-client" And the "success" relationship of the GetFile processor is connected to the PublishMQTT # consuming MQTT client And a ConsumeMQTT processor in the "consumer-client" flow And the "Quality of Service" property of the ConsumeMQTT processor is set to "1" And the "Clean Session" property of the ConsumeMQTT processor is set to "false" - And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile @@ -150,10 +150,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT When both instances start up - Then a file with the content "<message>" is placed in "/tmp/input" + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + And the MQTT broker has a log line matching "New client connected from .* as publisher-client" + And a file with the content "<message>" is placed in "/tmp/input" And a flowfile with the content "<message>" is placed in the monitored directory in less than 60 seconds And the MQTT broker has a log line matching "Received PUBLISH from .*<topic>" - And the MQTT broker has a log line matching "Received SUBSCRIBE from" Examples: Topic wildcard patterns | topic | message | @@ -167,12 +168,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance And the "Quality of Service" property of the PublishMQTT processor is set to "0" - And the "Client ID" property of the PublishMQTT processor is set to "publisher-client" And the "success" relationship of the GetFile processor is connected to the PublishMQTT And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "Quality of Service" property of the ConsumeMQTT processor is set to "0" - And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client" And a PutFile processor with the "Directory" property set to "/tmp/output" And "ConsumeMQTT" processor is a start node And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile @@ -180,9 +179,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT When both instances start up - Then a file with the content "test" is placed in "/tmp/input" + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + And the MQTT broker has a log line matching "New client connected from .* as publisher-client" + And a file with the content "test" is placed in "/tmp/input" And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds - And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "Received PUBLISH from publisher-client \(d0, q0, r0, m0, 'testtopic'.*\(4 bytes\)" And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q0, r0, m0, 'testtopic',.*\(4 bytes\)\)" @@ -190,12 +190,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance And the "Quality of Service" property of the PublishMQTT processor is set to "1" - And the "Client ID" property of the PublishMQTT processor is set to "publisher-client" And the "success" relationship of the GetFile processor is connected to the PublishMQTT And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "Quality of Service" property of the ConsumeMQTT processor is set to "1" - And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client" And a PutFile processor with the "Directory" property set to "/tmp/output" And "ConsumeMQTT" processor is a start node And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile @@ -203,9 +201,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT When both instances start up - Then a file with the content "test" is placed in "/tmp/input" + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + And the MQTT broker has a log line matching "New client connected from .* as publisher-client" + And a file with the content "test" is placed in "/tmp/input" And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds - And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)" And the MQTT broker has a log line matching "Sending PUBACK to publisher-client \(m1, rc0\)" And the MQTT broker has a log line matching "Sending PUBLISH to consumer-client \(d0, q1, r0, m1, 'testtopic',.*\(4 bytes\)\)" @@ -216,12 +215,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And a PublishMQTT processor set up to communicate with an MQTT broker instance And the "Quality of Service" property of the PublishMQTT processor is set to "2" - And the "Client ID" property of the PublishMQTT processor is set to "publisher-client" And the "success" relationship of the GetFile processor is connected to the PublishMQTT And a ConsumeMQTT processor set up to communicate with an MQTT broker instance And the "Quality of Service" property of the ConsumeMQTT processor is set to "2" - And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client" And a PutFile processor with the "Directory" property set to "/tmp/output" And "ConsumeMQTT" processor is a start node And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile @@ -229,9 +226,10 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT And an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT When both instances start up - Then a file with the content "test" is placed in "/tmp/input" + Then the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" + And the MQTT broker has a log line matching "New client connected from .* as publisher-client" + And a file with the content "test" is placed in "/tmp/input" And a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds - And the MQTT broker has a log line matching "Received SUBSCRIBE from consumer-client" And the MQTT broker has a log line matching "Received PUBLISH from publisher-client.*m1, 'testtopic'.*\(4 bytes\)" And the MQTT broker has a log line matching "Sending PUBREC to publisher-client \(m1, rc0\)" And the MQTT broker has a log line matching "Received PUBREL from publisher-client \(Mid: 1\)" @@ -246,13 +244,11 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow And a file with the content "test" is present in "/tmp/input" And a PublishMQTT processor in the "publisher-client" flow - And the "Client ID" property of the PublishMQTT processor is set to "publisher-client" And the "Retain" property of the PublishMQTT processor is set to "true" And the "success" relationship of the GetFile processor is connected to the PublishMQTT # consuming MQTT client And a ConsumeMQTT processor in the "consumer-client" flow - And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile @@ -272,14 +268,12 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT # publishing MQTT client with last will Given a GetFile processor with the "Input Directory" property set to "/tmp/input" in the "publisher-client" flow And a PublishMQTT processor in the "publisher-client" flow - And the "Client ID" property of the PublishMQTT processor is set to "publisher-client" And the "Last Will Topic" property of the PublishMQTT processor is set to "last_will_topic" And the "Last Will Message" property of the PublishMQTT processor is set to "last_will_message" And the "success" relationship of the GetFile processor is connected to the PublishMQTT # consuming MQTT client set to consume last will topic And a ConsumeMQTT processor in the "consumer-client" flow - And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client" And the "Topic" property of the ConsumeMQTT processor is set to "last_will_topic" And a PutFile processor with the "Directory" property set to "/tmp/output" in the "consumer-client" flow And the "success" relationship of the ConsumeMQTT processor is connected to the PutFile @@ -295,7 +289,6 @@ Feature: Sending data to MQTT streaming platform using PublishMQTT Scenario: Keep Alive Given a ConsumeMQTT processor set up to communicate with an MQTT broker instance - And the "Client ID" property of the ConsumeMQTT processor is set to "consumer-client" And the "Keep Alive Interval" property of the ConsumeMQTT processor is set to "1 sec" And an MQTT broker is set up in correspondence with the ConsumeMQTT diff --git a/docker/test/integration/minifi/processors/ConsumeMQTT.py b/docker/test/integration/minifi/processors/ConsumeMQTT.py index 6b2c444c6..d32b5bb40 100644 --- a/docker/test/integration/minifi/processors/ConsumeMQTT.py +++ b/docker/test/integration/minifi/processors/ConsumeMQTT.py @@ -23,6 +23,7 @@ class ConsumeMQTT(Processor): 'ConsumeMQTT', properties={ 'Broker URI': 'mqtt-broker:1883', - 'Topic': 'testtopic'}, + 'Topic': 'testtopic', + 'Client ID': 'consumer-client'}, auto_terminate=['success'], schedule=schedule) diff --git a/docker/test/integration/minifi/processors/PublishMQTT.py b/docker/test/integration/minifi/processors/PublishMQTT.py index d4840fee0..8506d3812 100644 --- a/docker/test/integration/minifi/processors/PublishMQTT.py +++ b/docker/test/integration/minifi/processors/PublishMQTT.py @@ -23,6 +23,7 @@ class PublishMQTT(Processor): 'PublishMQTT', properties={ 'Broker URI': 'mqtt-broker:1883', - 'Topic': 'testtopic'}, + 'Topic': 'testtopic', + 'Client ID': 'publisher-client'}, auto_terminate=['success', 'failure'], schedule=schedule)
