This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b426226207 NIFI-10993 - PublishKafkaRecord should use correct record schema
b426226207 is described below
commit b426226207e031b2808a3a759ebf6ab05d1400f9
Author: Paul Grey <[email protected]>
AuthorDate: Mon Dec 19 18:44:20 2022 -0500
NIFI-10993 - PublishKafkaRecord should use correct record schema
This closes #6833.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../processors/kafka/pubsub/PublisherLease.java | 2 +-
.../kafka/pubsub/TestPublishKafkaMock.java | 205 +++++++++++++++++----
2 files changed, 175 insertions(+), 32 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index da37db0319..ab4a283d3a 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -356,7 +356,7 @@ public class PublisherLease implements Closeable {
        }
        final Record record = (Record) object;
-       final RecordSchema schema = record.getSchema();
+       final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), record.getSchema());
        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
            writer.write(record);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java
index e379dc70bf..64b1bf248b 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java
@@ -19,12 +19,16 @@ package org.apache.nifi.processors.kafka.pubsub;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
+import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
@@ -35,13 +39,21 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -146,14 +158,16 @@ public class TestPublishKafkaMock {
@Test
public void testPublishRecordWrapperStrategyNullKey() throws
JsonProcessingException, InitializationException {
// create flowfile to publish
- final Map<String, String> attributes = new TreeMap<>();
- attributes.put("attrKeyA", "attrValueA");
- attributes.put("attrKeyB", "attrValueB");
- attributes.put("messageKey", "this-is-a-key");
- final ObjectNode node = mapper.createObjectNode().put("recordA",
1).put("recordB", "valueB");
- final String value = mapper.writeValueAsString(node);
+ final ObjectNode valueNode = mapper.createObjectNode()
+ .put("recordA", 1).put("recordB", "valueB");
+ final ObjectNode recordNode = mapper.createObjectNode();
+ recordNode.set("metadata", mapper.createObjectNode()
+ .put("topic", TEST_TOPIC_PUBLISH));
+ recordNode.set("headers", mapper.createObjectNode()
+ .put("attrKeyA", "attrValueA").put("attrKeyB", "attrValueB"));
+ recordNode.set("value", valueNode);
+ final String value = mapper.writeValueAsString(recordNode);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
- flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
// publish flowfile
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new
ArrayList<>();
@@ -174,20 +188,26 @@ public class TestPublishKafkaMock {
assertEquals("attrValueB", new
String(headers.lastHeader("attrKeyB").value(), UTF_8));
assertNull(record.key());
assertNotNull(record.value());
- assertEquals(value, new String(record.value(), UTF_8));
+ final String valueStringExpected =
mapper.writeValueAsString(valueNode);
+ final String valueStringActual = new String(record.value(), UTF_8);
+ assertEquals(valueStringExpected, valueStringActual);
}
@Test
public void testPublishRecordWrapperStrategyStringKey() throws
JsonProcessingException, InitializationException {
// create flowfile to publish
- final Map<String, String> attributes = new TreeMap<>();
- attributes.put("attrKeyA", "attrValueA");
- attributes.put("attrKeyB", "attrValueB");
- attributes.put("messageKey", "this-is-a-key");
- final ObjectNode node = mapper.createObjectNode().put("recordA",
1).put("recordB", "valueB");
- final String value = mapper.writeValueAsString(node);
+ final ObjectNode metadataNode = mapper.createObjectNode()
+ .put("topic", TEST_TOPIC_PUBLISH);
+ final ObjectNode headersNode = mapper.createObjectNode()
+ .put("attrKeyA", "attrValueA").put("attrKeyB", "attrValueB");
+ final ObjectNode valueNode = mapper.createObjectNode().put("recordA",
1).put("recordB", "valueB");
+ final ObjectNode recordNode = mapper.createObjectNode();
+ recordNode.set("metadata", metadataNode);
+ recordNode.set("headers", headersNode);
+ recordNode.put("key", "valueB");
+ recordNode.set("value", valueNode);
+ final String value = mapper.writeValueAsString(recordNode);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
- flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
// publish flowfile
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new
ArrayList<>();
@@ -204,12 +224,13 @@ public class TestPublishKafkaMock {
final ProducerRecord<byte[], byte[]> record =
producedRecords.iterator().next();
assertEquals(TEST_TOPIC_PUBLISH, record.topic());
final Headers headers = record.headers();
- assertEquals(1, headers.toArray().length);
+ assertEquals(2, headers.toArray().length);
assertEquals("attrValueB", new
String(headers.lastHeader("attrKeyB").value(), UTF_8));
assertNotNull(record.key());
assertEquals("valueB", new String(record.key(), UTF_8));
assertNotNull(record.value());
- assertEquals(value, new String(record.value(), UTF_8));
+ final String valueString = mapper.writeValueAsString(valueNode);
+ assertEquals(valueString, new String(record.value(), UTF_8));
}
@Test
@@ -219,8 +240,11 @@ public class TestPublishKafkaMock {
attributes.put("attrKeyA", "attrValueA");
attributes.put("attrKeyB", "attrValueB");
attributes.put("messageKey", "this-is-a-key");
- final ObjectNode node = mapper.createObjectNode().put("recordA",
1).put("recordB", "valueB");
- final String value = mapper.writeValueAsString(node);
+ final ObjectNode valueNode = mapper.createObjectNode().put("recordA",
1).put("recordB", "valueB");
+ final ObjectNode recordNode = mapper.createObjectNode();
+ recordNode.put("key", "valueB");
+ recordNode.set("value", valueNode);
+ final String value = mapper.writeValueAsString(recordNode);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
@@ -239,7 +263,8 @@ public class TestPublishKafkaMock {
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> producedRecord =
producedRecords.iterator().next();
assertEquals("valueB", new String(producedRecord.key(), UTF_8));
- assertEquals(value, new String(producedRecord.value(), UTF_8));
+ final String valueString = mapper.writeValueAsString(valueNode);
+ assertEquals(valueString, new String(producedRecord.value(), UTF_8));
final List<MockFlowFile> success =
runner.getFlowFilesForRelationship(PublishKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile1 = success.iterator().next();
assertNotNull(flowFile1.getAttribute("uuid"));
@@ -251,10 +276,13 @@ public class TestPublishKafkaMock {
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA", "attrValueA");
attributes.put("attrKeyB", "attrValueB");
- final ObjectNode key = mapper.createObjectNode().put("recordKey",
"recordValue");
- final ObjectNode node = mapper.createObjectNode()
- .put("recordA", 1).put("recordB", "valueB").set("recordKey",
key);
- final String value = mapper.writeValueAsString(node);
+ final ObjectNode keyNode = mapper.createObjectNode().put("recordKey",
"recordValue");
+ final ObjectNode valueNode = mapper.createObjectNode()
+ .put("recordA", 1).put("recordB", "valueB");
+ final ObjectNode recordNode = mapper.createObjectNode();
+ recordNode.set("key", keyNode);
+ recordNode.set("value", valueNode);
+ final String value = mapper.writeValueAsString(recordNode);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
@@ -263,8 +291,7 @@ public class TestPublishKafkaMock {
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", TEST_TOPIC_PUBLISH);
runner.setProperty("publish-strategy",
PublishStrategy.USE_WRAPPER.name());
- runner.setProperty("message-key-field", "recordKey");
- runner.setProperty("record-key-writer", "record-writer");
+ runner.setProperty("record-key-writer", "record-key-writer");
runner.enqueue(flowFile);
runner.run(1);
// verify results
@@ -275,10 +302,13 @@ public class TestPublishKafkaMock {
final Headers headers = record.headers();
assertEquals(0, headers.toArray().length);
assertNotNull(record.key());
- final String keyString = mapper.writeValueAsString(key);
- assertEquals(keyString, new String(record.key(), UTF_8));
+ final String keyStringExpected = mapper.writeValueAsString(keyNode);
+ final String keyStringActual = new String(record.key(), UTF_8);
+ assertEquals(keyStringExpected, keyStringActual);
assertNotNull(record.value());
- assertEquals(value, new String(record.value(), UTF_8));
+ final String valueStringExpected =
mapper.writeValueAsString(valueNode);
+ final String valueStringActual = new String(record.value(), UTF_8);
+ assertEquals(valueStringExpected, valueStringActual);
}
private TestRunner getTestRunner(final Collection<ProducerRecord<byte[],
byte[]>> producedRecords)
@@ -309,6 +339,119 @@ public class TestPublishKafkaMock {
return runner;
}
+ @Test
+ public void testPublishRecordWrapperStrategyRecordKeySchema() throws
IOException, InitializationException {
+ // create flowfile to publish
+ final Map<String, String> attributes = new TreeMap<>();
+ attributes.put("schema-access-strategy", "schema-name");
+ attributes.put("schema.name", "schemaValue");
+ attributes.put("schema.key.name", "schemaKey");
+ final ObjectNode recordKey = mapper.createObjectNode()
+ .put("keyA", "value1")
+ .put("keyB", "value2");
+ final ObjectNode recordValue = mapper.createObjectNode()
+ .put("valueA", "value1")
+ .put("valueB", "value2");
+ final ObjectNode record = mapper.createObjectNode();
+ record.set("key", recordKey);
+ record.set("value", recordValue);
+ final String value = mapper.writeValueAsString(record);
+ final MockFlowFile flowFile = new MockFlowFile(++ordinal);
+ flowFile.putAttributes(attributes);
+ flowFile.setData(value.getBytes(UTF_8));
+ // publish flowfile
+ final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new
ArrayList<>();
+ final TestRunner runner = getTestRunnerSchemaRegistry(producedRecords);
+ runner.setProperty("topic", TEST_TOPIC_PUBLISH);
+ runner.setProperty("publish-strategy",
PublishStrategy.USE_WRAPPER.name());
+ runner.setProperty("record-key-writer", "record-key-writer");
+ runner.enqueue(flowFile);
+ runner.run(1);
+
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
+ assertEquals(1, producedRecords.size());
+ final ProducerRecord<byte[], byte[]> producerRecord =
producedRecords.iterator().next();
+
+ final DataFileStream<GenericData.Record> dataReaderKey = new
DataFileStream<>(
+ new ByteArrayInputStream(producerRecord.key()), new
GenericDatumReader<>(null));
+ final GenericData.Record genericRecordKey = dataReaderKey.next();
+ assertEquals("value1", genericRecordKey.get("keyA").toString());
+ assertEquals("value2", genericRecordKey.get("keyB").toString());
+ assertEquals("value3", genericRecordKey.get("keyC").toString());
+
+ final DataFileStream<GenericData.Record> dataReaderValue = new
DataFileStream<>(
+ new ByteArrayInputStream(producerRecord.value()), new
GenericDatumReader<>(null));
+ final GenericData.Record genericRecordValue = dataReaderValue.next();
+ assertEquals("value1", genericRecordValue.get("valueA").toString());
+ assertEquals("value2", genericRecordValue.get("valueB").toString());
+ assertEquals("value3", genericRecordValue.get("valueC").toString());
+ }
+
+ private TestRunner getTestRunnerSchemaRegistry(final
Collection<ProducerRecord<byte[], byte[]>> producedRecords)
+ throws InitializationException {
+ final RecordSchema schemaKey = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("keyA", RecordFieldType.STRING.getDataType()),
+ new RecordField("keyB", RecordFieldType.STRING.getDataType()),
+ new RecordField("keyC", RecordFieldType.STRING.getDataType(),
"value3")));
+ final RecordSchema schemaValue = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("valueA",
RecordFieldType.STRING.getDataType()),
+ new RecordField("valueB",
RecordFieldType.STRING.getDataType()),
+ new RecordField("valueC",
RecordFieldType.STRING.getDataType(), "value3")));
+ final RecordSchema schemaRecord = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("key",
RecordFieldType.RECORD.getRecordDataType(schemaKey)),
+ new RecordField("value",
RecordFieldType.RECORD.getRecordDataType(schemaValue))));
+
+ final String schemaRegistryId = "schema-registry";
+ final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry();
+ schemaRegistry.addSchema("schemaKey", schemaKey);
+ schemaRegistry.addSchema("schemaValue", schemaValue);
+ schemaRegistry.addSchema("schemaRecord", schemaRecord);
+
+ final String readerId = "record-reader";
+ final RecordReaderFactory readerService = new JsonTreeReader();
+
+ final Map<String, String> propertiesReaderService = new TreeMap<>();
+
+ final String writerId = "record-writer";
+ final RecordSetWriterFactory writerService = new AvroRecordSetWriter();
+ final String keyWriterId = "record-key-writer";
+ final RecordSetWriterFactory keyWriterService = new
AvroRecordSetWriter();
+
+ final Map<String, String> propertiesWriterService = new TreeMap<>();
+ propertiesWriterService.put(schemaRegistryId, schemaRegistryId);
+ propertiesWriterService.put("schema-access-strategy", "schema-name");
+ propertiesWriterService.put("schema-name", "schemaValue");
+ final Map<String, String> propertiesWriterServiceKey = new TreeMap<>();
+ propertiesWriterServiceKey.put(schemaRegistryId, schemaRegistryId);
+ propertiesWriterServiceKey.put("schema-access-strategy",
"schema-name");
+ propertiesWriterServiceKey.put("schema-name", "schemaKey");
+
+ final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() {
+ @Override
+ protected PublisherPool createPublisherPool(final ProcessContext
context) {
+ return getPublisherPool(producedRecords, context);
+ }
+ };
+ final TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setValidateExpressionUsage(false);
+
+ runner.addControllerService(schemaRegistryId, schemaRegistry);
+ runner.enableControllerService(schemaRegistry);
+
+ runner.addControllerService(readerId, readerService,
propertiesReaderService);
+ runner.enableControllerService(readerService);
+ runner.setProperty(readerId, readerId);
+
+ runner.addControllerService(writerId, writerService,
propertiesWriterService);
+ runner.enableControllerService(writerService);
+ runner.setProperty(writerId, writerId);
+
+ runner.addControllerService(keyWriterId, keyWriterService,
propertiesWriterServiceKey);
+ runner.enableControllerService(keyWriterService);
+ runner.setProperty(keyWriterId, keyWriterId);
+
+ return runner;
+ }
+
private PublisherPool getPublisherPool(final
Collection<ProducerRecord<byte[], byte[]>> producedRecords,
final ProcessContext context) {
final int maxMessageSize =
context.getProperty("max.request.size").asDataSize(DataUnit.B).intValue();
@@ -350,7 +493,7 @@ public class TestPublishKafkaMock {
final Pattern patternAttributeName = (attributeNameRegex == null) ?
null : Pattern.compile(attributeNameRegex);
final RecordSetWriterFactory keyWriterFactory =
context.getProperty("record-key-writer")
.asControllerService(RecordSetWriterFactory.class);
-
+ final PublishStrategy publishStrategy =
PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final Producer<byte[], byte[]> producer = mock(ProducerBB.class);
when(producer.send(any(), any())).then(invocation -> {
final ProducerRecord<byte[], byte[]> record =
invocation.getArgument(0);
@@ -368,7 +511,7 @@ public class TestPublishKafkaMock {
true,
patternAttributeName,
UTF_8,
- null,
+ publishStrategy,
keyWriterFactory);
}
}