markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r930324902


##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -256,5 +256,133 @@ <h3>SASL_SSL</h3>
             ssl.client.auth property.
         </p>
 
+        <h2>Output Modes</h2>
+        <div>
+            <p>This processor (NiFi 1.17+) offers multiple output strategies 
(configured via processor property 'Consume
+                Strategy') for converting Kafka records into FlowFiles.</p>
+            <ul>
+                <li>Consume Strategy 'Write Value Only' (the default) emits 
flowfile records containing only the Kafka
+                    record value.
+                </li>
+                <li>Consume Strategy 'Use Wrapper' (new) emits flowfile 
records containing the Kafka record key, value,
+                    and headers, as well as additional metadata from the Kafka 
record.
+                </li>
+            </ul>
+
+
+            <p>The record schema that is used when 'Use Wrapper' is active is 
as follows (in Avro format):</p>
+<code>
+<pre>
+[
+  {
+    "type": "record",
+    "name": "kafka:ConsumeRecord:metadata",
+    "namespace": "org.apache.nifi",
+    "fields": [{
+      "name": "key",
+      "type": ["bytes", "string", "record"]
+    }, {
+      "name": "topic",
+      "type": "string"
+    }, {
+      "name": "partition",
+      "type": "int"
+    }, {
+      "name": "offset",
+      "type": "long"
+    }, {
+      "name": "timestamp",
+      "type": "long",
+      "logicalType": "timestamp-millis"
+    }]
+  },
+  {
+    "type": "record",
+    "name": "kafka:ConsumeRecord:wrapper",
+    "namespace": "org.apache.nifi",
+    "fields": [{
+      "name": "key",
+      "type": ["bytes", "string", "record"]
+    }, {
+      "name": "value",
+      "type": "record"
+    }, {
+      "name": "headers",
+      "type": "map",
+      "values": "string"
+    }, {
+      "name": "metadata",
+      "type": "kafka:ConsumeRecord:metadata"
+    }]
+  }
+]

Review Comment:
   I think I would document the schema as:
   ```
   {
     "type": "record",
     "name": "nifiRecord",
     "namespace": "org.apache.nifi",
     "fields": [{
         "name": "key",
         "type": [{
             <Schema is determined by the Key Record Reader, or will be 
"string" or "bytes", depending on the "Key Format" property>
           }, "null"]
       },
       {
         "name": "value",
         "type": [
           {
             <Schema is determined by the Value Record Reader, or will be 
"string" or "bytes", depending on the "Key Format" property>
           },
           "null"
         ]
       },
       {
         "name": "headers",
         "type": [
           { "type": "map", "values": "string", "default": {}},
           "null"]
       },
       {
         "name": "metadata",
         "type": [
           {
             "type": "record",
             "name": "metadataType",
             "fields": [
               { "name": "topic", "type": ["string", "null"] },
               { "name": "partition", "type": ["int", "null"] },
               { "name": "offset", "type": ["int", "null"] },
               { "name": "timestamp", "type": ["long", "null"] }
             ]
           },
           "null"
         ]
       }
     ]
   }
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -256,5 +256,133 @@ <h3>SASL_SSL</h3>
             ssl.client.auth property.
         </p>
 
+        <h2>Output Modes</h2>
+        <div>
+            <p>This processor (NiFi 1.17+) offers multiple output strategies 
(configured via processor property 'Consume

Review Comment:
   Looks like this will likely not make it into 1.17, unfortunately. Will 
probably need to update that comment. Can just remove the version all together, 
as we generate documentation per version.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -271,7 +284,13 @@ public class PublishKafkaRecord_2_6 extends 
AbstractProcessor implements Verifia
         .defaultValue("UTF-8")
         .required(false)
         .build();
-
+    static final PropertyDescriptor RECORD_KEY_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-key-writer")
+            .displayName("Record Key Writer")

Review Comment:
   Given that we're adding a Record Key Writer, we should probably change 
display name of "Record Writer" to "Record Value Writer" for clarity



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -212,6 +217,31 @@ public class ConsumeKafkaRecord_2_6 extends 
AbstractProcessor implements Verifia
         .defaultValue("UTF-8")
         .required(false)
         .build();
+    static final PropertyDescriptor OUTPUT_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("output-strategy")
+            .displayName("Output Strategy")
+            .description("The format used to output the Kafka record into a 
FlowFile record.")
+            .required(true)
+            .defaultValue(OUTPUT_USE_VALUE.getValue())
+            .allowableValues(OUTPUT_USE_VALUE, OUTPUT_USE_WRAPPER)
+            .build();
+    static final PropertyDescriptor KEY_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("key-format")
+            .displayName("Key Format")
+            .description("Specifies how to represent the Kafka Record's Key in 
the output")
+            .required(true)
+            .defaultValue(KEY_AS_BYTE_ARRAY.getValue())
+            .allowableValues(KEY_AS_STRING, KEY_AS_BYTE_ARRAY, KEY_AS_RECORD)
+            .dependsOn(OUTPUT_STRATEGY, OUTPUT_USE_WRAPPER)
+            .build();
+    static final PropertyDescriptor KEY_RECORD_READER = new 
PropertyDescriptor.Builder()

Review Comment:
   Given that we're adding a new Key Record Reader, we should change the 
display name of "Record Reader" to "Value Record Reader".



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html:
##########
@@ -189,5 +189,77 @@ <h3>SASL_SSL</h3>
             See the SSL section for a description of how to configure the SSL 
Context Service based on the
             ssl.client.auth property.
         </p>
+        <h2>Record Key Output</h2>
+        <div>
+            <p>This processor includes optional properties that control the 
serialization of the key into the target
+                Kafka record.</p>
+            <ul>
+                <li>'Publish Strategy'</li>
+                <li>'Record Key Writer'</li>
+            </ul>
+
+            <p>'Publish Strategy' controls the mode used to convert the 
FlowFile record into a Kafka record.</p>
+            <ul>
+                <li>'Use Content as Record Value' (the default) persists the 
record key as a byte array.</li>
+                <li>'Use Wrapper' persists the record key as a record.</li>
+            </ul>
+
+            <p>If Publish Strategy 'Use Wrapper' is enabled, an additional 
processor configuration property is
+                activated: 'Record Key Writer'.</p>
+
+            <p>'Record Key Writer', if enabled, serializes the NiFi record key 
field into the Kafka record using the
+                specified implementation of the NiFi 'RecordSetWriter' 
interface. This may be used to emit the key field
+                as JSON, Avro, XML, or some other data format, where it may be 
used in the decision-making process of
+                downstream data processing (including that available in 
ConsumeKafkaRecord_2_6). If not defined, the
+                default implementation serializes the record key as an 
unmodified byte array (unchanged from previous
+                versions of the processor).</p>
+
+<p>Here is an example of a Kafka Record that is emitted by 
<code>JsonRecordSetWriter</code> when strategy "Use Wrapper" is active:</p>

Review Comment:
   I think this section is inaccurate. This describes how the processor would 
behave if the "Publish Strategy" is set to "Use Content as Record Value", not 
"Use Wrapper".
   I do think this is as good example, as it illustrates how a key can be 
specified, etc. But we should also have an example of how to use the Publish 
Strategy of "Use Wrapper". In this case, we wouldn't specify the Key Field, and 
the FlowFile content would be expected to have content that matches the 
wrapper's format. E.g.,
   ```
   {
     "key": {"name":"Acme","number":"AC1234"},
     "value": {"address":"1234 First 
Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}},
     "headers": {}
   }
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html:
##########
@@ -189,5 +189,77 @@ <h3>SASL_SSL</h3>
             See the SSL section for a description of how to configure the SSL 
