This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ee2368e0ae NIFI-12371 Support tombstone messages in non-record Kafka
processors
ee2368e0ae is described below
commit ee2368e0ae684d8a3ca2e62ce89422a2e260bdce
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Nov 10 20:08:59 2023 +0100
NIFI-12371 Support tombstone messages in non-record Kafka processors
This closes #8076
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java | 3 ++-
.../apache/nifi/processors/kafka/pubsub/ConsumerLease.java | 2 ++
.../apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java | 3 +++
.../apache/nifi/processors/kafka/pubsub/PublisherLease.java | 11 ++++++++---
.../nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java | 2 ++
5 files changed, 17 insertions(+), 4 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index a5c6b15891..ff75453bc0 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -74,7 +74,8 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_OFFSET,
description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP,
description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION,
description = "The partition of the topic the message or message bundle is
from"),
- @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC,
description = "The topic the message or message bundle is from")
+ @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC,
description = "The topic the message or message bundle is from"),
+ @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE,
description = "Set to true if the consumed message is a tombstone message")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@DynamicProperty(name = "The name of a Kafka configuration property.", value =
"The value of a given Kafka configuration property.",
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 698ecc36a5..b197d97c41 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -474,6 +474,8 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
final byte[] value = record.value();
if (value != null) {
flowFile = session.write(flowFile, out -> out.write(value));
+ } else {
+ flowFile = session.putAttribute(flowFile,
KafkaFlowFileAttribute.KAFKA_TOMBSTONE, Boolean.TRUE.toString());
}
flowFile = session.putAllAttributes(flowFile, getAttributes(record));
tracker.updateFlowFile(flowFile);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index b6b84ce1e0..9a8e5971ab 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -87,6 +88,8 @@ import static
org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFK
+ " In the event a dynamic property represents a property that was
already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
+@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE,
description = "If this attribute is set to 'true', if the processor is not
configured "
+ + "with a demarcator and if the FlowFile's content is null, then a
tombstone message with zero bytes will be sent to Kafka.")
@WritesAttribute(attribute = "msg.count", description = "The number of
messages that were sent to Kafka for this FlowFile. This attribute is added
only to "
+ "FlowFiles that are routed to success. If the <Message Demarcator>
Property is not set, this will always be 1, but if the Property is set, it may "
+ "be greater than 1.")
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 959e37ad96..2781ea8478 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -159,9 +160,13 @@ public class PublisherLease implements Closeable {
tracker.fail(flowFile, new TokenTooLargeException("A
message in the stream exceeds the maximum allowed message size of " +
maxMessageSize + " bytes."));
return;
}
- // Send FlowFile content as it is, to support sending 0 byte
message.
- messageContent = new byte[(int) flowFile.getSize()];
- StreamUtils.fillBuffer(flowFileContent, messageContent);
+ if
(Boolean.TRUE.toString().equals(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_TOMBSTONE))
&& flowFile.getSize() == 0) {
+ messageContent = null;
+ } else {
+ // Send FlowFile content as it is, to support sending 0
byte message.
+ messageContent = new byte[(int) flowFile.getSize()];
+ StreamUtils.fillBuffer(flowFileContent, messageContent);
+ }
publish(flowFile, messageKey, messageContent, topic, tracker,
partition);
return;
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java
index 991f3b7a9e..15aa16ed76 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java
@@ -39,4 +39,6 @@ public interface KafkaFlowFileAttribute {
String KAFKA_CONSUMER_GROUP_ID = "kafka.consumer.id";
String KAFKA_CONSUMER_OFFSETS_COMMITTED =
"kafka.consumer.offsets.committed";
+
+ String KAFKA_TOMBSTONE = "kafka.tombstone";
}