This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.19 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 6808a5944d328305ba67babd3e86cfd5e783bcf5 Author: greyp9 <[email protected]> AuthorDate: Wed Nov 30 13:23:23 2022 -0500 NIFI-10901 - PublishKafka headers not sent in ProducerRecord (#6731) --- .../processors/kafka/pubsub/PublisherLease.java | 3 +- .../additionalDetails.html | 2 +- .../pubsub/TestPublishKafkaMockParameterized.java | 97 +++++++++------------- ...> TestPublishKafkaRecordMockParameterized.java} | 49 +++++++++-- .../Publish/parameterized/flowfileInput1.json | 8 ++ .../Publish/parameterized/flowfileInputA.json | 12 +++ .../Publish/parameterized/kafkaOutput1A.json | 18 ++++ .../Publish/parameterized/kafkaOutput1B.json | 18 ++++ .../Publish/parameterized/kafkaOutputA1.json | 22 +++++ .../Publish/parameterized/kafkaOutputA2.json | 22 +++++ .../parameterized/flowfileInputDoc1V.json | 8 ++ .../parameterized/flowfileInputDoc1W.json | 15 ++++ .../parameterized/flowfileInputDoc2W.json | 15 ++++ .../parameterized/kafkaOutputDoc1V.json | 21 +++++ .../parameterized/kafkaOutputDoc1W.json | 18 ++++ .../parameterized/kafkaOutputDoc2W.json | 16 ++++ 16 files changed, 275 insertions(+), 69 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 0949e537b6..da37db0319 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -431,7 +431,8 @@ public class PublisherLease implements Closeable { } protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) { - publish(flowFile, Collections.emptyList(), messageKey, messageContent, topic, tracker, partition); + final List<Header> headers = toHeaders(flowFile, Collections.emptyMap()); + publish(flowFile, headers, messageKey, messageContent, topic, tracker, partition); } protected void publish(final FlowFile flowFile, final List<Header> headers, final byte[] messageKey, final byte[] messageContent, diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html index ce87abc73d..e125b0f067 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html @@ -375,7 +375,7 @@ <table border="thin"> <tr> <th>Record Key</th> - <td><code>Acme Accounts</code></td> + <td><code>Acme Holdings</code></td> </tr> <tr> <th>Record Value</th> diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java index 4e97a6da84..b15f5d55a1 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java @@ -31,17 +31,12 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; -import org.apache.nifi.json.JsonRecordSetWriter; -import org.apache.nifi.json.JsonTreeReader; -import org.apache.nifi.kafka.shared.property.PublishStrategy; import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.serialization.RecordReaderFactory; -import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -57,12 +52,15 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.regex.Pattern; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.nio.charset.StandardCharsets.UTF_8; @@ -79,30 +77,29 @@ public class TestPublishKafkaMockParameterized { public static Stream<Arguments> testCaseParametersProvider() { return Stream.of( - arguments("PublishRecord/parameterized/flowfileInput1.json", - "account", ".*A.", getAttributes(), PublishStrategy.USE_VALUE, - "PublishRecord/parameterized/kafkaOutput1V.json"), - arguments("PublishRecord/parameterized/flowfileInput1.json", - "account", ".*B.", getAttributes(), PublishStrategy.USE_WRAPPER, - "PublishRecord/parameterized/kafkaOutput1W.json"), - arguments("PublishRecord/parameterized/flowfileInputA.json", - "key", ".*1", getAttributes(), PublishStrategy.USE_VALUE, - "PublishRecord/parameterized/kafkaOutputAV.json"), - arguments("PublishRecord/parameterized/flowfileInputA.json", - "key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER, - "PublishRecord/parameterized/kafkaOutputAW.json") + arguments("Publish/parameterized/flowfileInput1.json", + "key1A", ".*A.", getAttributes(), + "Publish/parameterized/kafkaOutput1A.json"), + arguments("Publish/parameterized/flowfileInput1.json", + "key1B", ".*B.", getAttributes(), + "Publish/parameterized/kafkaOutput1B.json"), + arguments("Publish/parameterized/flowfileInputA.json", + "keyA1", ".*1", getAttributes(), + "Publish/parameterized/kafkaOutputA1.json"), + arguments("Publish/parameterized/flowfileInputA.json", + "keyA2", ".*2", getAttributes(), + "Publish/parameterized/kafkaOutputA2.json") ); } @ParameterizedTest @MethodSource("testCaseParametersProvider") - public void testPublishKafkaRecord(final String flowfileInputResource, - final String messageKeyField, - final String attributeNameRegex, - final Map<String, String> attributes, - final PublishStrategy publishStrategy, - final String kafkaRecordExpectedOutputResource) - throws IOException, InitializationException { + public void testPublishKafka(final String flowfileInputResource, + final String messageKey, + final String attributeNameRegex, + final Map<String, String> attributes, + final String kafkaRecordExpectedOutputResource) + throws IOException { final byte[] flowfileData = IOUtils.toByteArray(Objects.requireNonNull( getClass().getClassLoader().getResource(flowfileInputResource))); logger.trace(new String(flowfileData, UTF_8)); @@ -114,12 +111,11 @@ public class TestPublishKafkaMockParameterized { final TestRunner runner = getTestRunner(producedRecords); runner.setProperty("topic", "test-topic"); runner.setProperty("attribute-name-regex", attributeNameRegex); - runner.setProperty("message-key-field", messageKeyField); - runner.setProperty("publish-strategy", publishStrategy.name()); + runner.setProperty("kafka-key", messageKey); runner.enqueue(flowFile); runner.run(1); // verify results - runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_SUCCESS, 1); assertEquals(1, producedRecords.size()); final ProducerRecord<byte[], byte[]> kafkaRecord = producedRecords.iterator().next(); final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter() @@ -151,7 +147,7 @@ public class TestPublishKafkaMockParameterized { public void serialize(final RecordHeader recordHeader, final JsonGenerator jsonGenerator, final SerializerProvider serializerProvider) throws IOException { jsonGenerator.writeStartObject(); - jsonGenerator.writeObjectField("RecordHeader-key", + jsonGenerator.writeStringField("RecordHeader-key", (recordHeader.key() == null) ? null : recordHeader.key()); jsonGenerator.writeObjectField("RecordHeader-value", (recordHeader.value() == null) ? null : new String(recordHeader.value(), StandardCharsets.UTF_8)); @@ -174,11 +170,15 @@ public class TestPublishKafkaMockParameterized { public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { jsonGenerator.writeStartObject(); - jsonGenerator.writeObjectField("ProducerRecord-key", - (producerRecord.key() == null) ? null : objectMapper.readTree(producerRecord.key())); + jsonGenerator.writeStringField("ProducerRecord-key", + (producerRecord.key() == null) ? null : new String(producerRecord.key(), StandardCharsets.UTF_8)); jsonGenerator.writeObjectField("ProducerRecord-value", (producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value())); - jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers()); + final List<Header> headers = new ArrayList<>(); + producerRecord.headers().forEach(headers::add); + final List<Header> headersSorted = headers.stream() + .sorted(Comparator.comparing(Header::key)).collect(Collectors.toList()); + jsonGenerator.writeObjectField("ProducerRecord-headers", headersSorted); jsonGenerator.writeEndObject(); } } @@ -192,15 +192,8 @@ public class TestPublishKafkaMockParameterized { return attributes; } - private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) - throws InitializationException { - final String readerId = "record-reader"; - final RecordReaderFactory readerService = new JsonTreeReader(); - final String writerId = "record-writer"; - final RecordSetWriterFactory writerService = new JsonRecordSetWriter(); - final String keyWriterId = "record-key-writer"; - final RecordSetWriterFactory keyWriterService = new JsonRecordSetWriter(); - final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() { + private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) { + final PublishKafka_2_6 processor = new PublishKafka_2_6() { @Override protected PublisherPool createPublisherPool(final ProcessContext context) { return getPublisherPool(producedRecords, context); @@ -208,15 +201,6 @@ public class TestPublishKafkaMockParameterized { }; final TestRunner runner = TestRunners.newTestRunner(processor); runner.setValidateExpressionUsage(false); - runner.addControllerService(readerId, readerService); - runner.enableControllerService(readerService); - runner.setProperty(readerId, readerId); - runner.addControllerService(writerId, writerService); - runner.enableControllerService(writerService); - runner.setProperty(writerId, writerId); - runner.addControllerService(keyWriterId, keyWriterService); - runner.enableControllerService(keyWriterService); - runner.setProperty(keyWriterId, keyWriterId); return runner; } @@ -229,10 +213,8 @@ public class TestPublishKafkaMockParameterized { final boolean useTransactions = context.getProperty("use-transactions").asBoolean(); final String transactionalIdPrefix = context.getProperty("transactional-id-prefix").evaluateAttributeExpressions().getValue(); Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix); - final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue()); final String charsetName = context.getProperty("message-header-encoding").evaluateAttributeExpressions().getValue(); final Charset charset = Charset.forName(charsetName); - final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty("record-key-writer").asControllerService(RecordSetWriterFactory.class); return new PublisherPool( Collections.emptyMap(), @@ -243,8 +225,8 @@ public class TestPublishKafkaMockParameterized { transactionalIdSupplier, attributeNamePattern, charset, - publishStrategy, - recordKeyWriterFactory) { + null, + null) { @Override public PublisherLease obtainPublisher() { return getPublisherLease(producedRecords, context); @@ -259,9 +241,6 @@ public class TestPublishKafkaMockParameterized { final ProcessContext context) { final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue(); final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex); - final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer") - .asControllerService(RecordSetWriterFactory.class); - final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue()); final Producer<byte[], byte[]> producer = mock(ProducerBB.class); when(producer.send(any(), any())).then(invocation -> { @@ -280,8 +259,8 @@ public class TestPublishKafkaMockParameterized { true, patternAttributeName, UTF_8, - publishStrategy, - keyWriterFactory) { + null, + null) { @Override protected long getTimestamp() { return 1000000000000L; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java similarity index 86% copy from nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java copy to nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java index 4e97a6da84..f5785ac803 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.kafka.pubsub; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.util.DefaultIndenter; import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; import com.fasterxml.jackson.databind.JsonSerializer; @@ -72,7 +73,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class TestPublishKafkaMockParameterized { +public class TestPublishKafkaRecordMockParameterized { private final Logger logger = LoggerFactory.getLogger(getClass()); private final ObjectMapper mapper = getObjectMapper(); @@ -90,7 +91,17 @@ public class TestPublishKafkaMockParameterized { "PublishRecord/parameterized/kafkaOutputAV.json"), arguments("PublishRecord/parameterized/flowfileInputA.json", "key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER, - "PublishRecord/parameterized/kafkaOutputAW.json") + "PublishRecord/parameterized/kafkaOutputAW.json"), + + arguments("PublishRecord/parameterized/flowfileInputDoc1V.json", + "account", "attribute.*", getAttributesDoc1(), PublishStrategy.USE_VALUE, + "PublishRecord/parameterized/kafkaOutputDoc1V.json"), + arguments("PublishRecord/parameterized/flowfileInputDoc1W.json", + null, null, Collections.emptyMap(), PublishStrategy.USE_WRAPPER, + "PublishRecord/parameterized/kafkaOutputDoc1W.json"), + arguments("PublishRecord/parameterized/flowfileInputDoc2W.json", + null, null, Collections.emptyMap(), PublishStrategy.USE_WRAPPER, + "PublishRecord/parameterized/kafkaOutputDoc2W.json") ); } @@ -113,8 +124,12 @@ public class TestPublishKafkaMockParameterized { final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>(); final TestRunner runner = getTestRunner(producedRecords); runner.setProperty("topic", "test-topic"); - runner.setProperty("attribute-name-regex", attributeNameRegex); - runner.setProperty("message-key-field", messageKeyField); + if (attributeNameRegex != null) { + runner.setProperty("attribute-name-regex", attributeNameRegex); + } + if (messageKeyField != null) { + runner.setProperty("message-key-field", messageKeyField); + } runner.setProperty("publish-strategy", publishStrategy.name()); runner.enqueue(flowFile); runner.run(1); @@ -174,13 +189,23 @@ public class TestPublishKafkaMockParameterized { public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { jsonGenerator.writeStartObject(); - jsonGenerator.writeObjectField("ProducerRecord-key", - (producerRecord.key() == null) ? null : objectMapper.readTree(producerRecord.key())); - jsonGenerator.writeObjectField("ProducerRecord-value", - (producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value())); + serializeField(jsonGenerator, "ProducerRecord-key", producerRecord.key()); + serializeField(jsonGenerator, "ProducerRecord-value", producerRecord.value()); jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers()); jsonGenerator.writeEndObject(); } + + private void serializeField(final JsonGenerator jsonGenerator, final String key, final byte[] value) throws IOException { + if (value == null) { + jsonGenerator.writeObjectField(key, null); + } else { + try { + jsonGenerator.writeObjectField(key, objectMapper.readTree(value)); + } catch (final JsonParseException e) { + jsonGenerator.writeStringField(key, new String(value, UTF_8)); + } + } + } } private static Map<String, String> getAttributes() { @@ -192,6 +217,14 @@ public class TestPublishKafkaMockParameterized { return attributes; } + private static Map<String, String> getAttributesDoc1() { + final Map<String, String> attributes = new TreeMap<>(); + attributes.put("attributeA", "valueA"); + attributes.put("attributeB", "valueB"); + attributes.put("otherAttribute", "otherValue"); + return attributes; + } + private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) throws InitializationException { final String readerId = "record-reader"; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json new file mode 100644 index 0000000000..7764158297 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json @@ -0,0 +1,8 @@ +{ + "address": "1234 First Street", + "zip": "12345", + "account": { + "name": "Acme", + "number": "AC1234" + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json new file mode 100644 index 0000000000..ee19739c31 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json @@ -0,0 +1,12 @@ +{ + "key": { + "type": "person" + }, + "value": { + "name": "Mark", + "number": 49 + }, + "headers": { + "headerA": "headerAValue" + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json new file mode 100644 index 0000000000..1fb83cc47c --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json @@ -0,0 +1,18 @@ +{ + "ProducerRecord-key" : "key1A", + "ProducerRecord-value" : { + "address" : "1234 First Street", + "zip" : "12345", + "account" : { + "name" : "Acme", + "number" : "AC1234" + } + }, + "ProducerRecord-headers" : [ { + "RecordHeader-key" : "attrKeyA1", + "RecordHeader-value" : "attrValueA1" + }, { + "RecordHeader-key" : "attrKeyA2", + "RecordHeader-value" : "attrValueA2" + } ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json new file mode 100644 index 0000000000..49e9804e43 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json @@ -0,0 +1,18 @@ +{ + "ProducerRecord-key" : "key1B", + "ProducerRecord-value" : { + "address" : "1234 First Street", + "zip" : "12345", + "account" : { + "name" : "Acme", + "number" : "AC1234" + } + }, + "ProducerRecord-headers" : [ { + "RecordHeader-key" : "attrKeyB1", + "RecordHeader-value" : "attrValueB1" + }, { + "RecordHeader-key" : "attrKeyB2", + "RecordHeader-value" : "attrValueB2" + } ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json new file mode 100644 index 0000000000..baa3a1a927 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json @@ -0,0 +1,22 @@ +{ + "ProducerRecord-key" : "keyA1", + "ProducerRecord-value" : { + "key" : { + "type" : "person" + }, + "value" : { + "name" : "Mark", + "number" : 49 + }, + "headers" : { + "headerA" : "headerAValue" + } + }, + "ProducerRecord-headers" : [ { + "RecordHeader-key" : "attrKeyA1", + "RecordHeader-value" : "attrValueA1" + }, { + "RecordHeader-key" : "attrKeyB1", + "RecordHeader-value" : "attrValueB1" + } ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json new file mode 100644 index 0000000000..ba620f70b7 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json @@ -0,0 +1,22 @@ +{ + "ProducerRecord-key" : "keyA2", + "ProducerRecord-value" : { + "key" : { + "type" : "person" + }, + "value" : { + "name" : "Mark", + "number" : 49 + }, + "headers" : { + "headerA" : "headerAValue" + } + }, + "ProducerRecord-headers" : [ { + "RecordHeader-key" : "attrKeyA2", + "RecordHeader-value" : "attrValueA2" + }, { + "RecordHeader-key" : "attrKeyB2", + "RecordHeader-value" : "attrValueB2" + } ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json new file mode 100644 index 0000000000..7764158297 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json @@ -0,0 +1,8 @@ +{ + "address": "1234 First Street", + "zip": "12345", + "account": { + "name": "Acme", + "number": "AC1234" + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json new file mode 100644 index 0000000000..72db3c15ff --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json @@ -0,0 +1,15 @@ +{ + "key": "Acme Holdings", + "value": { + "address": "1234 First Street", + "zip": "12345", + "account": { + "name": "Acme", + "number":"AC1234" + } + }, + "headers": { + "accountType": "enterprise", + "test": "true" + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json new file mode 100644 index 0000000000..1a2087851b --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json @@ -0,0 +1,15 @@ +{ + "key": { + "accountName": "Acme Holdings", + "accountHolder": "John Doe", + "accountId": "280182830-A009" + }, + "value": { + "address": "1234 First Street", + "zip": "12345", + "account": { + "name": "Acme", + "number":"AC1234" + } + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json new file mode 100644 index 0000000000..146b5fdb00 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json @@ -0,0 +1,21 @@ +{ + "ProducerRecord-key" : { + "name" : "Acme", + "number" : "AC1234" + }, + "ProducerRecord-value" : { + "address" : "1234 First Street", + "zip" : "12345", + "account" : { + "name" : "Acme", + "number" : "AC1234" + } + }, + "ProducerRecord-headers" : [ { + "RecordHeader-key" : "attributeA", + "RecordHeader-value" : "valueA" + }, { + "RecordHeader-key" : "attributeB", + "RecordHeader-value" : "valueB" + } ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json new file mode 100644 index 0000000000..f774579d56 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json @@ -0,0 +1,18 @@ +{ + "ProducerRecord-key" : "Acme Holdings", + "ProducerRecord-value" : { + "address" : "1234 First Street", + "zip" : "12345", + "account" : { + "name" : "Acme", + "number" : "AC1234" + } + }, + "ProducerRecord-headers" : [ { + "RecordHeader-key" : "accountType", + "RecordHeader-value" : "enterprise" + }, { + "RecordHeader-key" : "test", + "RecordHeader-value" : "true" + } ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json new file mode 100644 index 0000000000..a38fb4e6c1 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json @@ -0,0 +1,16 @@ +{ + "ProducerRecord-key" : { + "accountName" : "Acme Holdings", + "accountHolder" : "John Doe", + "accountId" : "280182830-A009" + }, + "ProducerRecord-value" : { + "address" : "1234 First Street", + "zip" : "12345", + "account" : { + "name" : "Acme", + "number" : "AC1234" + } + }, + "ProducerRecord-headers" : [ ] +} \ No newline at end of file
