markap14 commented on code in PR #6131:
URL: https://github.com/apache/nifi/pull/6131#discussion_r930324902
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -256,5 +256,133 @@ <h3>SASL_SSL</h3>
ssl.client.auth property.
</p>
+ <h2>Output Modes</h2>
+ <div>
+ <p>This processor (NiFi 1.17+) offers multiple output strategies
(configured via processor property 'Consume
+ Strategy') for converting Kafka records into FlowFiles.</p>
+ <ul>
+ <li>Consume Strategy 'Write Value Only' (the default) emits
flowfile records containing only the Kafka
+ record value.
+ </li>
+ <li>Consume Strategy 'Use Wrapper' (new) emits flowfile
records containing the Kafka record key, value,
+ and headers, as well as additional metadata from the Kafka
record.
+ </li>
+ </ul>
+
+
+ <p>The record schema that is used when 'Use Wrapper' is active is
as follows (in Avro format):</p>
+<code>
+<pre>
+[
+ {
+ "type": "record",
+ "name": "kafka:ConsumeRecord:metadata",
+ "namespace": "org.apache.nifi",
+ "fields": [{
+ "name": "key",
+ "type": ["bytes", "string", "record"]
+ }, {
+ "name": "topic",
+ "type": "string"
+ }, {
+ "name": "partition",
+ "type": "int"
+ }, {
+ "name": "offset",
+ "type": "long"
+ }, {
+ "name": "timestamp",
+ "type": "long",
+ "logicalType": "timestamp-millis"
+ }]
+ },
+ {
+ "type": "record",
+ "name": "kafka:ConsumeRecord:wrapper",
+ "namespace": "org.apache.nifi",
+ "fields": [{
+ "name": "key",
+ "type": ["bytes", "string", "record"]
+ }, {
+ "name": "value",
+ "type": "record"
+ }, {
+ "name": "headers",
+ "type": "map",
+ "values": "string"
+ }, {
+ "name": "metadata",
+ "type": "kafka:ConsumeRecord:metadata"
+ }]
+ }
+]
Review Comment:
I think I would document the schema as:
```
{
"type": "record",
"name": "nifiRecord",
"namespace": "org.apache.nifi",
"fields": [{
"name": "key",
"type": [{
<Schema is determined by the Key Record Reader, or will be
"string" or "bytes", depending on the "Key Format" property>
}, "null"]
},
{
"name": "value",
"type": [
{
      <Schema is determined by the Value Record Reader>
},
"null"
]
},
{
"name": "headers",
"type": [
{ "type": "map", "values": "string", "default": {}},
"null"]
},
{
"name": "metadata",
"type": [
{
"type": "record",
"name": "metadataType",
"fields": [
{ "name": "topic", "type": ["string", "null"] },
{ "name": "partition", "type": ["int", "null"] },
          { "name": "offset", "type": ["long", "null"] },
{ "name": "timestamp", "type": ["long", "null"] }
]
},
"null"
]
}
]
}
```
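For reference, a record conforming to the suggested schema might look like the following (all field values here are purely illustrative):
```
{
  "key": {"name": "Acme", "number": "AC1234"},
  "value": {"address": "1234 First Street", "zip": "12345"},
  "headers": {"attributeA": "valueA"},
  "metadata": {"topic": "accounts", "partition": 0, "offset": 100, "timestamp": 1658340000000}
}
```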
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -256,5 +256,133 @@ <h3>SASL_SSL</h3>
ssl.client.auth property.
</p>
+ <h2>Output Modes</h2>
+ <div>
+ <p>This processor (NiFi 1.17+) offers multiple output strategies
(configured via processor property 'Consume
Review Comment:
Looks like this will likely not make it into 1.17, unfortunately. We will
probably need to update that comment. We can just remove the version altogether,
as we generate documentation per version.
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -271,7 +284,13 @@ public class PublishKafkaRecord_2_6 extends
AbstractProcessor implements Verifia
.defaultValue("UTF-8")
.required(false)
.build();
-
+ static final PropertyDescriptor RECORD_KEY_WRITER = new
PropertyDescriptor.Builder()
+ .name("record-key-writer")
+ .displayName("Record Key Writer")
Review Comment:
Given that we're adding a Record Key Writer, we should probably change the
display name of "Record Writer" to "Record Value Writer" for clarity.
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -212,6 +217,31 @@ public class ConsumeKafkaRecord_2_6 extends
AbstractProcessor implements Verifia
.defaultValue("UTF-8")
.required(false)
.build();
+ static final PropertyDescriptor OUTPUT_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("output-strategy")
+ .displayName("Output Strategy")
+ .description("The format used to output the Kafka record into a
FlowFile record.")
+ .required(true)
+ .defaultValue(OUTPUT_USE_VALUE.getValue())
+ .allowableValues(OUTPUT_USE_VALUE, OUTPUT_USE_WRAPPER)
+ .build();
+ static final PropertyDescriptor KEY_FORMAT = new
PropertyDescriptor.Builder()
+ .name("key-format")
+ .displayName("Key Format")
+ .description("Specifies how to represent the Kafka Record's Key in
the output")
+ .required(true)
+ .defaultValue(KEY_AS_BYTE_ARRAY.getValue())
+ .allowableValues(KEY_AS_STRING, KEY_AS_BYTE_ARRAY, KEY_AS_RECORD)
+ .dependsOn(OUTPUT_STRATEGY, OUTPUT_USE_WRAPPER)
+ .build();
+ static final PropertyDescriptor KEY_RECORD_READER = new
PropertyDescriptor.Builder()
Review Comment:
Given that we're adding a new Key Record Reader, we should change the
display name of "Record Reader" to "Value Record Reader".
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html:
##########
@@ -189,5 +189,77 @@ <h3>SASL_SSL</h3>
See the SSL section for a description of how to configure the SSL
Context Service based on the
ssl.client.auth property.
</p>
+ <h2>Record Key Output</h2>
+ <div>
+ <p>This processor includes optional properties that control the
serialization of the key into the target
+ Kafka record.</p>
+ <ul>
+ <li>'Publish Strategy'</li>
+ <li>'Record Key Writer'</li>
+ </ul>
+
+ <p>'Publish Strategy' controls the mode used to convert the
FlowFile record into a Kafka record.</p>
+ <ul>
+ <li>'Use Content as Record Value' (the default) persists the
record key as a byte array.</li>
+ <li>'Use Wrapper' persists the record key as a record.</li>
+ </ul>
+
+ <p>If Publish Strategy 'Use Wrapper' is enabled, an additional
processor configuration property is
+ activated: 'Record Key Writer'.</p>
+
+ <p>'Record Key Writer', if enabled, serializes the NiFi record key
field into the Kafka record using the
+ specified implementation of the NiFi 'RecordSetWriter'
interface. This may be used to emit the key field
+ as JSON, Avro, XML, or some other data format, where it may be
used in the decision-making process of
+ downstream data processing (including that available in
ConsumeKafkaRecord_2_6). If not defined, the
+ default implementation serializes the record key as an
unmodified byte array (unchanged from previous
+ versions of the processor).</p>
+
+<p>Here is an example of a Kafka Record that is emitted by
<code>JsonRecordSetWriter</code> when strategy "Use Wrapper" is active:</p>
Review Comment:
I think this section is inaccurate. This describes how the processor would
behave if the "Publish Strategy" is set to "Use Content as Record Value", not
"Use Wrapper".
I do think this is a good example, as it illustrates how a key can be
specified, etc. But we should also have an example of how to use the Publish
Strategy of "Use Wrapper". In this case, we wouldn't specify the Key Field, and
the FlowFile content would be expected to match the wrapper's format. E.g.,
```
{
"key": {"name":"Acme","number":"AC1234"},
"value": {"address":"1234 First
Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}},
"headers": {}
}
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html:
##########
@@ -189,5 +189,77 @@ <h3>SASL_SSL</h3>
See the SSL section for a description of how to configure the SSL
Context Service based on the
ssl.client.auth property.
</p>
+ <h2>Record Key Output</h2>
+ <div>
+ <p>This processor includes optional properties that control the
serialization of the key into the target
+ Kafka record.</p>
+ <ul>
+ <li>'Publish Strategy'</li>
+ <li>'Record Key Writer'</li>
+ </ul>
+
+ <p>'Publish Strategy' controls the mode used to convert the
FlowFile record into a Kafka record.</p>
+ <ul>
+ <li>'Use Content as Record Value' (the default) persists the
record key as a byte array.</li>
+ <li>'Use Wrapper' persists the record key as a record.</li>
+ </ul>
+
+ <p>If Publish Strategy 'Use Wrapper' is enabled, an additional
processor configuration property is
+ activated: 'Record Key Writer'.</p>
+
+ <p>'Record Key Writer', if enabled, serializes the NiFi record key
field into the Kafka record using the
+ specified implementation of the NiFi 'RecordSetWriter'
interface. This may be used to emit the key field
+ as JSON, Avro, XML, or some other data format, where it may be
used in the decision-making process of
+ downstream data processing (including that available in
ConsumeKafkaRecord_2_6). If not defined, the
+ default implementation serializes the record key as an
unmodified byte array (unchanged from previous
+ versions of the processor).</p>
+
+<p>Here is an example of a Kafka Record that is emitted by
<code>JsonRecordSetWriter</code> when strategy "Use Wrapper" is active:</p>
Review Comment:
It is also important to note here that the `metadata` field is currently
ignored, if that is the case.
However, I don't think we should actually ignore the field. Or, at least, we
should optionally ignore it. It would make sense to use both the `topic` and
the `partition` values of the metadata fields. This would allow us to send in
data that goes to multiple topics, even as a single transaction, which would be
a pretty nice feature.
It's okay to ignore it for now, as long as the documentation very clearly &
explicitly states that we ignore that field.
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -426,11 +466,13 @@ protected ConsumerPool createConsumerPool(final
ProcessContext context, final Co
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory,
props, topics, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset,
headerNamePattern, separateByKey, keyEncoding, partitionsToConsume,
commitOffsets);
+ bootstrapServers, log, honorTransactions, charset,
headerNamePattern, separateByKey, keyEncoding, partitionsToConsume,
+ commitOffsets, outputStrategy, keyFormat,
keyReaderFactory);
Review Comment:
While it makes sense to have `String outputStrategy` at this level, as it
was obtained via Property value, it can only have 2 legitimate values. As-is,
the ConsumerPool has to then compare the value using
`KafkaProcessorUtils.OUTPUT_USE_WRAPPER.getValue().equals(consumeStrategy)`.
It would probably be much cleaner to instead have an `enum` for the possible
Output Strategies and pass that enum value into the ConsumerPool.
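A minimal sketch of that suggestion (the identifiers and property-value strings here are illustrative assumptions, not the actual NiFi constants):

```java
// Illustrative enum for the two Output Strategy values; the names and the
// property-value strings are assumptions, not the actual NiFi implementation.
public enum OutputStrategy {
    USE_VALUE("Use Content as Value"),
    USE_WRAPPER("Use Wrapper");

    private final String propertyValue;

    OutputStrategy(final String propertyValue) {
        this.propertyValue = propertyValue;
    }

    public String getValue() {
        return propertyValue;
    }

    // Resolve the String obtained from the processor's Property value once,
    // so ConsumerPool can compare enum constants instead of Strings.
    public static OutputStrategy fromPropertyValue(final String value) {
        for (final OutputStrategy strategy : values()) {
            if (strategy.propertyValue.equals(value)) {
                return strategy;
            }
        }
        throw new IllegalArgumentException("Unknown Output Strategy: " + value);
    }
}
```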
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -593,6 +609,9 @@ private void writeRecordData(final ProcessSession session,
final List<ConsumerRe
try {
Record record;
while ((record = reader.nextRecord()) != null) {
Review Comment:
There's a special case that we need to consider here.
It's possible for a Kafka record to have a null value. In that case, it will
produce no NiFi Records using the Record Reader. However, if using the Output
Strategy of Wrapper, we should still output the wrapper. It may have a Key or
Headers that are still of value.
So here, we should keep track of whether our Record Reader has
produced a Record. When we exit this `while` loop, if no NiFi Record was
produced, we should output a Record where the value is `null` but the Key and
Headers are still populated.
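A simplified, self-contained sketch of that bookkeeping (this is not the actual ConsumerLease code; the class, method, and types are stand-ins):

```java
import java.util.List;

// Simplified stand-in for the writeRecordData loop: track whether the
// Record Reader produced anything, and if not (i.e. the Kafka record had a
// null value) still emit a wrapper record so the key and headers survive.
public class NullValueWrapperSketch {

    static String writeWrapperRecords(final List<String> recordsFromReader,
                                      final String key,
                                      final boolean useWrapper) {
        final StringBuilder output = new StringBuilder();
        boolean recordProduced = false;

        // Stand-in for: while ((record = reader.nextRecord()) != null) { ... }
        for (final String record : recordsFromReader) {
            recordProduced = true;
            output.append("{key=").append(key)
                  .append(", value=").append(record).append("}");
        }

        // Special case: the reader produced no Records (null Kafka value),
        // but the wrapper output should still carry the key and headers.
        if (!recordProduced && useWrapper) {
            output.append("{key=").append(key).append(", value=null}");
        }
        return output.toString();
    }

    public static void main(String[] args) {
        System.out.println(writeWrapperRecords(List.of(), "k1", true));
    }
}
```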
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -312,6 +331,8 @@ public class PublishKafkaRecord_2_6 extends
AbstractProcessor implements Verifia
properties.add(KafkaProcessorUtils.TOKEN_AUTH);
properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
properties.add(MESSAGE_KEY_FIELD);
+ properties.add(PUBLISH_STRATEGY);
Review Comment:
It is probably best to add Publish Strategy before Message Key Field, since
Message Key Field depends on Publish Strategy. Otherwise, we have a situation
where changing the value of Publish Strategy introduces a new property above it,
which is kind of an odd user experience.
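A sketch of the ordering concern (the property name strings are illustrative, not the actual descriptor names):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested ordering: the property that others depend on
// (Publish Strategy) is added before its dependents, so toggling it never
// reveals a new property above an existing one in the UI.
public class PropertyOrderSketch {

    public static List<String> buildPropertyList() {
        final List<String> properties = new ArrayList<>();
        properties.add("publish-strategy");   // dependency comes first
        properties.add("message-key-field");  // depends on publish-strategy
        properties.add("record-key-writer");  // depends on publish-strategy
        return properties;
    }

    public static void main(String[] args) {
        final List<String> props = buildPropertyList();
        System.out.println(props.indexOf("publish-strategy") < props.indexOf("message-key-field"));
    }
}
```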
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java:
##########
@@ -239,6 +251,7 @@ public class PublishKafkaRecord_2_6 extends
AbstractProcessor implements Verifia
+ "If not specified, no FlowFile attributes will be added as
headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(NONE)
+ .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)
Review Comment:
If this property now depends on Publish Strategy, it should come after
Publish Strategy in the Properties list (i.e., in `this.properties`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]