Context Service based on the
             ssl.client.auth property.
         </p>
+        <h2>Record Key Output</h2>
+        <div>
+            <p>This processor includes optional properties that control the 
serialization of the key into the target
+                Kafka record.</p>
+            <ul>
+                <li>'Publish Strategy'</li>
+                <li>'Record Key Writer'</li>
+            </ul>
+
+            <p>'Publish Strategy' controls the mode used to convert the 
FlowFile record into a Kafka record.</p>
+            <ul>
+                <li>'Use Content as Record Value' (the default) persists the 
record key as a byte array.</li>
+                <li>'Use Wrapper' persists the record key as a record.</li>
+            </ul>
+
+            <p>If Publish Strategy 'Use Wrapper' is enabled, an additional 
processor configuration property is
+                activated: 'Record Key Writer'.</p>
+
+            <p>'Record Key Writer', if enabled, serializes the NiFi record key 
field into the Kafka record using the
+                specified implementation of the NiFi 'RecordSetWriter' 
interface. This may be used to emit the key field
+                as JSON, Avro, XML, or some other data format, where it may be 
used in the decision-making process of
+                downstream data processing (including that available in 
ConsumeKafkaRecord_2_6). If not defined, the
+                default implementation serializes the record key as an 
unmodified byte array (unchanged from previous
+                versions of the processor).</p>
+
+<p>Here is an example of a Kafka Record that is emitted by 
<code>JsonRecordSetWriter</code> when strategy "Use Wrapper" is active:</p>

