lordgamez commented on a change in pull request #1076:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1076#discussion_r648050633
##########
File path: docker/test/integration/features/kafka.feature
##########
@@ -57,3 +57,180 @@ Feature: Sending data to using Kafka streaming platform
     using PublishKafka
     When both instances start up
     Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: MiNiFi consumes data from a kafka topic
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "some test message" is published to the "ConsumeKafkaTest" topic
+
+    Then at least one flowfile with the content "some test message" is placed in the monitored directory in less than 60 seconds
+
+  Scenario Outline: ConsumeKafka parses and uses kafka topics and topic name formats
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Topic Names" of the ConsumeKafka processor is set to "<topic names>"
+    And the "Topic Name Format" of the ConsumeKafka processor is set to "<topic name format>"
+    And the "Offset Reset" of the ConsumeKafka processor is set to "earliest"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+    And the kafka broker "broker" is started
+    And the topic "ConsumeKafkaTest" is initialized on the kafka broker
+
+    When all other processes start up
+    And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic
+    And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic
+
+    Then two flowfiles with the contents "<message 1>" and "<message 2>" are placed in the monitored directory in less than 90 seconds
+
+    Examples: Topic names and formats to test
+      | message 1            | message 2           | topic names              | topic name format |
+      | Ulysses              | James Joyce         | ConsumeKafkaTest         | (not set)         |
+      | The Great Gatsby     | F. Scott Fitzgerald | ConsumeKafkaTest         | Names             |
+      | War and Peace        | Lev Tolstoy         | a,b,c,ConsumeKafkaTest,d | Names             |
+      | Nineteen Eighty Four | George Orwell       | ConsumeKafkaTest         | Patterns          |
+      | Hamlet               | William Shakespeare | Cons[emu]*KafkaTest      | Patterns          |
+
+  Scenario Outline: ConsumeKafka key attribute is encoded according to the "Key Attribute Encoding" property
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Key Attribute Encoding" of the ConsumeKafka processor is set to "<key attribute encoding>"
+    And a RouteOnAttribute processor in the "kafka-consumer-flow" flow
+    And a LogAttribute processor in the "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" of the RouteOnAttribute processor is set to match <key attribute encoding> encoded kafka message key "consume_kafka_test_key"
+
+    And the "success" relationship of the ConsumeKafka processor is connected to the LogAttribute
+    And the "success" relationship of the LogAttribute processor is connected to the RouteOnAttribute
+    And the "success" relationship of the RouteOnAttribute processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic with key "consume_kafka_test_key"
+    And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic with key "consume_kafka_test_key"
+
+    Then two flowfiles with the contents "<message 1>" and "<message 2>" are placed in the monitored directory in less than 45 seconds
+
+    Examples: Key attribute encoding values
+      | message 1            | message 2                     | key attribute encoding |
+      | The Odyssey          | Ὅμηρος                        | (not set)              |
+      | Lolita               | Владимир Владимирович Набоков | utf-8                  |
+      | Crime and Punishment | Фёдор Михайлович Достоевский  | hex                    |
+      | Paradise Lost        | John Milton                   | hEX                    |
+
+  Scenario Outline: ConsumeKafka transactional behaviour is supported
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Topic Names" of the ConsumeKafka processor is set to "ConsumeKafkaTest"
+    And the "Honor Transactions" of the ConsumeKafka processor is set to "<honor transactions>"
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And the publisher performs a <transaction type> transaction publishing to the "ConsumeKafkaTest" topic these messages: <messages sent>
+
+    Then <number of flowfiles expected> flowfiles are placed in the monitored directory in less than 30 seconds
+
+    Examples: Transaction descriptions
+      | messages sent                     | transaction type             | honor transactions | number of flowfiles expected |
+      | Pride and Prejudice, Jane Austen  | SINGLE_COMMITTED_TRANSACTION | (not set)          | 2                            |
+      | Dune, Frank Herbert               | TWO_SEPARATE_TRANSACTIONS    | (not set)          | 2                            |
+      | The Black Sheep, Honore De Balzac | NON_COMMITTED_TRANSACTION    | (not set)          | 0                            |
+      | Gospel of Thomas                  | CANCELLED_TRANSACTION        | (not set)          | 0                            |
+      | Operation Dark Heart              | CANCELLED_TRANSACTION        | true               | 0                            |
+      | Brexit                            | CANCELLED_TRANSACTION        | false              | 1                            |
+
+  Scenario Outline: Headers on consumed kafka messages are extracted into attributes if requested on ConsumeKafka
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Headers To Add As Attributes" of the ConsumeKafka processor is set to "<headers to add as attributes>"
+    And the "Duplicate Header Handling" of the ConsumeKafka processor is set to "<duplicate header handling>"
+    And a RouteOnAttribute processor in the "kafka-consumer-flow" flow
+    And a LogAttribute processor in the "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" of the RouteOnAttribute processor is set to match the attribute "<headers to add as attributes>" to "<expected value>"
+
+    And the "success" relationship of the ConsumeKafka processor is connected to the LogAttribute
+    And the "success" relationship of the LogAttribute processor is connected to the RouteOnAttribute
+    And the "success" relationship of the RouteOnAttribute processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "<message 1>" is published to the "ConsumeKafkaTest" topic with headers "<message headers sent>"
+    And a message with content "<message 2>" is published to the "ConsumeKafkaTest" topic with headers "<message headers sent>"
+
+    Then two flowfiles with the contents "<message 1>" and "<message 2>" are placed in the monitored directory in less than 45 seconds
+
+    Examples: Messages with headers
+      | message 1             | message 2         | message headers sent        | headers to add as attributes | expected value       | duplicate header handling |
+      | Homeland              | R. A. Salvatore   | Contains dark elves: yes    | (not set)                    | (not set)            | (not set)                 |
+      | Magician              | Raymond E. Feist  | Rating: 10/10               | Rating                       | 10/10                | (not set)                 |
+      | Mistborn              | Brandon Sanderson | Metal: Copper; Metal: Iron  | Metal                        | Copper               | Keep First                |
+      | Mistborn              | Brandon Sanderson | Metal: Copper; Metal: Iron  | Metal                        | Iron                 | Keep Latest               |
+      | Mistborn              | Brandon Sanderson | Metal: Copper; Metal: Iron  | Metal                        | Copper, Iron         | Comma-separated Merge     |
+      | The Lord of the Rings | J. R. R. Tolkien  | Parts: First, second, third | Parts                        | First, second, third | (not set)                 |
+
+  Scenario: Messages are separated into multiple flowfiles if the message demarcator is present in the message
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And the "Message Demarcator" of the ConsumeKafka processor is set to "a"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "Barbapapa" is published to the "ConsumeKafkaTest" topic
+    And a message with content "Anette Tison and Talus Taylor" is published to the "ConsumeKafkaTest" topic
+
+    Then flowfiles with these contents are placed in the monitored directory in less than 45 seconds: "B,rb,p,Anette Tison ,nd T,lus T,ylor"
+
+  Scenario Outline: The ConsumeKafka "Maximum Poll Records" property sets a limit on the messages processed in a single batch
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And a LogAttribute processor in the "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+
+    And the "Max Poll Records" of the ConsumeKafka processor is set to "<max poll records>"
+    And the scheduling period of the ConsumeKafka processor is set to "<scheduling period>"
+    And the scheduling period of the LogAttribute processor is set to "<scheduling period>"
+    And the "FlowFiles To Log" of the LogAttribute processor is set to "<max poll records>"
+
+    And the "success" relationship of the ConsumeKafka processor is connected to the LogAttribute
+    And the "success" relationship of the LogAttribute processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And 1000 kafka messages are sent to the topic "ConsumeKafkaTest"
+
+    Then minimum <min expected messages>, maximum <max expected messages> flowfiles are produced and placed in the monitored directory in less than <polling time>
+
+    Examples: Message batching
+      | max poll records | scheduling period | polling time | min expected messages | max expected messages |
+      | 3                | 10 sec            | 60 seconds   | 12                    | 24                    |
+      | 6                | 10 sec            | 60 seconds   | 24                    | 48                    |
Review comment:
       Updated in cdb014f306f614bfad5e6120f089ab821c904677 to expect half of those messages in half of the polling interval.
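
For reference, the min/max columns in the batching table encode a tolerance around a nominal flowfile count. The sketch below is an illustration of that arithmetic, not code from the test framework: the function name is made up, and the plus-or-minus one-third tolerance is an assumption inferred from the fact that it reproduces both example rows.

```python
def expected_flowfile_bounds(max_poll_records, scheduling_period_s, polling_time_s):
    """Illustrative bounds on flowfiles produced, assuming ConsumeKafka fires
    once per scheduling period and each firing emits up to max_poll_records
    flowfiles. The +/- one-third tolerance is an assumption that happens to
    reproduce both example rows; the real test may use different margins."""
    polls = polling_time_s // scheduling_period_s   # e.g. 60 // 10 = 6 firings
    nominal = polls * max_poll_records              # e.g. 6 * 3 = 18 flowfiles
    return nominal * 2 // 3, nominal * 4 // 3

print(expected_flowfile_bounds(3, 10, 60))  # (12, 24), matching the first row
print(expected_flowfile_bounds(6, 10, 60))  # (24, 48), matching the second row
```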
##########
File path: docker/test/integration/steps/steps.py
##########
@@ -270,9 +367,119 @@ def step_impl(context, content, file_name, path, seconds):
context.test.add_test_data(path, content, file_name)
+# Kafka
+def delivery_report(err, msg):
+ if err is not None:
+ logging.info('Message delivery failed: {}'.format(err))
+ else:
+        logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
+
+
+@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic")
+def step_impl(context, content, topic_name):
+    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
+    producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+    producer.flush(10)
+
+
+# Used for testing transactional message consumption
+@when("the publisher performs a {transaction_type} transaction publishing to the \"{topic_name}\" topic these messages: {messages}")
+def step_impl(context, transaction_type, topic_name, messages):
+    producer = Producer({"bootstrap.servers": "localhost:29092", "transactional.id": "1001"})
+    producer.init_transactions()
+    logging.info("Transaction type: %s", transaction_type)
+    logging.info("Messages: %s", messages.split(", "))
+    if transaction_type == "SINGLE_COMMITTED_TRANSACTION":
+        producer.begin_transaction()
+        for content in messages.split(", "):
+            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+        producer.commit_transaction()
+        producer.flush(10)
+    elif transaction_type == "TWO_SEPARATE_TRANSACTIONS":
+        for content in messages.split(", "):
+            producer.begin_transaction()
+            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+            producer.commit_transaction()
+        producer.flush(10)
+    elif transaction_type == "NON_COMMITTED_TRANSACTION":
+        producer.begin_transaction()
+        for content in messages.split(", "):
+            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+        producer.flush(10)
+    elif transaction_type == "CANCELLED_TRANSACTION":
+        producer.begin_transaction()
+        for content in messages.split(", "):
+            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
+        producer.flush(10)
+        producer.abort_transaction()
+    else:
+        raise Exception("Unknown transaction type.")
+
+
+@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with key \"{message_key}\"")
+def step_impl(context, content, topic_name, message_key):
+    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
+    # Asynchronously produce a message; the delivery report callback
+    # will be triggered from flush() below, once the message has
+    # been successfully delivered or has failed permanently.
+    producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report, key=message_key.encode("utf-8"))
+    # Wait for any outstanding messages to be delivered and delivery report
+    # callbacks to be triggered.
+    producer.flush(10)
+
+
+@when("{number_of_messages} kafka messages are sent to the topic \"{topic_name}\"")
+def step_impl(context, number_of_messages, topic_name):
+    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
+    for i in range(0, int(number_of_messages)):
+        producer.produce(topic_name, str(uuid.uuid4()).encode("utf-8"))
+    producer.flush(10)
+
+
+@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with headers \"{semicolon_separated_headers}\"")
+def step_impl(context, content, topic_name, semicolon_separated_headers):
+    # confluent-kafka does not support multiple headers with the same key, so the kafka-python API is used here.
+    headers = []
+    for header in semicolon_separated_headers.split(";"):
+        kv = header.split(":")
+        headers.append((kv[0].strip(), kv[1].strip().encode("utf-8")))
+    producer = KafkaProducer(bootstrap_servers='localhost:29092')
+    future = producer.send(topic_name, content.encode("utf-8"), headers=headers)
+    assert future.get(timeout=60)
+
+
@then("a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
-    context.test.check_for_file_with_content_generated(content, timeparse(duration))
+    context.test.check_for_single_file_with_content_generated(content, timeparse(duration))
+
+
+@then("at least one flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
+def step_impl(context, content, duration):
+    context.test.check_for_at_least_one_file_with_content_generated(content, timeparse(duration))
+
+
+@then("{num_flowfiles} flowfiles are placed in the monitored directory in less than {duration}")
+def step_impl(context, num_flowfiles, duration):
+    # num_flowfiles arrives as a string, so convert it before comparing
+    if int(num_flowfiles) == 0:
+        context.execute_steps("""no files are placed in the monitored directory in {duration} of running time""".format(duration=duration))
+        return
+    context.test.check_for_num_files_generated(int(num_flowfiles), timeparse(duration))
+
+
+@then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
+def step_impl(context, content_1, content_2, duration):
+    context.test.check_for_multiple_files_generated(2, timeparse(duration), [content_1, content_2])
+
+
+@then("flowfiles with these contents are placed in the monitored directory in less than {duration}: \"{contents}\"")
+def step_impl(context, duration, contents):
+    contents_arr = contents.split(",")
+    context.test.check_for_multiple_files_generated(len(contents_arr), timeparse(duration), contents_arr)
+
+
+@then("minimum {lower_bound}, maximum {upper_bound} flowfiles are produced and placed in the monitored directory in less than {duration}")
Review comment:
       Updated in cdb014f306f614bfad5e6120f089ab821c904677.
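
The bounded-count assertion behind this step could be sketched roughly as follows. This is illustrative only: the helper name, the polling loop, and the injectable counter are assumptions, and the real check lives inside the MiNiFi integration test framework.

```python
import time

def check_flowfile_count_within(count_files, lower_bound, upper_bound, timeout_s, poll_s=0.1):
    """Illustrative sketch: repeatedly call count_files() until the count
    reaches [lower_bound, upper_bound]. Fails early if the upper bound is
    exceeded, and fails if the lower bound is not reached before the timeout."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        count = count_files()
        if count > upper_bound:
            return False  # too many flowfiles were produced
        if count >= lower_bound:
            return True   # count landed inside the accepted window
        time.sleep(poll_s)
    return False          # lower bound never reached in time

# Example with stubbed counters instead of a real directory scan:
assert check_flowfile_count_within(lambda: 13, 12, 24, timeout_s=5) is True
assert check_flowfile_count_within(lambda: 30, 12, 24, timeout_s=1) is False
```

A counter callable (rather than a fixed count) keeps the sketch honest about the asynchronous nature of the check: files appear over time, so the assertion must poll rather than sample once.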
##########
File path: docker/DockerVerify.sh
##########
@@ -72,28 +74,12 @@ export PATH
PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
export PYTHONPATH
+# Add --no-logcapture to see logs inline
Review comment:
       Updated in cdb014f306f614bfad5e6120f089ab821c904677.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]