exceptionfactory commented on code in PR #8076:
URL: https://github.com/apache/nifi/pull/8076#discussion_r1415789879
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -159,9 +160,15 @@ void publish(final FlowFile flowFile, final InputStream
flowFileContent, final b
tracker.fail(flowFile, new TokenTooLargeException("A
message in the stream exceeds the maximum allowed message size of " +
maxMessageSize + " bytes."));
return;
}
- // Send FlowFile content as it is, to support sending 0 byte
message.
- messageContent = new byte[(int) flowFile.getSize()];
- StreamUtils.fillBuffer(flowFileContent, messageContent);
+
if(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE) != null
+ &&
flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE).equals("true")
Review Comment:
Can this `getAttribute()` call return `null`? It looks like it would be
safer to reverse the comparison as follows:
```suggestion
&&
Boolean.TRUE.toString().equals(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE))
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -159,9 +160,15 @@ void publish(final FlowFile flowFile, final InputStream
flowFileContent, final b
tracker.fail(flowFile, new TokenTooLargeException("A
message in the stream exceeds the maximum allowed message size of " +
maxMessageSize + " bytes."));
return;
}
- // Send FlowFile content as it is, to support sending 0 byte
message.
- messageContent = new byte[(int) flowFile.getSize()];
- StreamUtils.fillBuffer(flowFileContent, messageContent);
+
if(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE) != null
Review Comment:
Spacing:
```suggestion
if
(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE) != null
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java:
##########
@@ -87,6 +88,8 @@
+ " In the event a dynamic property represents a property that was
already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
+@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_IS_TOMBSTONE,
description = "If this attribute is set to 'true', if the processor is not
configured "
+ + "with a demarcator and if the FlowFile's content is null, then a
tombtsone message will be sent to Kafka.")
Review Comment:
Recommend noting that tombstone means zero bytes:
```suggestion
+ "with a demarcator and if the FlowFile's content is null, then a
tombstone message with zero bytes will be sent to Kafka.")
```
##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java:
##########
@@ -39,4 +39,6 @@ public interface KafkaFlowFileAttribute {
String KAFKA_CONSUMER_GROUP_ID = "kafka.consumer.id";
String KAFKA_CONSUMER_OFFSETS_COMMITTED =
"kafka.consumer.offsets.committed";
+
+ String KAFKA_IS_TOMBSTONE = "kafka.isTombstone";
Review Comment:
What do you think about naming this `kafka.tombstone`, dropping the `is`
prefix?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]