markap14 commented on code in PR #6045:
URL: https://github.com/apache/nifi/pull/6045#discussion_r874777463


##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> 
consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = 
toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = 
toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = 
toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = 
toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), 
tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), 
tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), 
tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), 
tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), 
tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final 
ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : 
consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {

Review Comment:
   Took me a minute to figure out what this means; `KafkaProcessorUtils.RECORD` 
wasn't immediately obvious to me. Perhaps it makes sense to rename `RECORD`, 
`STRING`, etc. to something that makes more sense outside this context, such as 
`KEY_AS_RECORD`, `KEY_AS_STRING`, etc.? It's a bit of a nitpick, so feel free to 
ignore it if you want.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -426,11 +465,13 @@ protected ConsumerPool createConsumerPool(final 
ProcessContext context, final Co
             }
 
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, 
props, topics, maxUncommittedTime, securityProtocol,

Review Comment:
   Doesn't necessarily need to be done in this ticket. But probably makes sense 
to introduce a Builder pattern here instead of so many constructor args. It 
made sense before this, too, though :)
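
   As a rough illustration of the Builder idea, here is a minimal sketch. `PoolSettings` and its fields are hypothetical stand-ins that mirror a few of the constructor arguments visible in the diff; this is not the actual `ConsumerPool` API, and the default values are assumptions.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the pool configuration; not the real NiFi ConsumerPool.
final class PoolSettings {
    private final int maxLeases;
    private final List<String> topics;
    private final long maxUncommittedMillis;
    private final String securityProtocol;

    private PoolSettings(final Builder builder) {
        this.maxLeases = builder.maxLeases;
        this.topics = builder.topics;
        this.maxUncommittedMillis = builder.maxUncommittedMillis;
        this.securityProtocol = builder.securityProtocol;
    }

    int getMaxLeases() { return maxLeases; }
    List<String> getTopics() { return topics; }
    long getMaxUncommittedMillis() { return maxUncommittedMillis; }
    String getSecurityProtocol() { return securityProtocol; }

    static final class Builder {
        // Assumed defaults, chosen only for the example.
        private int maxLeases = 1;
        private List<String> topics = Collections.emptyList();
        private long maxUncommittedMillis = 1000L;
        private String securityProtocol = "PLAINTEXT";

        // Each setter returns the builder so calls can be chained.
        Builder maxLeases(final int maxLeases) { this.maxLeases = maxLeases; return this; }
        Builder topics(final List<String> topics) { this.topics = topics; return this; }
        Builder maxUncommittedMillis(final long millis) { this.maxUncommittedMillis = millis; return this; }
        Builder securityProtocol(final String protocol) { this.securityProtocol = protocol; return this; }

        PoolSettings build() { return new PoolSettings(this); }
    }
}
```

   A call site then names each argument, e.g. `new PoolSettings.Builder().maxLeases(4).topics(topics).build()`, and adding a new option no longer changes a long positional constructor signature.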



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> 
consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = 
toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = 
toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = 
toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = 
toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), 
tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), 
tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), 
tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), 
tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), 
tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final 
ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : 
consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = 
getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = 
keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, 
StandardCharsets.UTF_8));

Review Comment:
   We'll definitely want to make sure that we document that this strategy 
requires that the key be a UTF-8 compatible String. And we should probably 
ensure that we test with a non-UTF-8 compatible String. In that case, the 
record should probably go to the parse.failure relationship.
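
   One thing to keep in mind when testing this: `new String(bytes, StandardCharsets.UTF_8)` does not throw on malformed input; it silently substitutes the replacement character. A strict decode needs a `CharsetDecoder` configured to report errors. A minimal sketch follows; the `StrictUtf8` class name is hypothetical, and how the failure would be routed to parse.failure is left open.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes bytes as UTF-8 and fails instead of substituting U+FFFD.
final class StrictUtf8 {
    static String decode(final byte[] bytes) throws CharacterCodingException {
        return StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)       // throw on invalid byte sequences
                .onUnmappableCharacter(CodingErrorAction.REPORT)  // throw on unmappable characters
                .decode(ByteBuffer.wrap(bytes))
                .toString();
    }
}
```

   A test key such as `{(byte) 0xC3, (byte) 0x28}` is not valid UTF-8 and makes this method throw a `CharacterCodingException`, which the caller could translate into a parse failure.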



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> 
consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = 
toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = 
toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = 
toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = 
toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), 
tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), 
tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), 
tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), 
tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), 
tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final 
ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : 
consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = 
getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = 
keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, 
StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);

Review Comment:
   Eventually we need to introduce a BYTES data type for Records. Right now, 
when we have an Array of type Byte, the Record API expects this to be an array 
of `Byte` objects, not primitive bytes. So, as inefficient as it is, in this 
case, I think we need to create a `Byte[]` for the key instead of providing the 
`byte[]`.
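
   The conversion itself is a simple loop, since the JDK has no built-in `byte[]` to `Byte[]` helper. A minimal sketch, with a hypothetical `ByteBoxing` class name:

```java
// Hypothetical helper: boxes a primitive byte array into an array of Byte objects.
final class ByteBoxing {
    static Byte[] box(final byte[] bytes) {
        final Byte[] boxed = new Byte[bytes.length];
        for (int i = 0; i < bytes.length; i++) {
            boxed[i] = bytes[i]; // autoboxing; Byte.valueOf returns cached instances
        }
        return boxed;
    }
}
```

   The wrapper record would then carry `ByteBoxing.box(key)` rather than `key` for the Byte Array strategy.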



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -193,10 +197,21 @@ void publish(final FlowFile flowFile, final RecordSet 
recordSet, final RecordSet
                     additionalAttributes = writeResult.getAttributes();
                     writer.flush();
                 }
-
                 final byte[] messageContent = baos.toByteArray();
-                final String key = messageKeyField == null ? null : 
record.getAsString(messageKeyField);
-                final byte[] messageKey = (key == null) ? null : 
key.getBytes(StandardCharsets.UTF_8);
+
+                final byte[] messageKey;
+                if (recordKeyWriterFactory == null) {
+                    messageKey = 
Optional.ofNullable(record.getAsString(messageKeyField))
+                            .map(s -> 
s.getBytes(StandardCharsets.UTF_8)).orElse(null);
+                } else {
+                    final ByteArrayOutputStream os = new 
ByteArrayOutputStream(1024);

Review Comment:
   Should add the BAOS to the try-with-resources below.
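
   For illustration, the shape would be roughly as follows. This is a sketch only: the `OutputStreamWriter` is a stand-in for the record writer used in the PR, and the `KeyBytes` class name is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing the stream and the writer declared in one try-with-resources.
final class KeyBytes {
    static byte[] serialize(final String key) throws IOException {
        // Resources are closed in reverse declaration order: writer first, then the stream.
        try (final ByteArrayOutputStream os = new ByteArrayOutputStream(1024);
             final Writer writer = new OutputStreamWriter(os, StandardCharsets.UTF_8)) {
            writer.write(key);
            writer.flush(); // push buffered characters into the byte stream before reading it
            return os.toByteArray();
        }
    }
}
```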



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -255,6 +255,36 @@ <h3>SASL_SSL</h3>
             See the SSL section for a description of how to configure the SSL 
Context Service based on the
             ssl.client.auth property.
         </p>
+<h2>Output Modes</h2>
+<div>
+<p>This processor (NiFi 1.17+) offers multiple output strategies (configured 
via processor property "Output Strategy")
+for converting Kafka records into flow files.
+- Output Strategy "Write Value Only" (the default) emits flowfile records 
containing only the Kafka record value.
+- Output Strategy "Use Wrapper" (new) emits flowfile records containing the 
Kafka record key, value, and headers, as
+well as additional metadata from the Kafka record.</p>
+
+<p>If the Output Strategy property "Use Wrapper" is active, an additional 
processor configuration property is activated,
+to fine-tune the transformation of the incoming Kafka record.
+- The format of the Kafka record key may be interpreted via property "Key 
Format" as "Byte Array", "String", or
+"Record".
+-- "Byte Array" supplies the Kafka Record Key bytes unchanged from the 
incoming Kafka record.
+-- "String" converts the Kafka Record Key bytes into a string using the UTF-8 
character encoding.
+-- "Record" converts the Kafka Record Key bytes into a deserialized NiFi 
record, using the associated "Key Record
+Reader" controller service.</p>
+
+<p>If the Key Format property is set to "Record", an additional processor 
configuration property is activated.
+- "Key Record Reader" is used to specify the controller service that is used 
to deserialize the key bytes.  It may be
+set to any available implementation of the NiFi "RecordReaderFactory" 
interface.</p>
+
+<p>These new processor properties may be used to extend the capabilities of 
ConsumeKafkaRecord_2_6, by optionally
+incorporating additional information from the Kafka record (key, headers, 
metadata) into the outbound flowfile.  And
+the Kafka key data may now be interpreted as a record, rather than as a 
string, enabling additional decision-making by
+downstream processors in your flow.</p>
+
+<p>Additionally, the choice of the "Output Strategy" processor property 
affects the related properties "Headers to Add
+as Attributes (Regex)" and "Key Attribute Encoding".  These properties are 
available only when "Output Strategy" is set
+to "Write Value Only".</p>

Review Comment:
   Might make sense to mention the reason they are only available when Output 
Strategy = Write Value Only. I.e., they don't make sense when writing wrapper 
Records, because the headers and key are then part of the Record/FlowFile 
content rather than attributes.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> 
consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = 
toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = 
toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = 
toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = 
toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), 
tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), 
tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), 
tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), 
tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), 
tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final 
ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : 
consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = 
getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = 
keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, 
StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);
+        }
+        return tuple;
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordValue(final Record 
record) {
+        final RecordField recordField = new RecordField(
+                "value", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+        return new Tuple<>(recordField, record);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordHeaders(final 
ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "headers", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> headers = new HashMap<>();
+        Arrays.stream(consumerRecord.headers().toArray()).forEach(
+                h -> headers.put(h.key(), new String(h.value(), 
StandardCharsets.UTF_8)));

Review Comment:
   Also looks like we have a member variable already for `headerCharacterSet` - 
probably makes sense to use that, rather than `StandardCharsets.UTF_8`
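
   A minimal sketch of decoding header values with a configurable character set instead of a hard-coded one. `Map.Entry<String, byte[]>` stands in for Kafka's `Header` here so the example has no Kafka dependency, and the `HeaderDecoding` class name is hypothetical; in the PR the `charset` argument would be the existing `headerCharacterSet` member.

```java
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: decodes header values with the supplied charset.
final class HeaderDecoding {
    static Map<String, String> decode(final Iterable<Map.Entry<String, byte[]>> headers,
                                      final Charset charset) {
        final Map<String, String> decoded = new HashMap<>();
        for (final Map.Entry<String, byte[]> header : headers) {
            decoded.put(header.getKey(), new String(header.getValue(), charset));
        }
        return decoded;
    }
}
```

   The choice matters for non-ASCII values: the single byte `0xE9` is "é" in ISO-8859-1 but is not a valid sequence in UTF-8.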



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> 
consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = 
toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = 
toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = 
toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = 
toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), 
tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), 
tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), 
tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), 
tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), 
tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final 
ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : 
consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = 
getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = 
keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, 
StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);
+        }
+        return tuple;
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordValue(final Record 
record) {
+        final RecordField recordField = new RecordField(
+                "value", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+        return new Tuple<>(recordField, record);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordHeaders(final 
ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "headers", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> headers = new HashMap<>();
+        Arrays.stream(consumerRecord.headers().toArray()).forEach(
+                h -> headers.put(h.key(), new String(h.value(), 
StandardCharsets.UTF_8)));

Review Comment:
   This is kind of expensive. The call to `toArray()` has to create a new array 
to hold the objects. Then we call `Arrays.stream()` and the creation of a 
Stream is quite expensive. And we do this just in order to iterate over the 
elements. But `Headers` extends `Iterable<Header>` so we could instead just use 
something like:
   ```
   for (final Header header : consumerRecord.headers()) {
     headers.put(header.key(), new String(header.value(), 
StandardCharsets.UTF_8));
   }
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> 
consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = 
toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = 
toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = 
toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = 
toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), 
tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), 
tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), 
tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), 
tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), 
tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final 
ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : 
consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = 
getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = 
keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, 
StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);
+        }
+        return tuple;
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordValue(final Record 
record) {
+        final RecordField recordField = new RecordField(
+                "value", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+        return new Tuple<>(recordField, record);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordHeaders(final 
ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "headers", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> headers = new HashMap<>();
+        Arrays.stream(consumerRecord.headers().toArray()).forEach(
+                h -> headers.put(h.key(), new String(h.value(), 
StandardCharsets.UTF_8)));
+        return new Tuple<>(recordField, headers);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordMetadata(final 
ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "metadata", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> metadata = new HashMap<>();
+        metadata.put("topic", consumerRecord.topic());
+        metadata.put("partition", 
Integer.toString(consumerRecord.partition()));
+        metadata.put("offset", Long.toString(consumerRecord.offset()));
+        metadata.put("timestamp", Long.toString(consumerRecord.timestamp()));
+        return new Tuple<>(recordField, metadata);

Review Comment:
   Given that the keys of this map are well known, and 3 of the fields are 
numeric, perhaps rather than a Map here, we should use a Record. Then we can 
use proper numeric types, even a TIMESTAMP type for the `timestamp` field?



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -255,6 +255,36 @@ <h3>SASL_SSL</h3>
             See the SSL section for a description of how to configure the SSL 
Context Service based on the
             ssl.client.auth property.
         </p>
+<h2>Output Modes</h2>
+<div>
+<p>This processor (NiFi 1.17+) offers multiple output strategies (configured 
via processor property "Output Strategy")
+for converting Kafka records into flow files.
+- Output Strategy "Write Value Only" (the default) emits flowfile records 
containing only the Kafka record value.
+- Output Strategy "Use Wrapper" (new) emits flowfile records containing the 
Kafka record key, value, and headers, as
+well as additional metadata from the Kafka record.</p>

Review Comment:
   Should probably use `<ul>` with `<li>` rather than `-` for denoting lists.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> 
consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = 
toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = 
toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = 
toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = 
toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), 
tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), 
tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), 
tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), 
tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), 
tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final 
ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : 
consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = 
getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = 
keyReaderFactory.createRecordReader(attributes, is, key.length, logger);

Review Comment:
   Should use try-with-resources here to ensure that we close the InputStream 
and the Record Reader.
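
   Roughly the following shape. In this sketch a `BufferedReader` stands in for the `RecordReader` created by `keyReaderFactory`, and the `KeyReading` class name is hypothetical.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: both the stream and the reader are closed even if reading throws.
final class KeyReading {
    static String readFirstLine(final byte[] key) throws IOException {
        try (final InputStream is = new ByteArrayInputStream(key);
             final BufferedReader reader =
                     new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            return reader.readLine(); // returns null when the key is empty
        }
    }
}
```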



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -282,10 +314,13 @@ public class ConsumeKafkaRecord_2_6 extends 
AbstractProcessor implements Verifia
         descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
         descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
         descriptors.add(SEPARATE_BY_KEY);
+        descriptors.add(OUTPUT_STRATEGY);

Review Comment:
   Output Strategy (and related properties) is going to be a very important 
thing for the user to think through when configuring this. Because of that, I'd 
recommend moving this property up in the list to just after Group ID



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -255,6 +255,36 @@ <h3>SASL_SSL</h3>
             See the SSL section for a description of how to configure the SSL 
Context Service based on the
             ssl.client.auth property.
         </p>
+<h2>Output Modes</h2>
+<div>
+<p>This processor (NiFi 1.17+) offers multiple output strategies (configured 
via processor property "Output Strategy")
+for converting Kafka records into flow files.
+- Output Strategy "Write Value Only" (the default) emits flowfile records 
containing only the Kafka record value.
+- Output Strategy "Use Wrapper" (new) emits flowfile records containing the 
Kafka record key, value, and headers, as

Review Comment:
   We need to be sure that we call out the Record Schema that will be used here.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> 
consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = 
toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = 
toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = 
toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = 
toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new 
SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), 
tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), 
tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), 
tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), 
tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), 
tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final 
ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, 
MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : 
consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = 
getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = 
keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", 
RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, 
StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);
+        }
+        return tuple;
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordValue(final Record 
record) {
+        final RecordField recordField = new RecordField(
+                "value", 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+        return new Tuple<>(recordField, record);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordHeaders(final 
ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "headers", 
RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> headers = new HashMap<>();
+        Arrays.stream(consumerRecord.headers().toArray()).forEach(
+                h -> headers.put(h.key(), new String(h.value(), 
StandardCharsets.UTF_8)));

Review Comment:
   When the documentation is written, we'll also want to be sure it mentions 
that headers must be UTF-8 compatible. Unlike the Key, though, I think it 
probably makes sense to just log a warning and continue on if a Header cannot 
be parsed as a UTF-8 String.
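
   A rough sketch of the warn-and-continue idea, not the PR's implementation. 
Note that `new String(bytes, StandardCharsets.UTF_8)` never fails: it silently 
replaces malformed input with U+FFFD. Detecting a bad header therefore needs a 
`CharsetDecoder` set to `CodingErrorAction.REPORT`. The class and method names 
below are hypothetical, a plain `Map<String, byte[]>` stands in for the Kafka 
headers, and `System.err` stands in for the processor's logger. Skipping null 
header values is also an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not part of the PR): strict UTF-8 decoding of header values.
final class HeaderDecoding {

    // Returns the decoded text, or empty if the bytes are null or not valid UTF-8.
    static Optional<String> decodeUtf8(final byte[] bytes) {
        if (bytes == null) {
            return Optional.empty();
        }
        // REPORT makes the decoder throw instead of substituting U+FFFD.
        final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT);
        try {
            return Optional.of(decoder.decode(ByteBuffer.wrap(bytes)).toString());
        } catch (final CharacterCodingException e) {
            return Optional.empty();
        }
    }

    // Keeps every header that decodes cleanly; warns about and skips the rest.
    static Map<String, String> decodeHeaders(final Map<String, byte[]> rawHeaders) {
        final Map<String, String> headers = new LinkedHashMap<>();
        for (final Map.Entry<String, byte[]> entry : rawHeaders.entrySet()) {
            final Optional<String> value = decodeUtf8(entry.getValue());
            if (value.isPresent()) {
                headers.put(entry.getKey(), value.get());
            } else {
                // Stand-in for a logger warning in the real processor.
                System.err.println("WARN: skipping header '" + entry.getKey()
                        + "': value is null or not valid UTF-8");
            }
        }
        return headers;
    }

    public static void main(final String[] args) {
        final Map<String, byte[]> raw = new LinkedHashMap<>();
        raw.put("good", "caf\u00e9".getBytes(StandardCharsets.UTF_8));
        raw.put("bad", new byte[] {(byte) 0xC3, (byte) 0x28}); // invalid 2-byte sequence
        System.out.println(decodeHeaders(raw).keySet());
    }
}
```

   With this shape, one unparseable header costs only that header, rather than 
failing the whole record as a bad Key would.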



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -193,10 +197,21 @@ void publish(final FlowFile flowFile, final RecordSet 
recordSet, final RecordSet
                     additionalAttributes = writeResult.getAttributes();
                     writer.flush();
                 }
-
                 final byte[] messageContent = baos.toByteArray();
-                final String key = messageKeyField == null ? null : 
record.getAsString(messageKeyField);
-                final byte[] messageKey = (key == null) ? null : 
key.getBytes(StandardCharsets.UTF_8);
+
+                final byte[] messageKey;
+                if (recordKeyWriterFactory == null) {
+                    messageKey = 
Optional.ofNullable(record.getAsString(messageKeyField))
+                            .map(s -> 
s.getBytes(StandardCharsets.UTF_8)).orElse(null);
+                } else {
+                    final ByteArrayOutputStream os = new 
ByteArrayOutputStream(1024);
+                    final MapRecord keyRecord = (MapRecord) 
record.getValue(messageKeyField);

Review Comment:
   This should probably be using `Record` here, not `MapRecord`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
