markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r921594339


##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -193,10 +196,24 @@ void publish(final FlowFile flowFile, final RecordSet 
recordSet, final RecordSet
                     additionalAttributes = writeResult.getAttributes();
                     writer.flush();
                 }
-
                 final byte[] messageContent = baos.toByteArray();
-                final String key = messageKeyField == null ? null : 
record.getAsString(messageKeyField);
-                final byte[] messageKey = (key == null) ? null : 
key.getBytes(StandardCharsets.UTF_8);
+
+                final byte[] messageKey;
+                if ((recordKeyWriterFactory == null) || (messageKeyField == 
null)) {
+                    messageKey = 
Optional.ofNullable(record.getAsString(messageKeyField))
+                            .map(s -> 
s.getBytes(StandardCharsets.UTF_8)).orElse(null);
+                } else {
+                    try (final ByteArrayOutputStream os = new 
ByteArrayOutputStream(1024)) {
+                        final Record keyRecord = 
Optional.ofNullable(record.getValue(messageKeyField))
+                                
.filter(Record.class::isInstance).map(Record.class::cast)
+                                .orElseThrow(() ->  new IOException("The 
property 'Record Key Writer' is defined, but the record key is not a record"));

Review Comment:
   I think I would actually do something like:
   ```
   final Object key;
   final Object keyValue = record.getValue(messageKeyField);
   if (keyValue == null) {
     key = keyValue;
   } else if (keyValue instanceof byte[]) {
     key = (byte[]) keyValue;
   } else if (keyValue instanceof Byte[]) {
     // This case exists because in our Record API we currently don't have a 
BYTES type, we use an Array of type Byte, which creates a Byte[] instead of a 
byte[]. We should address this in the future, but we should account for the log 
here.
     final Byte[] bytes = (Byte[]) keyValue;
     key = new byte[bytes.length];
     for (int i=0; i < bytes.length; i++) {
       key[i] = bytes[i];
     }
   } else if (key instanceof Record) {
    ... // what you have below, to create record writer, write key, flush
   } else {
     final String keyString = keyValue.toString();
     key = keyString.getBytes(StandardCharsets.UTF_8);
   }
   ```
   I.e., just look at the object that you have. Using the schema is also 
feasible. But it can get really complicated if the schema indicates that it's a 
CHOICE type because then you'll probably still end up having to do this type of 
logic.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to