This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c610aab NIFI-7953: Updated
ConsumeKafka_2_0/ConsumeKafkaRecord_2_0/ConsumeKafka_2_6/ConsumeKafkaRecord_2_6
to allow separating records by key
c610aab is described below
commit c610aab3cb017d7030381f2715de446923870966
Author: Mark Payne <[email protected]>
AuthorDate: Tue Oct 27 12:46:12 2020 -0400
NIFI-7953: Updated
ConsumeKafka_2_0/ConsumeKafkaRecord_2_0/ConsumeKafka_2_6/ConsumeKafkaRecord_2_6
to allow separating records by key
---
.../java/org/apache/nifi/util/MockFlowFile.java | 3 +-
.../kafka/pubsub/ConsumeKafkaRecord_2_0.java | 49 +++++++++++++++++-----
.../processors/kafka/pubsub/ConsumeKafka_2_0.java | 17 +++++++-
.../processors/kafka/pubsub/ConsumerLease.java | 37 ++++++++++------
.../nifi/processors/kafka/pubsub/ConsumerPool.java | 33 ++++++++++-----
.../kafka/pubsub/KafkaProcessorUtils.java | 2 +
.../processors/kafka/pubsub/ConsumerPoolTest.java | 2 +
.../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 31 +++++++++++++-
.../processors/kafka/pubsub/ConsumeKafka_2_6.java | 18 +++++++-
.../processors/kafka/pubsub/ConsumerLease.java | 33 ++++++++++-----
.../nifi/processors/kafka/pubsub/ConsumerPool.java | 21 +++++++---
.../kafka/pubsub/KafkaProcessorUtils.java | 5 +--
.../processors/kafka/pubsub/ConsumerPoolTest.java | 2 +
13 files changed, 194 insertions(+), 59 deletions(-)
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 375158c..254320e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -226,7 +226,8 @@ public class MockFlowFile implements FlowFileRecord {
}
public void assertAttributeEquals(final String attributeName, final String
expectedValue) {
- Assert.assertEquals(expectedValue, attributes.get(attributeName));
+ Assert.assertEquals("Expected attribute " + attributeName + " to be "
+ expectedValue + " but instead it was " + attributes.get(attributeName),
+ expectedValue, attributes.get(attributeName));
}
public void assertAttributeNotEquals(final String attributeName, final
String expectedValue) {
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index 7f5c75a..79824b1 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -34,7 +34,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -57,6 +56,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
+import static
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
@CapabilityDescription("Consumes messages from Apache Kafka specifically built
against the Kafka 2.0 Consumer API. "
+ "The complementary NiFi processor for sending messages is
PublishKafkaRecord_2_0. Please note that, at this time, the Processor assumes
that "
+ "all records that are retrieved from a given partition have the same
schema. If any of the Kafka messages are pulled but cannot be parsed or written
with the "
@@ -77,7 +82,7 @@ import java.util.regex.Pattern;
description = "These properties will be added on the Kafka
configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was
already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration.",
- expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
+ expressionLanguageScope = VARIABLE_REGISTRY)
@SeeAlso({ConsumeKafka_2_0.class, PublishKafka_2_0.class,
PublishKafkaRecord_2_0.class})
public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
@@ -93,7 +98,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor
{
.description("The name of the Kafka Topic(s) to pull from. More
than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOPIC_TYPE = new Builder()
@@ -110,7 +115,7 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(true)
.build();
@@ -119,7 +124,7 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data
before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(true)
.build();
@@ -129,7 +134,7 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
.description("A Group ID is used to identify consumers that are
within the same consumer group. Corresponds to Kafka's 'group.id' property.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor AUTO_OFFSET_RESET = new Builder()
@@ -179,7 +184,7 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
+ "read_uncomitted. This means that messages will be received as
soon as they are written to Kafka but will be pulled, even if the producer
cancels the transactions. If "
+ "this value is true, NiFi will not receive any messages for
which the producer's transaction was canceled, but this can result in some
latency since the consumer must wait "
+ "for the producer to finish its entire transaction instead of
pulling as the messages become available.")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
@@ -203,8 +208,25 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
+ "\".*\" if messages are expected to have header values that are
unique per message, such as an identifier or timestamp, because it will prevent
NiFi from bundling "
+ "the messages together efficiently.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
+ .required(false)
+ .build();
+ static final PropertyDescriptor SEPARATE_BY_KEY = new Builder()
+ .name("separate-by-key")
+ .displayName("Separate By Key")
+ .description("If true, two Records will only be added to the same
FlowFile if both of the Kafka Messages have identical keys.")
.required(false)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+ static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new
PropertyDescriptor.Builder()
+ .name("key-attribute-encoding")
+ .displayName("Key Attribute Encoding")
+ .description("If the <Separate By Key> property is set to true,
FlowFiles that are emitted have an attribute named '" +
KafkaProcessorUtils.KAFKA_KEY +
+ "'. This property dictates how the value of the attribute should
be encoded.")
+ .required(true)
+ .defaultValue(UTF8_ENCODING.getValue())
+ .allowableValues(UTF8_ENCODING, HEX_ENCODING,
DO_NOT_ADD_KEY_AS_ATTRIBUTE)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -242,6 +264,8 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(GROUP_ID);
+ descriptors.add(SEPARATE_BY_KEY);
+ descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(MESSAGE_HEADER_ENCODING);
descriptors.add(HEADER_NAME_REGEX);
@@ -283,7 +307,7 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
.name(propertyDescriptorName)
.addValidator(new
KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
.dynamic(true)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
}
@@ -328,6 +352,9 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
final String headerNameRegex =
context.getProperty(HEADER_NAME_REGEX).getValue();
final Pattern headerNamePattern = headerNameRegex == null ? null :
Pattern.compile(headerNameRegex);
+ final boolean separateByKey =
context.getProperty(SEPARATE_BY_KEY).asBoolean();
+ final String keyEncoding =
context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -337,11 +364,11 @@ public class ConsumeKafkaRecord_2_0 extends
AbstractProcessor {
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory,
props, topics, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset,
headerNamePattern);
+ bootstrapServers, log, honorTransactions, charset,
headerNamePattern, separateByKey, keyEncoding);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory,
props, topicPattern, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset,
headerNamePattern);
+ bootstrapServers, log, honorTransactions, charset,
headerNamePattern, separateByKey, keyEncoding);
} else {
getLogger().error("Subscription type has an unknown value {}", new
Object[] {topicType});
return null;
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index 13bebc9..fc00693 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -146,6 +146,16 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
+ "will result in a single FlowFile which "
+ "time it is triggered. To enter special character such
as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
.build();
+
+ static final PropertyDescriptor SEPARATE_BY_KEY = new
PropertyDescriptor.Builder()
+ .name("separate-by-key")
+ .displayName("Separate By Key")
+ .description("If true, and the <Message Demarcator> property is set,
two messages will only be added to the same FlowFile if both of the Kafka
Messages have identical keys.")
+ .required(false)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
static final PropertyDescriptor HEADER_NAME_REGEX = new
PropertyDescriptor.Builder()
.name("header-name-regex")
.displayName("Headers to Add as Attributes (Regex)")
@@ -234,6 +244,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(MESSAGE_DEMARCATOR);
+ descriptors.add(SEPARATE_BY_KEY);
descriptors.add(MESSAGE_HEADER_ENCODING);
descriptors.add(HEADER_NAME_REGEX);
descriptors.add(MAX_POLL_RECORDS);
@@ -315,6 +326,8 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
final String headerNameRegex =
context.getProperty(HEADER_NAME_REGEX).getValue();
final Pattern headerNamePattern = headerNameRegex == null ? null :
Pattern.compile(headerNameRegex);
+ final boolean separateByKey =
context.getProperty(SEPARATE_BY_KEY).asBoolean();
+
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -323,11 +336,11 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
}
}
- return new ConsumerPool(maxLeases, demarcator, props, topics,
maxUncommittedTime, keyEncoding, securityProtocol,
+ return new ConsumerPool(maxLeases, demarcator, separateByKey,
props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
bootstrapServers, log, honorTransactions, charset,
headerNamePattern);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
- return new ConsumerPool(maxLeases, demarcator, props,
topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
+ return new ConsumerPool(maxLeases, demarcator, separateByKey,
props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
bootstrapServers, log, honorTransactions, charset,
headerNamePattern);
} else {
getLogger().error("Subscription type has an unknown value {}", new
Object[] {topicType});
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 2674dd9..3ecec49 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -49,6 +49,7 @@ import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -83,6 +84,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
private final RecordReaderFactory readerFactory;
private final Charset headerCharacterSet;
private final Pattern headerNamePattern;
+ private final boolean separateByKey;
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can
append
//to them on subsequent poll calls
@@ -103,7 +105,8 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
final RecordSetWriterFactory writerFactory,
final ComponentLog logger,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final boolean separateByKey) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
this.demarcatorBytes = demarcatorBytes;
@@ -115,6 +118,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
this.logger = logger;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
}
/**
@@ -164,7 +168,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
* flowfiles necessary or appends to existing ones if in demarcation mode.
*/
void poll() {
- /**
+ /*
* Implementation note:
* Even if ConsumeKafka is not scheduled to poll due to downstream
connection back-pressure is engaged,
* for longer than session.timeout.ms (defaults to 10 sec), Kafka
consumer sends heartbeat from background thread.
@@ -202,7 +206,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
return false;
}
try {
- /**
+ /*
* Committing the nifi session then the offsets means we have an at
* least once guarantee here. If we reversed the order we'd have at
* most once.
@@ -412,7 +416,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
private void writeDemarcatedData(final ProcessSession session, final
List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition
topicPartition) {
// Group the Records by their BundleInformation
final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map
= records.stream()
- .collect(Collectors.groupingBy(rec -> new
BundleInformation(topicPartition, null, getAttributes(rec))));
+ .collect(Collectors.groupingBy(rec -> new
BundleInformation(topicPartition, null, getAttributes(rec), separateByKey ?
rec.key() : null)));
for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[],
byte[]>>> entry : map.entrySet()) {
final BundleInformation bundleInfo = entry.getKey();
@@ -538,7 +542,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema =
record.getSchema();
- final BundleInformation bundleInfo = new
BundleInformation(topicPartition, recordSchema, attributes);
+ final BundleInformation bundleInfo = new
BundleInformation(topicPartition, recordSchema, attributes, separateByKey ?
consumerRecord.key() : null);
BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
@@ -626,9 +630,16 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET,
String.valueOf(tracker.initialOffset));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP,
String.valueOf(tracker.initialTimestamp));
- if (tracker.key != null && tracker.totalRecords == 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+
+ // If we have a kafka key, we will add it as an attribute only if
+ // the FlowFile contains a single Record, or if the Records have been
separated by Key,
+ // because we then know that even though there are multiple Records,
they all have the same key.
+ if (tracker.key != null && (tracker.totalRecords == 1 ||
separateByKey)) {
+ if
(!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue()))
{
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+ }
}
+
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION,
String.valueOf(tracker.partition));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) {
@@ -647,8 +658,8 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
tracker.updateFlowFile(newFlowFile);
}
- private static class BundleTracker {
+ private static class BundleTracker {
final long initialOffset;
final long initialTimestamp;
final int partition;
@@ -678,23 +689,24 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
private void updateFlowFile(final FlowFile flowFile) {
this.flowFile = flowFile;
}
-
}
private static class BundleInformation {
private final TopicPartition topicPartition;
private final RecordSchema schema;
private final Map<String, String> attributes;
+ private final byte[] messageKey;
- public BundleInformation(final TopicPartition topicPartition, final
RecordSchema schema, final Map<String, String> attributes) {
+ public BundleInformation(final TopicPartition topicPartition, final
RecordSchema schema, final Map<String, String> attributes, final byte[]
messageKey) {
this.topicPartition = topicPartition;
this.schema = schema;
this.attributes = attributes;
+ this.messageKey = messageKey;
}
@Override
public int hashCode() {
- return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0
: 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 *
attributes.hashCode());
+ return 41 + Objects.hash(topicPartition, schema, attributes) + 37
* Arrays.hashCode(messageKey);
}
@Override
@@ -710,7 +722,8 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
}
final BundleInformation other = (BundleInformation) obj;
- return Objects.equals(topicPartition, other.topicPartition) &&
Objects.equals(schema, other.schema) && Objects.equals(attributes,
other.attributes);
+ return Objects.equals(topicPartition, other.topicPartition) &&
Objects.equals(schema, other.schema) && Objects.equals(attributes,
other.attributes)
+ && Arrays.equals(this.messageKey, other.messageKey);
}
}
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 7f02b26..0462729 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -59,6 +59,7 @@ public class ConsumerPool implements Closeable {
private final RecordSetWriterFactory writerFactory;
private final Charset headerCharacterSet;
private final Pattern headerNamePattern;
+ private final boolean separateByKey;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -86,6 +87,7 @@ public class ConsumerPool implements Closeable {
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
+ final boolean separateByKey,
final Map<String, Object> kafkaProperties,
final List<String> topics,
final long maxWaitMillis,
@@ -111,11 +113,13 @@ public class ConsumerPool implements Closeable {
this.honorTransactions = honorTransactions;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
}
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
+ final boolean separateByKey,
final Map<String, Object> kafkaProperties,
final Pattern topics,
final long maxWaitMillis,
@@ -141,6 +145,7 @@ public class ConsumerPool implements Closeable {
this.honorTransactions = honorTransactions;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
}
public ConsumerPool(
@@ -155,12 +160,13 @@ public class ConsumerPool implements Closeable {
final ComponentLog logger,
final boolean honorTransactions,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final boolean separateByKey,
+ final String keyEncoding) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
- this.keyEncoding = null;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.securityProtocol = securityProtocol;
@@ -171,6 +177,8 @@ public class ConsumerPool implements Closeable {
this.honorTransactions = honorTransactions;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
+ this.keyEncoding = keyEncoding;
}
public ConsumerPool(
@@ -185,12 +193,13 @@ public class ConsumerPool implements Closeable {
final ComponentLog logger,
final boolean honorTransactions,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final boolean separateByKey,
+ final String keyEncoding) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
- this.keyEncoding = null;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.securityProtocol = securityProtocol;
@@ -201,6 +210,8 @@ public class ConsumerPool implements Closeable {
this.honorTransactions = honorTransactions;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
+ this.keyEncoding = keyEncoding;
}
/**
@@ -218,7 +229,8 @@ public class ConsumerPool implements Closeable {
if (lease == null) {
final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
consumerCreatedCountRef.incrementAndGet();
- /**
+
+ /*
* For now return a new consumer lease. But we could later elect to
* have this return null if we determine the broker indicates that
* the lag time on all topics being monitored is sufficiently low.
@@ -228,10 +240,9 @@ public class ConsumerPool implements Closeable {
* sitting idle which could prompt excessive rebalances.
*/
lease = new SimpleConsumerLease(consumer);
- /**
- * This subscription tightly couples the lease to the given
- * consumer. They cannot be separated from then on.
- */
+
+ // This subscription tightly couples the lease to the given
+ // consumer. They cannot be separated from then on.
if (topics != null) {
consumer.subscribe(topics, lease);
} else {
@@ -268,7 +279,7 @@ public class ConsumerPool implements Closeable {
public void close() {
final List<SimpleConsumerLease> leases = new ArrayList<>();
pooledLeases.drainTo(leases);
- leases.stream().forEach((lease) -> {
+ leases.forEach((lease) -> {
lease.close(true);
});
}
@@ -301,7 +312,7 @@ public class ConsumerPool implements Closeable {
private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding,
securityProtocol, bootstrapServers,
- readerFactory, writerFactory, logger, headerCharacterSet,
headerNamePattern);
+ readerFactory, writerFactory, logger, headerCharacterSet,
headerNamePattern, separateByKey);
this.consumer = consumer;
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index e756776..b089fce 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -60,6 +60,8 @@ public final class KafkaProcessorUtils {
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8",
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex
Encoded",
"The key is interpreted as arbitrary binary data and is encoded
using hexadecimal characters with uppercase letters");
+ static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new
AllowableValue("do-not-add", "Do Not Add Key as Attribute",
+ "The key will not be added as an Attribute");
static final Pattern HEX_KEY_PATTERN =
Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 9d53ee6..d006a6e 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -70,6 +70,7 @@ public class ConsumerPoolTest {
testPool = new ConsumerPool(
1,
null,
+ false,
Collections.emptyMap(),
Collections.singletonList("nifi"),
100L,
@@ -88,6 +89,7 @@ public class ConsumerPoolTest {
testDemarcatedPool = new ConsumerPool(
1,
"--demarcator--".getBytes(StandardCharsets.UTF_8),
+ false,
Collections.emptyMap(),
Collections.singletonList("nifi"),
100L,
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 8043058..3e7b16a 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -57,6 +57,10 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import static
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
+import static
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static
org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
@CapabilityDescription("Consumes messages from Apache Kafka specifically built
against the Kafka 2.6 Consumer API. "
+ "The complementary NiFi processor for sending messages is
PublishKafkaRecord_2_6. Please note that, at this time, the Processor assumes
that "
+ "all records that are retrieved from a given partition have the same
schema. If any of the Kafka messages are pulled but cannot be parsed or written
with the "
@@ -207,6 +211,24 @@ public class ConsumeKafkaRecord_2_6 extends
AbstractProcessor {
.required(false)
.build();
+ static final PropertyDescriptor SEPARATE_BY_KEY = new Builder()
+ .name("separate-by-key")
+ .displayName("Separate By Key")
+ .description("If true, two Records will only be added to the same
FlowFile if both of the Kafka Messages have identical keys.")
+ .required(false)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+ static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new
PropertyDescriptor.Builder()
+ .name("key-attribute-encoding")
+ .displayName("Key Attribute Encoding")
+ .description("If the <Separate By Key> property is set to true,
FlowFiles that are emitted have an attribute named '" +
KafkaProcessorUtils.KAFKA_KEY +
+ "'. This property dictates how the value of the attribute should
be encoded.")
+ .required(true)
+ .defaultValue(UTF8_ENCODING.getValue())
+ .allowableValues(UTF8_ENCODING, HEX_ENCODING,
DO_NOT_ADD_KEY_AS_ATTRIBUTE)
+ .build();
+
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on
demarcation strategy it is a flow file per message or a bundle of messages
grouped by topic and partition.")
@@ -242,6 +264,8 @@ public class ConsumeKafkaRecord_2_6 extends
AbstractProcessor {
descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(GROUP_ID);
+ descriptors.add(SEPARATE_BY_KEY);
+ descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(MESSAGE_HEADER_ENCODING);
descriptors.add(HEADER_NAME_REGEX);
@@ -328,6 +352,9 @@ public class ConsumeKafkaRecord_2_6 extends
AbstractProcessor {
final String headerNameRegex =
context.getProperty(HEADER_NAME_REGEX).getValue();
final Pattern headerNamePattern = headerNameRegex == null ? null :
Pattern.compile(headerNameRegex);
+ final boolean separateByKey =
context.getProperty(SEPARATE_BY_KEY).asBoolean();
+ final String keyEncoding =
context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -337,11 +364,11 @@ public class ConsumeKafkaRecord_2_6 extends
AbstractProcessor {
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory,
props, topics, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset,
headerNamePattern);
+ bootstrapServers, log, honorTransactions, charset,
headerNamePattern, separateByKey, keyEncoding);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory,
props, topicPattern, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset,
headerNamePattern);
+ bootstrapServers, log, honorTransactions, charset,
headerNamePattern, separateByKey, keyEncoding);
} else {
getLogger().error("Subscription type has an unknown value {}", new
Object[] {topicType});
return null;
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index c96bb60..5461abb 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -146,6 +146,17 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
+ "will result in a single FlowFile which "
+ "time it is triggered. To enter special character such
as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
.build();
+
+
+ static final PropertyDescriptor SEPARATE_BY_KEY = new
PropertyDescriptor.Builder()
+ .name("separate-by-key")
+ .displayName("Separate By Key")
+ .description("If true, and the <Message Demarcator> property is set,
two messages will only be added to the same FlowFile if both of the Kafka
Messages have identical keys.")
+ .required(false)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
static final PropertyDescriptor HEADER_NAME_REGEX = new
PropertyDescriptor.Builder()
.name("header-name-regex")
.displayName("Headers to Add as Attributes (Regex)")
@@ -234,6 +245,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(MESSAGE_DEMARCATOR);
+ descriptors.add(SEPARATE_BY_KEY);
descriptors.add(MESSAGE_HEADER_ENCODING);
descriptors.add(HEADER_NAME_REGEX);
descriptors.add(MAX_POLL_RECORDS);
@@ -315,6 +327,8 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
final String headerNameRegex =
context.getProperty(HEADER_NAME_REGEX).getValue();
final Pattern headerNamePattern = headerNameRegex == null ? null :
Pattern.compile(headerNameRegex);
+ final boolean separateByKey =
context.getProperty(SEPARATE_BY_KEY).asBoolean();
+
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -323,11 +337,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
}
}
- return new ConsumerPool(maxLeases, demarcator, props, topics,
maxUncommittedTime, keyEncoding, securityProtocol,
+ return new ConsumerPool(maxLeases, demarcator, separateByKey,
props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
bootstrapServers, log, honorTransactions, charset,
headerNamePattern);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
- return new ConsumerPool(maxLeases, demarcator, props,
topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
+ return new ConsumerPool(maxLeases, demarcator, separateByKey,
props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
bootstrapServers, log, honorTransactions, charset,
headerNamePattern);
} else {
getLogger().error("Subscription type has an unknown value {}", new
Object[] {topicType});
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 458165b..c3846a2 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -49,6 +49,7 @@ import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -83,6 +84,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
private final RecordReaderFactory readerFactory;
private final Charset headerCharacterSet;
private final Pattern headerNamePattern;
+ private final boolean separateByKey;
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can
append
//to them on subsequent poll calls
@@ -103,7 +105,8 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
final RecordSetWriterFactory writerFactory,
final ComponentLog logger,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final boolean separateByKey) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
this.demarcatorBytes = demarcatorBytes;
@@ -115,6 +118,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
this.logger = logger;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
}
/**
@@ -412,7 +416,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
private void writeDemarcatedData(final ProcessSession session, final
List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition
topicPartition) {
// Group the Records by their BundleInformation
final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map
= records.stream()
- .collect(Collectors.groupingBy(rec -> new
BundleInformation(topicPartition, null, getAttributes(rec))));
+ .collect(Collectors.groupingBy(rec -> new
BundleInformation(topicPartition, null, getAttributes(rec), separateByKey ?
rec.key() : null)));
for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[],
byte[]>>> entry : map.entrySet()) {
final BundleInformation bundleInfo = entry.getKey();
@@ -538,7 +542,7 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema =
record.getSchema();
- final BundleInformation bundleInfo = new
BundleInformation(topicPartition, recordSchema, attributes);
+ final BundleInformation bundleInfo = new
BundleInformation(topicPartition, recordSchema, attributes, separateByKey ?
consumerRecord.key() : null);
BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
@@ -626,9 +630,16 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET,
String.valueOf(tracker.initialOffset));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP,
String.valueOf(tracker.initialTimestamp));
- if (tracker.key != null && tracker.totalRecords == 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+
+ // If we have a Kafka key, we will add it as an attribute only if
+ // the FlowFile contains a single Record, or if the Records have been
separated by Key,
+ // because we then know that even though there are multiple Records,
they all have the same key. The key is never added when the key encoding is
'do-not-add'.
+ if (tracker.key != null && (tracker.totalRecords == 1 ||
separateByKey)) {
+ if
(!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue()))
{
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+ }
}
+
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION,
String.valueOf(tracker.partition));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) {
@@ -647,8 +658,8 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
tracker.updateFlowFile(newFlowFile);
}
- private static class BundleTracker {
+ private static class BundleTracker {
final long initialOffset;
final long initialTimestamp;
final int partition;
@@ -678,23 +689,24 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
private void updateFlowFile(final FlowFile flowFile) {
this.flowFile = flowFile;
}
-
}
private static class BundleInformation {
private final TopicPartition topicPartition;
private final RecordSchema schema;
private final Map<String, String> attributes;
+ private final byte[] messageKey;
- public BundleInformation(final TopicPartition topicPartition, final
RecordSchema schema, final Map<String, String> attributes) {
+ public BundleInformation(final TopicPartition topicPartition, final
RecordSchema schema, final Map<String, String> attributes, final byte[]
messageKey) {
this.topicPartition = topicPartition;
this.schema = schema;
this.attributes = attributes;
+ this.messageKey = messageKey;
}
@Override
public int hashCode() {
- return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0
: 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 *
attributes.hashCode());
+ return 41 + Objects.hash(topicPartition, schema, attributes) + 37
* Arrays.hashCode(messageKey);
}
@Override
@@ -710,7 +722,8 @@ public abstract class ConsumerLease implements Closeable,
ConsumerRebalanceListe
}
final BundleInformation other = (BundleInformation) obj;
- return Objects.equals(topicPartition, other.topicPartition) &&
Objects.equals(schema, other.schema) && Objects.equals(attributes,
other.attributes);
+ return Objects.equals(topicPartition, other.topicPartition) &&
Objects.equals(schema, other.schema) && Objects.equals(attributes,
other.attributes)
+ && Arrays.equals(this.messageKey, other.messageKey);
}
}
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 7f02b26..2a33298 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -59,6 +59,7 @@ public class ConsumerPool implements Closeable {
private final RecordSetWriterFactory writerFactory;
private final Charset headerCharacterSet;
private final Pattern headerNamePattern;
+ private final boolean separateByKey;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -86,6 +87,7 @@ public class ConsumerPool implements Closeable {
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
+ final boolean separateByKey,
final Map<String, Object> kafkaProperties,
final List<String> topics,
final long maxWaitMillis,
@@ -111,11 +113,13 @@ public class ConsumerPool implements Closeable {
this.honorTransactions = honorTransactions;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
}
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
+ final boolean separateByKey,
final Map<String, Object> kafkaProperties,
final Pattern topics,
final long maxWaitMillis,
@@ -141,6 +145,7 @@ public class ConsumerPool implements Closeable {
this.honorTransactions = honorTransactions;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
}
public ConsumerPool(
@@ -155,12 +160,13 @@ public class ConsumerPool implements Closeable {
final ComponentLog logger,
final boolean honorTransactions,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final boolean separateByKey,
+ final String keyEncoding) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
- this.keyEncoding = null;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.securityProtocol = securityProtocol;
@@ -171,6 +177,8 @@ public class ConsumerPool implements Closeable {
this.honorTransactions = honorTransactions;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
+ this.keyEncoding = keyEncoding;
}
public ConsumerPool(
@@ -185,12 +193,13 @@ public class ConsumerPool implements Closeable {
final ComponentLog logger,
final boolean honorTransactions,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final boolean separateByKey,
+ final String keyEncoding) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
this.demarcatorBytes = null;
- this.keyEncoding = null;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.securityProtocol = securityProtocol;
@@ -201,6 +210,8 @@ public class ConsumerPool implements Closeable {
this.honorTransactions = honorTransactions;
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
+ this.separateByKey = separateByKey;
+ this.keyEncoding = keyEncoding;
}
/**
@@ -301,7 +312,7 @@ public class ConsumerPool implements Closeable {
private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding,
securityProtocol, bootstrapServers,
- readerFactory, writerFactory, logger, headerCharacterSet,
headerNamePattern);
+ readerFactory, writerFactory, logger, headerCharacterSet,
headerNamePattern, separateByKey);
this.consumer = consumer;
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index e756776..44a6984 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -50,7 +50,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import java.util.regex.Pattern;
public final class KafkaProcessorUtils {
private static final String ALLOW_EXPLICIT_KEYTAB =
"NIFI_ALLOW_EXPLICIT_KEYTAB";
@@ -60,8 +59,8 @@ public final class KafkaProcessorUtils {
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8",
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex
Encoded",
"The key is interpreted as arbitrary binary data and is encoded
using hexadecimal characters with uppercase letters");
-
- static final Pattern HEX_KEY_PATTERN =
Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
+ static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new
AllowableValue("do-not-add", "Do Not Add Key as Attribute",
+ "The key will not be added as an Attribute");
static final String KAFKA_KEY = "kafka.key";
static final String KAFKA_TOPIC = "kafka.topic";
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 9d53ee6..d006a6e 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -70,6 +70,7 @@ public class ConsumerPoolTest {
testPool = new ConsumerPool(
1,
null,
+ false,
Collections.emptyMap(),
Collections.singletonList("nifi"),
100L,
@@ -88,6 +89,7 @@ public class ConsumerPoolTest {
testDemarcatedPool = new ConsumerPool(
1,
"--demarcator--".getBytes(StandardCharsets.UTF_8),
+ false,
Collections.emptyMap(),
Collections.singletonList("nifi"),
100L,