Review Comment:
   It also is important here to note that the `metadata` field is currently 
ignored, if it is.
   However, I don't think we should actually ignore the field. Or, at least, we 
should optionally ignore it. It would make sense to use both the `topic` and 
the `partition` values of the metadata fields. This would allow us to send in 
data that goes to multiple topics, even as a single transaction, which would be 
a pretty nice feature.
   It's okay to ignore it for now, as long as the documentation very clearly & 
explicitly states that we ignore that field.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -426,11 +466,13 @@ protected ConsumerPool createConsumerPool(final 
ProcessContext context, final Co
             }
 
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, 
props, topics, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, 
headerNamePattern, separateByKey, keyEncoding, partitionsToConsume, 
commitOffsets);
+                    bootstrapServers, log, honorTransactions, charset, 
headerNamePattern, separateByKey, keyEncoding, partitionsToConsume,
+                    commitOffsets, outputStrategy, keyFormat, 
keyReaderFactory);

Review Comment:
   While it makes sense to have `String outputStrategy` at this level, as it 
was obtained via Property value, it can only have 2 legitimate values. As-is, 
the ConsumerPool has to then compare the value using 
`KafkaProcessorUtils.OUTPUT_USE_WRAPPER.getValue().equals(consumeStrategy)`.
   It would probably be much cleaner to instead have `enum` for the possible 
Output Strategies and pass that enum value into the ConsumerPool.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -593,6 +609,9 @@ private void writeRecordData(final ProcessSession session, 
final List<ConsumerRe
                     try {
                         Record record;
                         while ((record = reader.nextRecord()) != null) {

Review Comment:
   There's a special case that we need to consider here.
   It's possible for a Kafka record to have a null value. In that case, it will 
produce no NiFi Records using the Record Reader. However, if using the Output 
Strategy of Wrapper, we should still output the wrapper. It may have a Key or 
Headers that are still of value.
   So here, we should keep track of the fact that our Record Reader has 
produced a Record. When we end this `while` loop, if no NiFi Record was 
produced, we should output a Record where the value is `null` but the Key and 
Headers are still populated.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -312,6 +331,8 @@ public class PublishKafkaRecord_2_6 extends 
AbstractProcessor implements Verifia
         properties.add(KafkaProcessorUtils.TOKEN_AUTH);
         properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
         properties.add(MESSAGE_KEY_FIELD);
+        properties.add(PUBLISH_STRATEGY);

Review Comment:
   Is probably best to add Publish Strategy before Message Key Field, since 
Message Key Field depends on Publish Strategy. Otherwise, we have a situation 
where changing the value of Publish Strategy introduce a new property above it, 
which is kind of an odd user experience.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -239,6 +251,7 @@ public class PublishKafkaRecord_2_6 extends 
AbstractProcessor implements Verifia
             + "If not specified, no FlowFile attributes will be added as 
headers.")
         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
         .expressionLanguageSupported(NONE)
+        .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)

Review Comment:
   If this property now depends on Publish Strategy, it should come after 
Publish Strategy in the Properties list (i.e., in `this.properties`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to