This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3543b9c NIFI-6797: Add support for specifying Partition via EL or RecordPath for PublishKafka(Record)_1_0 and PublishKafka(Record)_2_0
3543b9c is described below
commit 3543b9c42cb00b8365059a990aa4f77f23c4892b
Author: Mark Payne <[email protected]>
AuthorDate: Tue Oct 22 14:30:52 2019 -0400
NIFI-6797: Add support for specifying Partition via EL or RecordPath for PublishKafka(Record)_1_0 and PublishKafka(Record)_2_0
This closes #3834.
Signed-off-by: Bryan Bende <[email protected]>
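
As a rough usage sketch (not part of this commit; the RecordPath value and test wiring are illustrative only, and assume the processor and Partitioners classes are importable), the new properties added below would be configured like so:

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class RecordPathPartitionExample {
        public static void main(String[] args) {
            // Interpret <Partition> as a RecordPath: records whose /accountId
            // values are equal are published to the same Kafka partition.
            final TestRunner runner = TestRunners.newTestRunner(PublishKafkaRecord_2_0.class);
            runner.setProperty("partitioner.class", Partitioners.RecordPathPartitioner.class.getName());
            runner.setProperty("partition", "/accountId"); // hypothetical RecordPath
        }
    }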
---
.../nifi-kafka-1-0-processors/pom.xml | 8 +-
.../nifi/processors/kafka/pubsub/Partitioners.java | 40 +++++-
.../kafka/pubsub/PublishKafkaRecord_1_0.java | 88 +++++++++++-
.../processors/kafka/pubsub/PublishKafka_1_0.java | 82 +++++++++---
.../processors/kafka/pubsub/PublisherLease.java | 49 +++----
.../processors/kafka/pubsub/PublisherPool.java | 11 +-
.../processors/kafka/pubsub/TestPublishKafka.java | 43 +++---
.../kafka/pubsub/TestPublishKafkaRecord_1_0.java | 61 +++++----
.../kafka/pubsub/TestPublisherLease.java | 16 +--
.../nifi-kafka-2-0-processors/pom.xml | 6 +
.../nifi/processors/kafka/pubsub/Partitioners.java | 40 +++++-
.../kafka/pubsub/PublishKafkaRecord_2_0.java | 149 ++++++++++++++++-----
.../processors/kafka/pubsub/PublishKafka_2_0.java | 45 ++++++-
.../processors/kafka/pubsub/PublisherLease.java | 49 +++----
.../processors/kafka/pubsub/PublisherPool.java | 9 +-
.../kafka/pubsub/TestPublishKafkaRecord_2_0.java | 82 ++++++++++--
.../kafka/pubsub/TestPublishKafka_2_0.java | 13 +-
.../kafka/pubsub/TestPublisherLease.java | 16 +--
18 files changed, 605 insertions(+), 202 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
index 9b9b047..597cc80 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
@@ -52,7 +52,7 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kerberos-credentials-service-api</artifactId>
</dependency>
-
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
@@ -92,6 +92,12 @@
<version>2.6</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
index 64ab4ce..a7b20f2 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -16,11 +16,11 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import java.util.Map;
-
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
+import java.util.Map;
+
/**
 * Collection of implementations of common Kafka {@link Partitioner}s.
*/
@@ -59,4 +59,40 @@ final public class Partitioners {
return index++;
}
}
+
+ public static class RecordPathPartitioner implements Partitioner {
+ @Override
+ public int partition(final String topic, final Object key, final
byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster
cluster) {
+ // When this partitioner is used, it is always overridden by
creating the ProducerRecord with the Partition directly specified. However, we
must have a unique value
+ // to set in the Producer's config, so this class exists
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ }
+ }
+
+
+ public static class ExpressionLanguagePartitioner implements Partitioner {
+ @Override
+ public int partition(final String topic, final Object key, final
byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster
cluster) {
+ // When this partitioner is used, it is always overridden by
creating the ProducerRecord with the Partition directly specified. However, we
must have a unique value
+ // to set in the Producer's config, so this class exists
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ }
+ }
+
}
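
The two partitioners added above are intentional no-ops: Kafka only consults the configured Partitioner when a ProducerRecord carries no explicit partition, and this change always sets the partition directly. A minimal sketch of the wiring they enable (ProducerConfig.PARTITIONER_CLASS_CONFIG is the standard Kafka constant; Partitioners is assumed to be on the classpath):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class PartitionerConfigSketch {
        public static void main(String[] args) {
            // The class name is registered only so the producer config holds a
            // distinct value; actual partitions are set on each ProducerRecord.
            final Properties props = new Properties();
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    Partitioners.RecordPathPartitioner.class.getName());
            System.out.println(props.get(ProducerConfig.PARTITIONER_CLASS_CONFIG));
        }
    }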
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
index 2a7e7e7..6fc1ff1 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
@@ -41,11 +41,16 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
@@ -60,11 +65,16 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put",
"Send", "Message", "PubSub", "1.0"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records
to Apache Kafka using the Kafka 1.0 Producer API. "
+ "The contents of the FlowFile are expected to be record-oriented data
that can be read by the configured Record Reader. "
@@ -98,6 +108,12 @@ public class PublishKafkaRecord_1_0 extends
AbstractProcessor {
+ "the next Partition to Partition 2, and so on, wrapping as
necessary.");
static final AllowableValue RANDOM_PARTITIONING = new
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random
partitions.");
+ static final AllowableValue RECORD_PATH_PARTITIONING = new
AllowableValue(Partitioners.RecordPathPartitioner.class.getName(),
+ "RecordPath Partitioner", "Interprets the <Partition> property as a
RecordPath that will be evaluated against each Record to determine which
partition the Record will go to. All Records " +
+ "that have the same value for the given RecordPath will go to the same
Partition.");
+ static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new
AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(),
"Expression Language Partitioner",
+ "Interprets the <Partition> property as Expression Language that will
be evaluated against each FlowFile. This Expression will be evaluated once
against the FlowFile, " +
+ "so all Records in a given FlowFile will go to the same
partition.");
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8",
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex
Encoded",
@@ -184,11 +200,20 @@ public class PublishKafkaRecord_1_0 extends
AbstractProcessor {
.name("partitioner.class")
.displayName("Partitioner class")
.description("Specifies which class to use to compute a partition id
for a message. Corresponds to Kafka's 'partitioner.class' property.")
- .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+ .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING,
RECORD_PATH_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
.defaultValue(RANDOM_PARTITIONING.getValue())
.required(false)
.build();
+ static final PropertyDescriptor PARTITION = new
PropertyDescriptor.Builder()
+ .name("partition")
+ .displayName("Partition")
+ .description("Specifies which Partition Records will go to. How this
value is interpreted is dictated by the <Partitioner class> property.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .build();
+
static final PropertyDescriptor COMPRESSION_CODEC = new
PropertyDescriptor.Builder()
.name("compression.type")
.displayName("Compression Type")
@@ -253,6 +278,7 @@ public class PublishKafkaRecord_1_0 extends
AbstractProcessor {
private static final Set<Relationship> RELATIONSHIPS;
private volatile PublisherPool publisherPool = null;
+ private final RecordPathCache recordPathCache = new RecordPathCache(25);
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -276,6 +302,7 @@ public class PublishKafkaRecord_1_0 extends
AbstractProcessor {
properties.add(ACK_WAIT_TIME);
properties.add(METADATA_WAIT_TIME);
properties.add(PARTITION_CLASS);
+ properties.add(PARTITION);
properties.add(COMPRESSION_CODEC);
PROPERTIES = Collections.unmodifiableList(properties);
@@ -325,6 +352,32 @@ public class PublishKafkaRecord_1_0 extends
AbstractProcessor {
}
}
+ final String partitionClass =
validationContext.getProperty(PARTITION_CLASS).getValue();
+ if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
+ final String rawRecordPath =
validationContext.getProperty(PARTITION).getValue();
+ if (rawRecordPath == null) {
+ results.add(new ValidationResult.Builder()
+ .subject("Partition")
+ .valid(false)
+ .explanation("The <Partition> property must be specified
if using the RecordPath Partitioning class")
+ .build());
+ } else if
(!validationContext.isExpressionLanguagePresent(rawRecordPath)) {
+ final ValidationResult result = new
RecordPathValidator().validate(PARTITION.getDisplayName(), rawRecordPath,
validationContext);
+ if (result != null) {
+ results.add(result);
+ }
+ }
+ } else if
(EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+ final String rawRecordPath =
validationContext.getProperty(PARTITION).getValue();
+ if (rawRecordPath == null) {
+ results.add(new ValidationResult.Builder()
+ .subject("Partition")
+ .valid(false)
+ .explanation("The <Partition> property must be specified
if using the Expression Language Partitioning class")
+ .build());
+ }
+ }
+
return results;
}
@@ -414,6 +467,8 @@ public class PublishKafkaRecord_1_0 extends
AbstractProcessor {
final String topic =
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField =
context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
+ final Function<Record, Integer> partitioner =
getPartitioner(context, flowFile);
+
try {
session.read(flowFile, new InputStreamCallback() {
@Override
@@ -423,7 +478,7 @@ public class PublishKafkaRecord_1_0 extends
AbstractProcessor {
final RecordSet recordSet =
reader.createRecordSet();
final RecordSchema schema =
writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
- lease.publish(flowFile, recordSet,
writerFactory, schema, messageKeyField, topic);
+ lease.publish(flowFile, recordSet,
writerFactory, schema, messageKeyField, topic, partitioner);
} catch (final SchemaNotFoundException |
MalformedRecordException e) {
throw new ProcessException(e);
}
@@ -460,4 +515,33 @@ public class PublishKafkaRecord_1_0 extends
AbstractProcessor {
}
}
}
+
+ private Function<Record, Integer> getPartitioner(final ProcessContext
context, final FlowFile flowFile) {
+ final String partitionClass =
context.getProperty(PARTITION_CLASS).getValue();
+ if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
+ final String recordPath =
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath compiled =
recordPathCache.getCompiled(recordPath);
+
+ return record -> evaluateRecordPath(compiled, record);
+ } else if
(EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+ final String partition =
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+ final int hash = Objects.hashCode(partition);
+ return (record) -> hash;
+ }
+
+ return null;
+ }
+
+ private Integer evaluateRecordPath(final RecordPath recordPath, final
Record record) {
+ final RecordPathResult result = recordPath.evaluate(record);
+ final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
+
+ result.getSelectedFields().forEach(fieldValue -> {
+ final Object value = fieldValue.getValue();
+ final long hash = Objects.hashCode(value);
+ accumulator.accumulate(hash);
+ });
+
+ return accumulator.intValue();
+ }
}
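
A standalone sketch of the hashing scheme in evaluateRecordPath above: the hashes of all field values selected by the RecordPath are summed, so records with equal selected values produce equal partition keys (toy inputs; the NiFi RecordPath API is deliberately not used here):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.atomic.LongAccumulator;

    public class RecordPathHashSketch {
        static int partitionKey(final List<Object> selectedValues) {
            // Sum the hash of each selected field value, as evaluateRecordPath does.
            final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
            selectedValues.forEach(value -> accumulator.accumulate(Objects.hashCode(value)));
            return accumulator.intValue();
        }

        public static void main(String[] args) {
            // Equal inputs yield the same key, hence the same partition after the
            // modulo applied later in PublisherLease.
            System.out.println(partitionKey(Arrays.asList("acct-42")));
            System.out.println(partitionKey(Arrays.asList("acct-42"))); // same value
        }
    }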
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
index 1c3efc0..c2fad36 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
@@ -17,25 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub;
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-
-import javax.xml.bind.DatatypeConverter;
-
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -60,6 +41,27 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
+import javax.xml.bind.DatatypeConverter;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "1.0"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to
Apache Kafka using the Kafka 1.0 Producer API. "
+ "The messages to send may be individual FlowFiles or may be delimited,
using a "
@@ -94,6 +96,9 @@ public class PublishKafka_1_0 extends AbstractProcessor {
+ "the next Partition to Partition 2, and so on, wrapping as
necessary.");
static final AllowableValue RANDOM_PARTITIONING = new
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random
partitions.");
+ static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new
AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(),
"Expression Language Partitioner",
+ "Interprets the <Partition> property as Expression Language that will
be evaluated against each FlowFile. This Expression will be evaluated once
against the FlowFile, " +
+ "so all Records in a given FlowFile will go to the same
partition.");
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8",
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex
Encoded",
@@ -187,11 +192,20 @@ public class PublishKafka_1_0 extends AbstractProcessor {
.name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
.displayName("Partitioner class")
.description("Specifies which class to use to compute a partition id
for a message. Corresponds to Kafka's 'partitioner.class' property.")
- .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+ .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING,
EXPRESSION_LANGUAGE_PARTITIONING)
.defaultValue(RANDOM_PARTITIONING.getValue())
.required(false)
.build();
+ static final PropertyDescriptor PARTITION = new
PropertyDescriptor.Builder()
+ .name("partition")
+ .displayName("Partition")
+ .description("Specifies which Partition Records will go to.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .build();
+
static final PropertyDescriptor COMPRESSION_CODEC = new
PropertyDescriptor.Builder()
.name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
.displayName("Compression Type")
@@ -273,6 +287,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
properties.add(ACK_WAIT_TIME);
properties.add(METADATA_WAIT_TIME);
properties.add(PARTITION_CLASS);
+ properties.add(PARTITION);
properties.add(COMPRESSION_CODEC);
PROPERTIES = Collections.unmodifiableList(properties);
@@ -322,6 +337,18 @@ public class PublishKafka_1_0 extends AbstractProcessor {
}
}
+ final String partitionClass =
validationContext.getProperty(PARTITION_CLASS).getValue();
+ if
(EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+ final String rawRecordPath =
validationContext.getProperty(PARTITION).getValue();
+ if (rawRecordPath == null) {
+ results.add(new ValidationResult.Builder()
+ .subject("Partition")
+ .valid(false)
+ .explanation("The <Partition> property must be specified
if using the Expression Language Partitioning class")
+ .build());
+ }
+ }
+
return results;
}
@@ -413,11 +440,12 @@ public class PublishKafka_1_0 extends AbstractProcessor {
demarcatorBytes = null;
}
+ final Integer partition = getPartition(context, flowFile);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws
IOException {
try (final InputStream in = new
BufferedInputStream(rawIn)) {
- lease.publish(flowFile, in, messageKey,
demarcatorBytes, topic);
+ lease.publish(flowFile, in, messageKey,
demarcatorBytes, topic, partition);
}
}
});
@@ -469,4 +497,16 @@ public class PublishKafka_1_0 extends AbstractProcessor {
return DatatypeConverter.parseHexBinary(uninterpretedKey);
}
+
+ private Integer getPartition(final ProcessContext context, final FlowFile
flowFile) {
+ final String partitionClass =
context.getProperty(PARTITION_CLASS).getValue();
+ if
(EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+ final String partition =
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+ final int hash = Objects.hashCode(partition);
+ return hash;
+ }
+
+ return null;
+ }
+
}
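
The Expression Language path in getPartition above is simpler: the expression is evaluated once per FlowFile and its hash becomes the partition key, so every message from that FlowFile lands on the same partition. A minimal sketch:

    import java.util.Objects;

    public class ExpressionPartitionSketch {
        static Integer partitionKey(final String evaluatedPartitionProperty) {
            // One hash per FlowFile; Objects.hashCode(null) is 0, so even a null
            // evaluation result yields a deterministic key.
            return Objects.hashCode(evaluatedPartitionProperty);
        }

        public static void main(String[] args) {
            System.out.println(partitionKey("tenant-a")); // deterministic for equal values
        }
    }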
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index a2ddd81..f8587d9 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -17,19 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -48,6 +35,20 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
public class PublisherLease implements Closeable {
private final ComponentLog logger;
private final Producer<byte[], byte[]> producer;
@@ -111,7 +112,7 @@ public class PublisherLease implements Closeable {
rollback();
}
- void publish(final FlowFile flowFile, final InputStream flowFileContent,
final byte[] messageKey, final byte[] demarcatorBytes, final String topic)
throws IOException {
+ void publish(final FlowFile flowFile, final InputStream flowFileContent,
final byte[] messageKey, final byte[] demarcatorBytes, final String topic,
final Integer partition) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker(logger);
}
@@ -126,13 +127,13 @@ public class PublisherLease implements Closeable {
// Send FlowFile content as it is, to support sending 0 byte
message.
messageContent = new byte[(int) flowFile.getSize()];
StreamUtils.fillBuffer(flowFileContent, messageContent);
- publish(flowFile, messageKey, messageContent, topic, tracker);
+ publish(flowFile, messageKey, messageContent, topic, tracker,
partition);
return;
}
try (final StreamDemarcator demarcator = new
StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
while ((messageContent = demarcator.nextToken()) != null) {
- publish(flowFile, messageKey, messageContent, topic,
tracker);
+ publish(flowFile, messageKey, messageContent, topic,
tracker, partition);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything
else.
@@ -150,7 +151,7 @@ public class PublisherLease implements Closeable {
}
void publish(final FlowFile flowFile, final RecordSet recordSet, final
RecordSetWriterFactory writerFactory, final RecordSchema schema,
- final String messageKeyField, final String topic) throws IOException {
+ final String messageKeyField, final String topic, final
Function<Record, Integer> partitioner) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker(logger);
}
@@ -176,7 +177,8 @@ public class PublisherLease implements Closeable {
final String key = messageKeyField == null ? null :
record.getAsString(messageKeyField);
final byte[] messageKey = (key == null) ? null :
key.getBytes(StandardCharsets.UTF_8);
- publish(flowFile, additionalAttributes, messageKey,
messageContent, topic, tracker);
+ final Integer partition = partitioner == null ? null :
partitioner.apply(record);
+ publish(flowFile, additionalAttributes, messageKey,
messageContent, topic, tracker, partition);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.
@@ -217,14 +219,15 @@ public class PublisherLease implements Closeable {
}
}
- protected void publish(final FlowFile flowFile, final byte[] messageKey,
final byte[] messageContent, final String topic, final InFlightMessageTracker
tracker) {
- publish(flowFile, Collections.emptyMap(), messageKey, messageContent,
topic, tracker);
+ protected void publish(final FlowFile flowFile, final byte[] messageKey,
final byte[] messageContent, final String topic, final InFlightMessageTracker
tracker, final Integer partition) {
+ publish(flowFile, Collections.emptyMap(), messageKey, messageContent,
topic, tracker, partition);
}
- protected void publish(final FlowFile flowFile, final Map<String, String>
additionalAttributes,
- final byte[] messageKey, final byte[] messageContent, final String
topic, final InFlightMessageTracker tracker) {
+ protected void publish(final FlowFile flowFile, final Map<String, String>
additionalAttributes, final byte[] messageKey, final byte[] messageContent,
+ final String topic, final InFlightMessageTracker
tracker, final Integer partition) {
- final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(topic, null, messageKey, messageContent);
+ final Integer moddedPartition = partition == null ? null :
Math.abs(partition) % (producer.partitionsFor(topic).size());
+ final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(topic, moddedPartition, messageKey, messageContent);
addHeaders(flowFile, additionalAttributes, record);
producer.send(record, new Callback() {
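
PublisherLease then folds that key into the topic's partition range; a minimal sketch of the moddedPartition arithmetic above:

    public class PartitionModuloSketch {
        static Integer toPartition(final Integer partitionKey, final int partitionCount) {
            // Null means "let Kafka decide"; otherwise map into [0, partitionCount).
            // Caveat: Math.abs(Integer.MIN_VALUE) is still negative, an edge this
            // formula (like the diff) does not special-case.
            return partitionKey == null ? null : Math.abs(partitionKey) % partitionCount;
        }

        public static void main(String[] args) {
            System.out.println(toPartition(-1234567, 6)); // prints 1
        }
    }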
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
index 54811cc..0de5111 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -17,6 +17,10 @@
package org.apache.nifi.processors.kafka.pubsub;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.nifi.logging.ComponentLog;
+
import java.io.Closeable;
import java.nio.charset.Charset;
import java.util.HashMap;
@@ -26,10 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.nifi.logging.ComponentLog;
-
public class PublisherPool implements Closeable {
private final ComponentLog logger;
private final BlockingQueue<PublisherLease> publisherQueue;
@@ -44,7 +44,7 @@ public class PublisherPool implements Closeable {
private volatile boolean closed = false;
PublisherPool(final Map<String, Object> kafkaProperties, final
ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis,
- final boolean useTransactions, final Supplier<String>
transactionalIdSupplier, final Pattern attributeNameRegex, final Charset
headerCharacterSet) {
+ final boolean useTransactions, final Supplier<String>
transactionalIdSupplier, final Pattern attributeNameRegex, final Charset
headerCharacterSet) {
this.logger = logger;
this.publisherQueue = new LinkedBlockingQueue<>();
this.kafkaProperties = kafkaProperties;
@@ -77,7 +77,6 @@ public class PublisherPool implements Closeable {
}
final Producer<byte[], byte[]> producer = new
KafkaProducer<>(properties);
-
final PublisherLease lease = new PublisherLease(producer,
maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex,
headerCharacterSet) {
@Override
public void close() {
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
index 44a709f..82e0b18 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
@@ -17,14 +17,13 @@
package org.apache.nifi.processors.kafka.pubsub;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
@@ -37,13 +36,15 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestPublishKafka {
private static final String TOPIC_NAME = "unit-test";
@@ -79,7 +80,7 @@ public class TestPublishKafka {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -98,7 +99,7 @@ public class TestPublishKafka {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 3);
- verify(mockLease, times(3)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(3)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -113,7 +114,7 @@ public class TestPublishKafka {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_FAILURE, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -130,7 +131,7 @@ public class TestPublishKafka {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_FAILURE, 3);
- verify(mockLease, times(3)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(3)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -152,7 +153,7 @@ public class TestPublishKafka {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 2);
- verify(mockLease, times(2)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(2)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -191,7 +192,7 @@ public class TestPublishKafka {
runner.assertTransferCount(PublishKafka_1_0.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafka_1_0.REL_FAILURE, 4);
- verify(mockLease, times(4)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(4)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
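
The switch from eq(null)/any(...) to nullable(Integer.class) in these verifications matters: in Mockito 2+, any(Class) does not match null arguments, while nullable matches either null or a value of the given type, and the new partition argument is legitimately null unless Expression Language partitioning is configured. A minimal sketch with a hypothetical Lease interface:

    import static org.mockito.ArgumentMatchers.nullable;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    public class NullableMatcherSketch {
        interface Lease {
            void publish(String topic, Integer partition);
        }

        public static void main(String[] args) {
            final Lease lease = mock(Lease.class);
            lease.publish("unit-test", null); // partition is null in the default configuration
            // nullable(...) matches both null and a concrete value; any(Integer.class) would fail here.
            verify(lease).publish(nullable(String.class), nullable(Integer.class));
        }
    }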
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
index 2441bff..2d283b9 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
@@ -17,26 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -54,6 +34,28 @@ import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
public class TestPublishKafkaRecord_1_0 {
private static final String TOPIC_NAME = "unit-test";
@@ -67,7 +69,7 @@ public class TestPublishKafkaRecord_1_0 {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- any(RecordSchema.class), any(String.class), any(String.class));
+ any(RecordSchema.class), any(String.class), any(String.class),
nullable(Function.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
@@ -107,7 +109,7 @@ public class TestPublishKafkaRecord_1_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -126,7 +128,7 @@ public class TestPublishKafkaRecord_1_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -142,7 +144,7 @@ public class TestPublishKafkaRecord_1_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_FAILURE, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -160,7 +162,7 @@ public class TestPublishKafkaRecord_1_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_FAILURE, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -183,8 +185,9 @@ public class TestPublishKafkaRecord_1_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(0)).publish(any(FlowFile.class),
any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME),
any(InFlightMessageTracker.class));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
+ verify(mockLease, times(0)).publish(
+ any(FlowFile.class), any(Map.class), eq(null), any(byte[].class),
eq(TOPIC_NAME), any(InFlightMessageTracker.class), nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -214,7 +217,7 @@ public class TestPublishKafkaRecord_1_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -249,7 +252,7 @@ public class TestPublishKafkaRecord_1_0 {
runner.assertTransferCount(PublishKafkaRecord_1_0.REL_FAILURE, 4);
verify(mockLease, times(4)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 3f726a4..156af52 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -95,7 +95,7 @@ public class TestPublisherLease {
};
try {
- lease.publish(flowFile, failureInputStream, messageKey,
demarcatorBytes, topic);
+ lease.publish(flowFile, failureInputStream, messageKey,
demarcatorBytes, topic, null);
Assert.fail("Expected IOException");
} catch (final IOException ioe) {
// expected
@@ -134,7 +134,7 @@ public class TestPublisherLease {
}
}).when(producer).send(any(ProducerRecord.class), any(Callback.class));
- lease.publish(flowFile, new ByteArrayInputStream(new byte[1]),
messageKey, demarcatorBytes, topic);
+ lease.publish(flowFile, new ByteArrayInputStream(new byte[1]),
messageKey, demarcatorBytes, topic, null);
assertEquals(1, poisonCount.get());
@@ -179,16 +179,16 @@ public class TestPublisherLease {
final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);
final byte[] flowFileContent =
"1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
- lease.publish(flowFile, new ByteArrayInputStream(flowFileContent),
messageKey, demarcatorBytes, topic);
+ lease.publish(flowFile, new ByteArrayInputStream(flowFileContent),
messageKey, demarcatorBytes, topic, null);
final byte[] flowFileContent2 = new byte[0];
- lease.publish(new MockFlowFile(2L), new
ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);
+ lease.publish(new MockFlowFile(2L), new
ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic,
null);
final byte[] flowFileContent3 =
"1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new
line
- lease.publish(new MockFlowFile(3L), new
ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);
+ lease.publish(new MockFlowFile(3L), new
ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic,
null);
final byte[] flowFileContent4 =
"\n\n\n".getBytes(StandardCharsets.UTF_8);
- lease.publish(new MockFlowFile(4L), new
ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);
+ lease.publish(new MockFlowFile(4L), new
ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic,
null);
assertEquals(0, poisonCount.get());
@@ -240,7 +240,7 @@ public class TestPublisherLease {
final byte[] demarcatorBytes = null;
final byte[] flowFileContent = new byte[0];
- lease.publish(flowFile, new ByteArrayInputStream(flowFileContent),
messageKey, demarcatorBytes, topic);
+ lease.publish(flowFile, new ByteArrayInputStream(flowFileContent),
messageKey, demarcatorBytes, topic, null);
assertEquals(0, poisonCount.get());
@@ -279,7 +279,7 @@ public class TestPublisherLease {
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(),
eq(flowFile))).thenReturn(writer);
- lease.publish(flowFile, recordSet, writerFactory, schema, keyField,
topic);
+ lease.publish(flowFile, recordSet, writerFactory, schema, keyField,
topic, null);
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema),
any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
index 8c934bc..7cb08e3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
@@ -97,6 +97,12 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-path</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
index 64ab4ce..a7b20f2 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -16,11 +16,11 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import java.util.Map;
-
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
+import java.util.Map;
+
/**
 * Collection of implementations of common Kafka {@link Partitioner}s.
*/
@@ -59,4 +59,40 @@ final public class Partitioners {
return index++;
}
}
+
+ public static class RecordPathPartitioner implements Partitioner {
+ @Override
+ public int partition(final String topic, final Object key, final
byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster
cluster) {
+ // When this partitioner is used, it is always overridden by
creating the ProducerRecord with the Partition directly specified. However, we
must have a unique value
+ // to set in the Producer's config, so this class exists
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ }
+ }
+
+
+ public static class ExpressionLanguagePartitioner implements Partitioner {
+ @Override
+ public int partition(final String topic, final Object key, final
byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster
cluster) {
+ // When this partitioner is used, it is always overridden by
creating the ProducerRecord with the Partition directly specified. However, we
must have a unique value
+ // to set in the Producer's config, so this class exists
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ }
+ }
+
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index 628d4ad..fe502b3 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -28,9 +28,9 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -41,11 +41,16 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
@@ -60,11 +65,18 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put",
"Send", "Message", "PubSub", "2.0"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records
to Apache Kafka using the Kafka 2.0 Producer API. "
+ "The contents of the FlowFile are expected to be record-oriented data
that can be read by the configured Record Reader. "
@@ -74,7 +86,7 @@ import java.util.regex.Pattern;
description = "These properties will be added on the Kafka configuration
after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already
set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to:
http://kafka.apache.org/documentation.html#configuration. ",
- expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
+ expressionLanguageScope = VARIABLE_REGISTRY)
@WritesAttribute(attribute = "msg.count", description = "The number of
messages that were sent to Kafka for this FlowFile. This attribute is added
only to "
+ "FlowFiles that are routed to success.")
@SeeAlso({PublishKafka_2_0.class, ConsumeKafka_2_0.class,
ConsumeKafkaRecord_2_0.class})
@@ -98,80 +110,83 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
+ "the next Partition to Partition 2, and so on, wrapping as
necessary.");
static final AllowableValue RANDOM_PARTITIONING = new
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random
partitions.");
+ static final AllowableValue RECORD_PATH_PARTITIONING = new
AllowableValue(Partitioners.RecordPathPartitioner.class.getName(),
+ "RecordPath Partitioner", "Interprets the <Partition> property as a
RecordPath that will be evaluated against each Record to determine which
partition the Record will go to. All Records " +
+ "that have the same value for the given RecordPath will go to the same
Partition.");
+ static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new
AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(),
"Expression Language Partitioner",
+ "Interprets the <Partition> property as Expression Language that will
be evaluated against each FlowFile. This Expression will be evaluated once
against the FlowFile, " +
+ "so all Records in a given FlowFile will go to the same
partition.");
- static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8",
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
- static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex
Encoded",
- "The key is interpreted as arbitrary binary data that is encoded using
hexadecimal characters with uppercase letters.");
- static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor TOPIC = new Builder()
.name("topic")
.displayName("Topic Name")
.description("The name of the Kafka Topic to publish to.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(true)
.build();
- static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor RECORD_WRITER = new Builder()
.name("record-writer")
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data
before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(true)
.build();
- static final PropertyDescriptor MESSAGE_KEY_FIELD = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor MESSAGE_KEY_FIELD = new Builder()
.name("message-key-field")
.displayName("Message Key Field")
.description("The name of a field in the Input Records that should be
used as the Key for the Kafka message.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.required(false)
.build();
- static final PropertyDescriptor DELIVERY_GUARANTEE = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor DELIVERY_GUARANTEE = new Builder()
.name("acks")
.displayName("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a
message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
.required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE,
DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
- static final PropertyDescriptor METADATA_WAIT_TIME = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor METADATA_WAIT_TIME = new Builder()
.name("max.block.ms")
.displayName("Max Metadata Wait Time")
.description("The amount of time publisher will wait to obtain
metadata or wait for the buffer to flush during the 'send' call before failing
the "
+ "entire 'send' call. Corresponds to Kafka's 'max.block.ms'
property")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.defaultValue("5 sec")
.build();
- static final PropertyDescriptor ACK_WAIT_TIME = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor ACK_WAIT_TIME = new Builder()
.name("ack.wait.time")
.displayName("Acknowledgment Wait Time")
.description("After sending a message to Kafka, this indicates the
amount of time that we are willing to wait for a response from Kafka. "
+ "If Kafka does not acknowledge the message within this time
period, the FlowFile will be routed to 'failure'.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(true)
.defaultValue("5 secs")
.build();
- static final PropertyDescriptor MAX_REQUEST_SIZE = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor MAX_REQUEST_SIZE = new Builder()
.name("max.request.size")
.displayName("Max Request Size")
.description("The maximum size of a request in bytes. Corresponds to
Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
@@ -180,16 +195,25 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
.defaultValue("1 MB")
.build();
- static final PropertyDescriptor PARTITION_CLASS = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor PARTITION_CLASS = new Builder()
.name("partitioner.class")
.displayName("Partitioner class")
.description("Specifies which class to use to compute a partition id
for a message. Corresponds to Kafka's 'partitioner.class' property.")
- .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+ .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING,
RECORD_PATH_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
.defaultValue(RANDOM_PARTITIONING.getValue())
.required(false)
.build();
- static final PropertyDescriptor COMPRESSION_CODEC = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor PARTITION = new Builder()
+ .name("partition")
+ .displayName("Partition")
+ .description("Specifies which Partition Records will go to. How this
value is interpreted is dictated by the <Partitioner class> property.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .build();
+
+ static final PropertyDescriptor COMPRESSION_CODEC = new Builder()
.name("compression.type")
.displayName("Compression Type")
.description("This parameter allows you to specify the compression
codec for all data generated by this producer.")
@@ -199,37 +223,37 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
.defaultValue("none")
.build();
- static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new Builder()
.name("attribute-name-regex")
.displayName("Attributes to Send as Headers (Regex)")
.description("A Regular Expression that is matched against all
FlowFile attribute names. "
+ "Any attribute whose name matches the regex will be added to the
Kafka messages as a Header. "
+ "If not specified, no FlowFile attributes will be added as
headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(false)
.build();
- static final PropertyDescriptor USE_TRANSACTIONS = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor USE_TRANSACTIONS = new Builder()
.name("use-transactions")
.displayName("Use Transactions")
.description("Specifies whether or not NiFi should provide
Transactional guarantees when communicating with Kafka. If there is a problem
sending data to Kafka, "
+ "and this property is set to false, then the messages that have
already been sent to Kafka will continue on and be delivered to consumers. "
+ "If this is set to true, then the Kafka transaction will be
rolled back so that those messages are not available to consumers. Setting this
to true "
+ "requires that the <Delivery Guarantee> property be set to
\"Guarantee Replicated Delivery.\"")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
- static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new Builder()
.name("transactional-id-prefix")
.displayName("Transactional Id Prefix")
.description("When Use Transaction is set to true, KafkaProducer
config 'transactional.id' will be a generated UUID and will be prefixed with
this string.")
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.required(false)
.build();
- static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new Builder()
.name("message-header-encoding")
.displayName("Message Header Encoding")
.description("For any attribute that is added as a message header, as
configured via the <Attributes to Send as Headers> property, "
@@ -253,6 +277,7 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
private static final Set<Relationship> RELATIONSHIPS;
private volatile PublisherPool publisherPool = null;
+ private final RecordPathCache recordPathCache = new RecordPathCache(25);
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -280,6 +305,7 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
properties.add(ACK_WAIT_TIME);
properties.add(METADATA_WAIT_TIME);
properties.add(PARTITION_CLASS);
+ properties.add(PARTITION);
properties.add(COMPRESSION_CODEC);
PROPERTIES = Collections.unmodifiableList(properties);
@@ -302,12 +328,12 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
+ return new Builder()
.description("Specifies the value for '" + propertyDescriptorName
+ "' Kafka Configuration.")
.name(propertyDescriptorName)
.addValidator(new
KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
.dynamic(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
}
@@ -329,6 +355,32 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
}
}
+ final String partitionClass =
validationContext.getProperty(PARTITION_CLASS).getValue();
+ if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
+ final String rawRecordPath =
validationContext.getProperty(PARTITION).getValue();
+ if (rawRecordPath == null) {
+ results.add(new ValidationResult.Builder()
+ .subject("Partition")
+ .valid(false)
+ .explanation("The <Partition> property must be specified
if using the RecordPath Partitioning class")
+ .build());
+ } else if
(!validationContext.isExpressionLanguagePresent(rawRecordPath)) {
+ final ValidationResult result = new
RecordPathValidator().validate(PARTITION.getDisplayName(), rawRecordPath,
validationContext);
+ if (result != null) {
+ results.add(result);
+ }
+ }
+ } else if
(EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+ final String rawPartitionValue =
validationContext.getProperty(PARTITION).getValue();
+ if (rawPartitionValue == null) {
+ results.add(new ValidationResult.Builder()
+ .subject("Partition")
+ .valid(false)
+ .explanation("The <Partition> property must be specified
if using the Expression Language Partitioning class")
+ .build());
+ }
+ }
+
return results;
}
@@ -418,6 +470,8 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
final String topic =
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField =
context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
+ final Function<Record, Integer> partitioner =
getPartitioner(context, flowFile);
+
try {
session.read(flowFile, new InputStreamCallback() {
@Override
@@ -427,7 +481,7 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
final RecordSet recordSet =
reader.createRecordSet();
final RecordSchema schema =
writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
- lease.publish(flowFile, recordSet,
writerFactory, schema, messageKeyField, topic);
+ lease.publish(flowFile, recordSet,
writerFactory, schema, messageKeyField, topic, partitioner);
} catch (final SchemaNotFoundException |
MalformedRecordException e) {
throw new ProcessException(e);
}
@@ -464,4 +518,33 @@ public class PublishKafkaRecord_2_0 extends
AbstractProcessor {
}
}
}
+
+ private Function<Record, Integer> getPartitioner(final ProcessContext
context, final FlowFile flowFile) {
+ final String partitionClass =
context.getProperty(PARTITION_CLASS).getValue();
+ if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
+ final String recordPath =
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+ final RecordPath compiled =
recordPathCache.getCompiled(recordPath);
+
+ return record -> evaluateRecordPath(compiled, record);
+ } else if
(EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+ final String partition =
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+ final int hash = Objects.hashCode(partition);
+ return (record) -> hash;
+ }
+
+ return null;
+ }
+
+ private Integer evaluateRecordPath(final RecordPath recordPath, final
Record record) {
+ final RecordPathResult result = recordPath.evaluate(record);
+ final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
+
+ result.getSelectedFields().forEach(fieldValue -> {
+ final Object value = fieldValue.getValue();
+ final long hash = Objects.hashCode(value);
+ accumulator.accumulate(hash);
+ });
+
+ return accumulator.intValue();
+ }
}
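
Worth pausing on the RecordPath branch above: evaluateRecordPath() sums the
null-safe hashes of every field the path selects, so Records whose selected
values are equal always yield the same partition hint. A minimal sketch of
that hashing, standing in for the RecordPath machinery with a plain list of
selected values (names here are illustrative, not NiFi API):

    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.atomic.LongAccumulator;

    class PartitionHintSketch {
        // Stand-in for evaluateRecordPath(): sum the null-safe hashes of the
        // values that a RecordPath selected from a single Record.
        static int hint(final List<?> selectedValues) {
            final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
            for (final Object value : selectedValues) {
                accumulator.accumulate(Objects.hashCode(value));
            }
            // May be negative; PublisherLease later maps it into [0, partitionCount).
            return accumulator.intValue();
        }

        public static void main(final String[] args) {
            System.out.println(hint(List.of(48)) == hint(List.of(48))); // true: equal values, equal hint
            System.out.println(hint(List.of(48)) == hint(List.of(13))); // false here: different values
        }
    }
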
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index fdfc2e8..a42860b 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -54,11 +54,14 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "2.0"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to
Apache Kafka using the Kafka 2.0 Producer API."
+ "The messages to send may be individual FlowFiles or may be delimited,
using a "
@@ -93,6 +96,9 @@ public class PublishKafka_2_0 extends AbstractProcessor {
+ "the next Partition to Partition 2, and so on, wrapping as
necessary.");
static final AllowableValue RANDOM_PARTITIONING = new
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random
partitions.");
+ static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new
AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(),
"Expression Language Partitioner",
+ "Interprets the <Partition> property as Expression Language that will
be evaluated against each FlowFile. This Expression will be evaluated once
against the FlowFile, " +
+ "so all Records in a given FlowFile will go to the same
partition.");
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8",
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex
Encoded",
@@ -186,11 +192,20 @@ public class PublishKafka_2_0 extends AbstractProcessor {
.name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
.displayName("Partitioner class")
.description("Specifies which class to use to compute a partition id
for a message. Corresponds to Kafka's 'partitioner.class' property.")
- .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+ .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING,
EXPRESSION_LANGUAGE_PARTITIONING)
.defaultValue(RANDOM_PARTITIONING.getValue())
.required(false)
.build();
+ static final PropertyDescriptor PARTITION = new
PropertyDescriptor.Builder()
+ .name("partition")
+ .displayName("Partition")
+ .description("Specifies which Partition Records will go to.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .build();
+
static final PropertyDescriptor COMPRESSION_CODEC = new
PropertyDescriptor.Builder()
.name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
.displayName("Compression Type")
@@ -272,6 +287,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
properties.add(ACK_WAIT_TIME);
properties.add(METADATA_WAIT_TIME);
properties.add(PARTITION_CLASS);
+ properties.add(PARTITION);
properties.add(COMPRESSION_CODEC);
PROPERTIES = Collections.unmodifiableList(properties);
@@ -321,6 +337,18 @@ public class PublishKafka_2_0 extends AbstractProcessor {
}
}
+ final String partitionClass =
validationContext.getProperty(PARTITION_CLASS).getValue();
+ if
(EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+ final String rawPartitionValue =
validationContext.getProperty(PARTITION).getValue();
+ if (rawPartitionValue == null) {
+ results.add(new ValidationResult.Builder()
+ .subject("Partition")
+ .valid(false)
+ .explanation("The <Partition> property must be specified
if using the Expression Language Partitioning class")
+ .build());
+ }
+ }
+
return results;
}
@@ -412,11 +440,12 @@ public class PublishKafka_2_0 extends AbstractProcessor {
demarcatorBytes = null;
}
+ final Integer partition = getPartition(context, flowFile);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws
IOException {
try (final InputStream in = new
BufferedInputStream(rawIn)) {
- lease.publish(flowFile, in, messageKey,
demarcatorBytes, topic);
+ lease.publish(flowFile, in, messageKey,
demarcatorBytes, topic, partition);
}
}
});
@@ -468,4 +497,16 @@ public class PublishKafka_2_0 extends AbstractProcessor {
return DatatypeConverter.parseHexBinary(uninterpretedKey);
}
+
+ private Integer getPartition(final ProcessContext context, final FlowFile
flowFile) {
+ final String partitionClass =
context.getProperty(PARTITION_CLASS).getValue();
+ if
(EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+ final String partition =
context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+ final int hash = Objects.hashCode(partition);
+ return hash;
+ }
+
+ return null;
+ }
+
}
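
For the Expression Language strategy, the <Partition> property is evaluated
once per FlowFile and the resulting string is hashed, so every message built
from a given FlowFile carries the same hint. A hedged sketch of just that
step (illustrative names only, not the processor's API):

    import java.util.Objects;

    class ElPartitionHintSketch {
        // Mirrors getPartition() above: hash the already-evaluated <Partition>
        // value once; every message from the FlowFile reuses this hint.
        static Integer partitionHint(final String evaluatedPartitionValue) {
            return Objects.hashCode(evaluatedPartitionValue); // null-safe: null hashes to 0
        }

        public static void main(final String[] args) {
            // The same evaluated value always produces the same hint.
            System.out.println(partitionHint("tenant-42").equals(partitionHint("tenant-42"))); // true
        }
    }
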
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index a2ddd81..f8587d9 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -17,19 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -48,6 +35,20 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
public class PublisherLease implements Closeable {
private final ComponentLog logger;
private final Producer<byte[], byte[]> producer;
@@ -111,7 +112,7 @@ public class PublisherLease implements Closeable {
rollback();
}
- void publish(final FlowFile flowFile, final InputStream flowFileContent,
final byte[] messageKey, final byte[] demarcatorBytes, final String topic)
throws IOException {
+ void publish(final FlowFile flowFile, final InputStream flowFileContent,
final byte[] messageKey, final byte[] demarcatorBytes, final String topic,
final Integer partition) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker(logger);
}
@@ -126,13 +127,13 @@ public class PublisherLease implements Closeable {
// Send FlowFile content as it is, to support sending 0 byte
message.
messageContent = new byte[(int) flowFile.getSize()];
StreamUtils.fillBuffer(flowFileContent, messageContent);
- publish(flowFile, messageKey, messageContent, topic, tracker);
+ publish(flowFile, messageKey, messageContent, topic, tracker,
partition);
return;
}
try (final StreamDemarcator demarcator = new
StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
while ((messageContent = demarcator.nextToken()) != null) {
- publish(flowFile, messageKey, messageContent, topic,
tracker);
+ publish(flowFile, messageKey, messageContent, topic,
tracker, partition);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything
else.
@@ -150,7 +151,7 @@ public class PublisherLease implements Closeable {
}
void publish(final FlowFile flowFile, final RecordSet recordSet, final
RecordSetWriterFactory writerFactory, final RecordSchema schema,
- final String messageKeyField, final String topic) throws IOException {
+ final String messageKeyField, final String topic, final
Function<Record, Integer> partitioner) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker(logger);
}
@@ -176,7 +177,8 @@ public class PublisherLease implements Closeable {
final String key = messageKeyField == null ? null :
record.getAsString(messageKeyField);
final byte[] messageKey = (key == null) ? null :
key.getBytes(StandardCharsets.UTF_8);
- publish(flowFile, additionalAttributes, messageKey,
messageContent, topic, tracker);
+ final Integer partition = partitioner == null ? null :
partitioner.apply(record);
+ publish(flowFile, additionalAttributes, messageKey,
messageContent, topic, tracker, partition);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.
@@ -217,14 +219,15 @@ public class PublisherLease implements Closeable {
}
}
- protected void publish(final FlowFile flowFile, final byte[] messageKey,
final byte[] messageContent, final String topic, final InFlightMessageTracker
tracker) {
- publish(flowFile, Collections.emptyMap(), messageKey, messageContent,
topic, tracker);
+ protected void publish(final FlowFile flowFile, final byte[] messageKey,
final byte[] messageContent, final String topic, final InFlightMessageTracker
tracker, final Integer partition) {
+ publish(flowFile, Collections.emptyMap(), messageKey, messageContent,
topic, tracker, partition);
}
- protected void publish(final FlowFile flowFile, final Map<String, String>
additionalAttributes,
- final byte[] messageKey, final byte[] messageContent, final String
topic, final InFlightMessageTracker tracker) {
+ protected void publish(final FlowFile flowFile, final Map<String, String>
additionalAttributes, final byte[] messageKey, final byte[] messageContent,
+ final String topic, final InFlightMessageTracker
tracker, final Integer partition) {
- final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(topic, null, messageKey, messageContent);
+ final Integer moddedPartition = partition == null ? null :
Math.floorMod(partition, producer.partitionsFor(topic).size());
+ final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(topic, moddedPartition, messageKey, messageContent);
addHeaders(flowFile, additionalAttributes, record);
producer.send(record, new Callback() {
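
The raw hint can be any int, so PublisherLease reduces it modulo the topic's
partition count before building the ProducerRecord. Math.floorMod stays
non-negative for every input, including Integer.MIN_VALUE, where
Math.abs(x) % n would yield an invalid negative index. A tiny sketch of that
mapping:

    class PartitionIndexSketch {
        // Reduce an arbitrary int hash to a valid partition index.
        static int toPartitionIndex(final int rawHash, final int partitionCount) {
            // floorMod is non-negative for any input; Math.abs(Integer.MIN_VALUE)
            // stays negative and would produce an invalid partition.
            return Math.floorMod(rawHash, partitionCount);
        }

        public static void main(final String[] args) {
            System.out.println(toPartitionIndex(-7, 6));                // 5
            System.out.println(toPartitionIndex(Integer.MIN_VALUE, 6)); // 4, still non-negative
        }
    }
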
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
index ec07da9..0de5111 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -17,6 +17,10 @@
package org.apache.nifi.processors.kafka.pubsub;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.nifi.logging.ComponentLog;
+
import java.io.Closeable;
import java.nio.charset.Charset;
import java.util.HashMap;
@@ -26,10 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import java.util.regex.Pattern;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.nifi.logging.ComponentLog;
-
public class PublisherPool implements Closeable {
private final ComponentLog logger;
private final BlockingQueue<PublisherLease> publisherQueue;
@@ -77,7 +77,6 @@ public class PublisherPool implements Closeable {
}
final Producer<byte[], byte[]> producer = new
KafkaProducer<>(properties);
-
final PublisherLease lease = new PublisherLease(producer,
maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex,
headerCharacterSet) {
@Override
public void close() {
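
The PublisherPool hunks above are mostly import reshuffling, but the
surrounding pattern deserves a gloss: leases wait in a BlockingQueue, and
obtainPublisher() either recycles an idle one or wraps a fresh KafkaProducer
in a lease whose close() returns it to the queue. A simplified sketch of that
recycle-on-close pattern (illustrative types, not the NiFi classes):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class LeasePoolSketch {
        interface Lease {
            void close();
        }

        private final BlockingQueue<Lease> queue = new LinkedBlockingQueue<>();

        public Lease obtain() {
            final Lease pooled = queue.poll();
            if (pooled != null) {
                return pooled; // reuse an idle lease (and its producer)
            }
            return new Lease() {
                @Override
                public void close() {
                    queue.offer(this); // recycle instead of tearing down
                }
            };
        }

        public static void main(final String[] args) {
            final LeasePoolSketch pool = new LeasePoolSketch();
            final Lease lease = pool.obtain();
            lease.close();                              // back into the queue
            System.out.println(lease == pool.obtain()); // true: same lease reused
        }
    }
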
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
index ed5cf05..e80ce0d 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -23,6 +23,7 @@ import
org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
@@ -33,6 +34,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
@@ -42,13 +45,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -67,7 +73,7 @@ public class TestPublishKafkaRecord_2_0 {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- any(RecordSchema.class), any(String.class), any(String.class));
+ any(RecordSchema.class), any(String.class), any(String.class),
nullable(Function.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
@@ -107,7 +113,7 @@ public class TestPublishKafkaRecord_2_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -126,7 +132,7 @@ public class TestPublishKafkaRecord_2_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -142,7 +148,7 @@ public class TestPublishKafkaRecord_2_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -160,7 +166,7 @@ public class TestPublishKafkaRecord_2_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -183,8 +189,9 @@ public class TestPublishKafkaRecord_2_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(0)).publish(any(FlowFile.class),
any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME),
any(InFlightMessageTracker.class));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
+ verify(mockLease, times(0)).publish(
+ any(FlowFile.class), any(Map.class), eq(null), any(byte[].class),
eq(TOPIC_NAME), any(InFlightMessageTracker.class), any(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -214,7 +221,7 @@ public class TestPublishKafkaRecord_2_0 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -223,6 +230,61 @@ public class TestPublishKafkaRecord_2_0 {
mff.assertAttributeEquals("msg.count", "0");
}
+ @Test
+ public void testRecordPathPartition() throws IOException {
+ runner.setProperty(PublishKafkaRecord_2_0.PARTITION_CLASS,
PublishKafkaRecord_2_0.RECORD_PATH_PARTITIONING);
+ runner.setProperty(PublishKafkaRecord_2_0.PARTITION, "/age");
+
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 48\nJim Doe,
13"));
+
+ final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+ msgCounts.put(flowFiles.get(0), 0);
+
+ final PublishResult result = createPublishResult(msgCounts, new
HashSet<>(flowFiles), Collections.emptyMap());
+
+
+ mockLease = mock(PublisherLease.class);
+
+ when(mockLease.complete()).thenReturn(result);
+ when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+ final Map<Integer, List<Integer>> partitionsByAge = new HashMap<>();
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocationOnMock)
throws Throwable {
+ final Function<Record, Integer> partitioner =
invocationOnMock.getArgument(6, Function.class);
+ final RecordSet recordSet = invocationOnMock.getArgument(1,
RecordSet.class);
+
+ Record record;
+ while ((record = recordSet.next()) != null) {
+ final int partition = partitioner.apply(record);
+ final Integer age = record.getAsInt("age");
+
+ partitionsByAge.computeIfAbsent(age, k -> new
ArrayList<>()).add(partition);
+ }
+
+ return null;
+ }
+ }).when(mockLease).publish(any(FlowFile.class), any(RecordSet.class),
any(RecordSetWriterFactory.class),
+ nullable(RecordSchema.class), nullable(String.class),
any(String.class), nullable(Function.class));
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
+
+ verify(mockLease, times(1)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
+ nullable(RecordSchema.class), nullable(String.class),
any(String.class), nullable(Function.class));
+
+ assertEquals(2, partitionsByAge.size()); // 2 ages
+
+ final List<Integer> partitionsForAge13 = partitionsByAge.get(13);
+ assertEquals(1, partitionsForAge13.size());
+
+ final List<Integer> partitionsForAge48 = partitionsByAge.get(48);
+ assertEquals(2, partitionsForAge48.size());
+ assertEquals(partitionsForAge48.get(0), partitionsForAge48.get(1));
+ }
+
@Test
public void testSomeSuccessSomeFailure() throws IOException {
@@ -249,7 +311,7 @@ public class TestPublishKafkaRecord_2_0 {
runner.assertTransferCount(PublishKafkaRecord_2_0.REL_FAILURE, 4);
verify(mockLease, times(4)).publish(any(FlowFile.class),
any(RecordSet.class), any(RecordSetWriterFactory.class),
- AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME));
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()),
eq(null), eq(TOPIC_NAME), nullable(Function.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
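
The new testRecordPathPartition leans on a Mockito idiom worth spelling out:
doAnswer() intercepts the stubbed publish() call, pulls the partitioner
Function out of the invocation, and exercises it against each record so the
test can assert on the resulting hints. A stripped-down sketch of the same
idiom, using a hypothetical Lease interface rather than the NiFi one:

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    import java.util.function.Function;

    public class DoAnswerSketch {
        interface Lease {
            void publish(String record, Function<String, Integer> partitioner);
        }

        public static void main(final String[] args) {
            final Lease lease = mock(Lease.class);
            doAnswer(invocation -> {
                // Capture the arguments the production code passed in.
                final String record = invocation.getArgument(0, String.class);
                final Function<String, Integer> partitioner = invocation.getArgument(1);
                // Exercise the captured partitioner directly, as the test does.
                System.out.println(record + " -> hint " + partitioner.apply(record));
                return null;
            }).when(lease).publish(any(String.class), any(Function.class));

            lease.publish("age=48", String::hashCode);
        }
    }
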
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
index cd5fb96..14fb5d4 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -40,6 +40,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -79,7 +80,7 @@ public class TestPublishKafka_2_0 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -98,7 +99,7 @@ public class TestPublishKafka_2_0 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 3);
- verify(mockLease, times(3)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(3)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -113,7 +114,7 @@ public class TestPublishKafka_2_0 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -130,7 +131,7 @@ public class TestPublishKafka_2_0 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 3);
- verify(mockLease, times(3)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(3)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -152,7 +153,7 @@ public class TestPublishKafka_2_0 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 2);
- verify(mockLease, times(2)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(2)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -191,7 +192,7 @@ public class TestPublishKafka_2_0 {
runner.assertTransferCount(PublishKafka_2_0.REL_SUCCESS, 0);
runner.assertTransferCount(PublishKafka_2_0.REL_FAILURE, 4);
- verify(mockLease, times(4)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(4)).publish(any(FlowFile.class),
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME),
nullable(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
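
One detail behind the TestPublishKafka_2_0 updates: the verifications switch
to nullable(Integer.class) because, since Mockito 2, any(Class) no longer
matches null arguments, and the new partition parameter is null whenever no
strategy supplies a hint. A minimal sketch of the difference (hypothetical
interface):

    import static org.mockito.ArgumentMatchers.nullable;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    public class NullableMatcherSketch {
        interface Lease {
            void publish(String topic, Integer partition);
        }

        public static void main(final String[] args) {
            final Lease lease = mock(Lease.class);
            lease.publish("my-topic", null); // no partition hint supplied

            // any(Integer.class) would reject the null partition here;
            // nullable(Integer.class) accepts both null and non-null.
            verify(lease).publish(nullable(String.class), nullable(Integer.class));
        }
    }
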
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 141402e..60d157c 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -94,7 +94,7 @@ public class TestPublisherLease {
};
try {
- lease.publish(flowFile, failureInputStream, messageKey,
demarcatorBytes, topic);
+ lease.publish(flowFile, failureInputStream, messageKey,
demarcatorBytes, topic, null);
Assert.fail("Expected IOException");
} catch (final IOException ioe) {
// expected
@@ -133,7 +133,7 @@ public class TestPublisherLease {
}
}).when(producer).send(any(ProducerRecord.class), any(Callback.class));
- lease.publish(flowFile, new ByteArrayInputStream(new byte[1]),
messageKey, demarcatorBytes, topic);
+ lease.publish(flowFile, new ByteArrayInputStream(new byte[1]),
messageKey, demarcatorBytes, topic, null);
assertEquals(1, poisonCount.get());
@@ -178,16 +178,16 @@ public class TestPublisherLease {
final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);
final byte[] flowFileContent =
"1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
- lease.publish(flowFile, new ByteArrayInputStream(flowFileContent),
messageKey, demarcatorBytes, topic);
+ lease.publish(flowFile, new ByteArrayInputStream(flowFileContent),
messageKey, demarcatorBytes, topic, null);
final byte[] flowFileContent2 = new byte[0];
- lease.publish(new MockFlowFile(2L), new
ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);
+ lease.publish(new MockFlowFile(2L), new
ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic,
null);
final byte[] flowFileContent3 =
"1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new
line
- lease.publish(new MockFlowFile(3L), new
ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);
+ lease.publish(new MockFlowFile(3L), new
ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic,
null);
final byte[] flowFileContent4 =
"\n\n\n".getBytes(StandardCharsets.UTF_8);
- lease.publish(new MockFlowFile(4L), new
ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);
+ lease.publish(new MockFlowFile(4L), new
ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic,
null);
assertEquals(0, poisonCount.get());
@@ -239,7 +239,7 @@ public class TestPublisherLease {
final byte[] demarcatorBytes = null;
final byte[] flowFileContent = new byte[0];
- lease.publish(flowFile, new ByteArrayInputStream(flowFileContent),
messageKey, demarcatorBytes, topic);
+ lease.publish(flowFile, new ByteArrayInputStream(flowFileContent),
messageKey, demarcatorBytes, topic, null);
assertEquals(0, poisonCount.get());
@@ -278,7 +278,7 @@ public class TestPublisherLease {
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(),
eq(flowFile))).thenReturn(writer);
- lease.publish(flowFile, recordSet, writerFactory, schema, keyField,
topic);
+ lease.publish(flowFile, recordSet, writerFactory, schema, keyField,
topic, null);
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema),
any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));