This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3543b9c  NIFI-6797: Add support for specifying Partition via EL or RecordPath for PublishKafka(Record)_1_0 and PublishKafka(Record)_2_0
3543b9c is described below

commit 3543b9c42cb00b8365059a990aa4f77f23c4892b
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Tue Oct 22 14:30:52 2019 -0400

    NIFI-6797: Add support for specifying Partition via EL or RecordPath for PublishKafka(Record)_1_0 and PublishKafka(Record)_2_0
    
    This closes #3834.
    
    Signed-off-by: Bryan Bende <bbe...@apache.org>
---
 .../nifi-kafka-1-0-processors/pom.xml              |   8 +-
 .../nifi/processors/kafka/pubsub/Partitioners.java |  40 +++++-
 .../kafka/pubsub/PublishKafkaRecord_1_0.java       |  88 +++++++++++-
 .../processors/kafka/pubsub/PublishKafka_1_0.java  |  82 +++++++++---
 .../processors/kafka/pubsub/PublisherLease.java    |  49 +++----
 .../processors/kafka/pubsub/PublisherPool.java     |  11 +-
 .../processors/kafka/pubsub/TestPublishKafka.java  |  43 +++---
 .../kafka/pubsub/TestPublishKafkaRecord_1_0.java   |  61 +++++----
 .../kafka/pubsub/TestPublisherLease.java           |  16 +--
 .../nifi-kafka-2-0-processors/pom.xml              |   6 +
 .../nifi/processors/kafka/pubsub/Partitioners.java |  40 +++++-
 .../kafka/pubsub/PublishKafkaRecord_2_0.java       | 149 ++++++++++++++++-----
 .../processors/kafka/pubsub/PublishKafka_2_0.java  |  45 ++++++-
 .../processors/kafka/pubsub/PublisherLease.java    |  49 +++----
 .../processors/kafka/pubsub/PublisherPool.java     |   9 +-
 .../kafka/pubsub/TestPublishKafkaRecord_2_0.java   |  82 ++++++++++--
 .../kafka/pubsub/TestPublishKafka_2_0.java         |  13 +-
 .../kafka/pubsub/TestPublisherLease.java           |  16 +--
 18 files changed, 605 insertions(+), 202 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
index 9b9b047..597cc80 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
@@ -52,7 +52,7 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kerberos-credentials-service-api</artifactId>
         </dependency>
-        
+
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
@@ -92,6 +92,12 @@
             <version>2.6</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
index 64ab4ce..a7b20f2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -16,11 +16,11 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.util.Map;
-
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.common.Cluster;
 
+import java.util.Map;
+
 /**
  * Collection of implementation of common Kafka {@link Partitioner}s.
  */
@@ -59,4 +59,40 @@ final public class Partitioners {
             return index++;
         }
     }
+
+    public static class RecordPathPartitioner implements Partitioner {
+        @Override
+        public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
+            // When this partitioner is used, it is always overridden by creating the ProducerRecord with the Partition directly specified. However, we must have a unique value
+            // to set in the Producer's config, so this class exists
+            return 0;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+        }
+    }
+
+
+    public static class ExpressionLanguagePartitioner implements Partitioner {
+        @Override
+        public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
+            // When this partitioner is used, it is always overridden by creating the ProducerRecord with the Partition directly specified. However, we must have a unique value
+            // to set in the Producer's config, so this class exists
+            return 0;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+        }
+    }
+
 }
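
Note on the two no-op partitioners above: Kafka requires a distinct class name in the producer's partitioner.class config for each strategy, but when either of these strategies is selected NiFi computes the partition itself and sets it directly on the ProducerRecord, which bypasses the configured Partitioner entirely. A minimal sketch of that override (the topic name and partition number are hypothetical):

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: a ProducerRecord built with an explicit partition (here, 3) goes to
    // that partition; the configured Partitioner is never consulted for it.
    static void sendToExplicitPartition(final Producer<byte[], byte[]> producer,
                                        final byte[] keyBytes, final byte[] valueBytes) {
        final ProducerRecord<byte[], byte[]> record =
            new ProducerRecord<>("my-topic", 3, keyBytes, valueBytes); // hypothetical topic/partition
        producer.send(record);
    }
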
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
index 2a7e7e7..6fc1ff1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
@@ -41,11 +41,16 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
@@ -60,11 +65,16 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 1.0 Producer API. "
     + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
@@ -98,6 +108,12 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
             + "the next Partition to Partition 2, and so on, wrapping as necessary.");
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
         "DefaultPartitioner", "Messages will be assigned to random partitions.");
+    static final AllowableValue RECORD_PATH_PARTITIONING = new AllowableValue(Partitioners.RecordPathPartitioner.class.getName(),
+        "RecordPath Partitioner", "Interprets the <Partition> property as a RecordPath that will be evaluated against each Record to determine which partition the Record will go to. All Records " +
+        "that have the same value for the given RecordPath will go to the same Partition.");
+    static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), "Expression Language Partitioner",
+        "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
+            "so all Records in a given FlowFile will go to the same partition.");
 
     static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
     static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
@@ -184,11 +200,20 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
         .name("partitioner.class")
         .displayName("Partitioner class")
         .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
-        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, RECORD_PATH_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
         .defaultValue(RANDOM_PARTITIONING.getValue())
         .required(false)
         .build();
 
+    static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
+        .name("partition")
+        .displayName("Partition")
+        .description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+
     static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
         .name("compression.type")
         .displayName("Compression Type")
@@ -253,6 +278,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
     private static final Set<Relationship> RELATIONSHIPS;
 
     private volatile PublisherPool publisherPool = null;
+    private final RecordPathCache recordPathCache = new RecordPathCache(25);
 
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -276,6 +302,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
         properties.add(ACK_WAIT_TIME);
         properties.add(METADATA_WAIT_TIME);
         properties.add(PARTITION_CLASS);
+        properties.add(PARTITION);
         properties.add(COMPRESSION_CODEC);
 
         PROPERTIES = Collections.unmodifiableList(properties);
@@ -325,6 +352,32 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
             }
         }
 
+        final String partitionClass = validationContext.getProperty(PARTITION_CLASS).getValue();
+        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
+            final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
+            if (rawRecordPath == null) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Partition")
+                    .valid(false)
+                    .explanation("The <Partition> property must be specified if using the RecordPath Partitioning class")
+                    .build());
+            } else if (!validationContext.isExpressionLanguagePresent(rawRecordPath)) {
+                final ValidationResult result = new RecordPathValidator().validate(PARTITION.getDisplayName(), rawRecordPath, validationContext);
+                if (result != null) {
+                    results.add(result);
+                }
+            }
+        } else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+            final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
+            if (rawRecordPath == null) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Partition")
+                    .valid(false)
+                    .explanation("The <Partition> property must be specified if using the Expression Language Partitioning class")
+                    .build());
+            }
+        }
+
         return results;
     }
 
@@ -414,6 +467,8 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
                 final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                 final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
 
+                final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);
+
                 try {
                     session.read(flowFile, new InputStreamCallback() {
                         @Override
@@ -423,7 +478,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
                                 final RecordSet recordSet = reader.createRecordSet();
 
                                 final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
-                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
+                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
                             } catch (final SchemaNotFoundException | MalformedRecordException e) {
                                 throw new ProcessException(e);
                             }
@@ -460,4 +515,33 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
             }
         }
     }
+
+    private Function<Record, Integer> getPartitioner(final ProcessContext context, final FlowFile flowFile) {
+        final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
+        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
+            final String recordPath = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            final RecordPath compiled = recordPathCache.getCompiled(recordPath);
+
+            return record -> evaluateRecordPath(compiled, record);
+        } else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+            final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            final int hash = Objects.hashCode(partition);
+            return (record) -> hash;
+        }
+
+        return null;
+    }
+
+    private Integer evaluateRecordPath(final RecordPath recordPath, final Record record) {
+        final RecordPathResult result = recordPath.evaluate(record);
+        final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
+
+        result.getSelectedFields().forEach(fieldValue -> {
+            final Object value = fieldValue.getValue();
+            final long hash = Objects.hashCode(value);
+            accumulator.accumulate(hash);
+        });
+
+        return accumulator.intValue();
+    }
 }
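
In the block above, getPartitioner() compiles the RecordPath once per FlowFile and evaluateRecordPath() folds the hash codes of every selected field value into a single int, so records whose RecordPath results are equal always receive the same partition hint. A minimal standalone sketch of the same hashing scheme, with the selected values passed in as plain Objects since it assumes no NiFi classes:

    import java.util.List;
    import java.util.Objects;
    import java.util.concurrent.atomic.LongAccumulator;

    // Sketch: sum the hash codes of the selected field values, mirroring
    // evaluateRecordPath(); equal values always yield the same hint.
    static int partitionHint(final List<Object> selectedValues) {
        final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
        selectedValues.forEach(value -> accumulator.accumulate(Objects.hashCode(value)));
        return accumulator.intValue();
    }
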
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
index 1c3efc0..c2fad36 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
@@ -17,25 +17,6 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-
-import javax.xml.bind.DatatypeConverter;
-
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -60,6 +41,27 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import javax.xml.bind.DatatypeConverter;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "1.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 1.0 Producer API."
     + "The messages to send may be individual FlowFiles or may be delimited, using a "
@@ -94,6 +96,9 @@ public class PublishKafka_1_0 extends AbstractProcessor {
             + "the next Partition to Partition 2, and so on, wrapping as necessary.");
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
         "DefaultPartitioner", "Messages will be assigned to random partitions.");
+    static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), "Expression Language Partitioner",
+        "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
+            "so all Records in a given FlowFile will go to the same partition.");
 
     static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
     static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
@@ -187,11 +192,20 @@ public class PublishKafka_1_0 extends AbstractProcessor {
         .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
         .displayName("Partitioner class")
         .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
-        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
         .defaultValue(RANDOM_PARTITIONING.getValue())
         .required(false)
         .build();
 
+    static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
+        .name("partition")
+        .displayName("Partition")
+        .description("Specifies which Partition Records will go to.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+
     static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
         .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
         .displayName("Compression Type")
@@ -273,6 +287,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
         properties.add(ACK_WAIT_TIME);
         properties.add(METADATA_WAIT_TIME);
         properties.add(PARTITION_CLASS);
+        properties.add(PARTITION);
         properties.add(COMPRESSION_CODEC);
 
         PROPERTIES = Collections.unmodifiableList(properties);
@@ -322,6 +337,18 @@ public class PublishKafka_1_0 extends AbstractProcessor {
             }
         }
 
+        final String partitionClass = validationContext.getProperty(PARTITION_CLASS).getValue();
+        if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+            final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
+            if (rawRecordPath == null) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Partition")
+                    .valid(false)
+                    .explanation("The <Partition> property must be specified if using the Expression Language Partitioning class")
+                    .build());
+            }
+        }
+                    .build());
+            }
+        }
+
         return results;
     }
 
@@ -413,11 +440,12 @@ public class PublishKafka_1_0 extends AbstractProcessor {
                     demarcatorBytes = null;
                 }
 
+                final Integer partition = getPartition(context, flowFile);
                 session.read(flowFile, new InputStreamCallback() {
                     @Override
                     public void process(final InputStream rawIn) throws IOException {
                         try (final InputStream in = new BufferedInputStream(rawIn)) {
-                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
+                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
                         }
                     }
                 });
@@ -469,4 +497,16 @@ public class PublishKafka_1_0 extends AbstractProcessor {
 
         return DatatypeConverter.parseHexBinary(uninterpretedKey);
     }
+
+    private Integer getPartition(final ProcessContext context, final FlowFile flowFile) {
+        final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
+        if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+            final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            final int hash = Objects.hashCode(partition);
+            return hash;
+        }
+
+        return null;
+    }
+
 }
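
Because getPartition() above hashes the evaluated Expression Language result once per FlowFile, every message demarcated out of that FlowFile carries the same partition hint. A hedged sketch of the idea, with a plain attribute map standing in for the FlowFile and the EL evaluation elided (the 'customer.id' attribute is hypothetical):

    import java.util.Map;
    import java.util.Objects;

    // Sketch: hash the evaluated <Partition> value once per FlowFile, as
    // getPartition() does; Objects.hashCode(null) is 0, so a missing value
    // still produces a deterministic hint.
    static int partitionHint(final Map<String, String> flowFileAttributes) {
        return Objects.hashCode(flowFileAttributes.get("customer.id"));
    }
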
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index a2ddd81..f8587d9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -17,19 +17,6 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -48,6 +35,20 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
 public class PublisherLease implements Closeable {
     private final ComponentLog logger;
     private final Producer<byte[], byte[]> producer;
@@ -111,7 +112,7 @@ public class PublisherLease implements Closeable {
         rollback();
     }
 
-    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
+    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic, final Integer partition) throws IOException {
         if (tracker == null) {
             tracker = new InFlightMessageTracker(logger);
         }
@@ -126,13 +127,13 @@ public class PublisherLease implements Closeable {
                 // Send FlowFile content as it is, to support sending 0 byte message.
                 messageContent = new byte[(int) flowFile.getSize()];
                 StreamUtils.fillBuffer(flowFileContent, messageContent);
-                publish(flowFile, messageKey, messageContent, topic, tracker);
+                publish(flowFile, messageKey, messageContent, topic, tracker, partition);
                 return;
             }
 
             try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
-                    publish(flowFile, messageKey, messageContent, topic, tracker);
+                    publish(flowFile, messageKey, messageContent, topic, tracker, partition);
 
                     if (tracker.isFailed(flowFile)) {
                         // If we have a failure, don't try to send anything else.
@@ -150,7 +151,7 @@ public class PublisherLease implements Closeable {
     }
 
     void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
-        final String messageKeyField, final String topic) throws IOException {
+                 final String messageKeyField, final String topic, final Function<Record, Integer> partitioner) throws IOException {
         if (tracker == null) {
             tracker = new InFlightMessageTracker(logger);
         }
@@ -176,7 +177,8 @@ public class PublisherLease implements Closeable {
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
                 final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
 
-                publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
+                final Integer partition = partitioner == null ? null : partitioner.apply(record);
+                publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker, partition);
 
                 if (tracker.isFailed(flowFile)) {
                     // If we have a failure, don't try to send anything else.
@@ -217,14 +219,15 @@ public class PublisherLease implements Closeable {
         }
     }
 
-    protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
-        publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
+    protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) {
+        publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker, partition);
     }
 
-    protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
-        final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+    protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes, final byte[] messageKey, final byte[] messageContent,
+                           final String topic, final InFlightMessageTracker tracker, final Integer partition) {
 
-        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
+        final Integer moddedPartition = partition == null ? null : Math.abs(partition) % (producer.partitionsFor(topic).size());
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, moddedPartition, messageKey, messageContent);
         addHeaders(flowFile, additionalAttributes, record);
 
         producer.send(record, new Callback() {
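
PublisherLease is where the raw hint becomes a real partition: the moddedPartition line above asks the producer for the topic's partition list and reduces the hint with Math.abs(hint) % partitionCount before building the ProducerRecord. A minimal sketch of that reduction, with the partition count supplied directly rather than via producer.partitionsFor():

    // Sketch: clamp an arbitrary int hint into [0, partitionCount), matching the
    // moddedPartition computation; a null hint means "let Kafka decide". Note that
    // Math.abs(Integer.MIN_VALUE) is still negative, an edge case this inherits.
    static Integer moddedPartition(final Integer hint, final int partitionCount) {
        return hint == null ? null : Math.abs(hint) % partitionCount;
    }
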
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
index 54811cc..0de5111 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.nifi.logging.ComponentLog;
+
 import java.io.Closeable;
 import java.nio.charset.Charset;
 import java.util.HashMap;
@@ -26,10 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.nifi.logging.ComponentLog;
-
 public class PublisherPool implements Closeable {
     private final ComponentLog logger;
     private final BlockingQueue<PublisherLease> publisherQueue;
@@ -44,7 +44,7 @@ public class PublisherPool implements Closeable {
     private volatile boolean closed = false;
 
     PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis,
-                  final boolean useTransactions, final Supplier<String> transactionalIdSupplier, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
+        final boolean useTransactions, final Supplier<String> transactionalIdSupplier, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
         this.logger = logger;
         this.publisherQueue = new LinkedBlockingQueue<>();
         this.kafkaProperties = kafkaProperties;
@@ -77,7 +77,6 @@ public class PublisherPool implements Closeable {
         }
 
         final Producer<byte[], byte[]> producer = new KafkaProducer<>(properties);
-
         final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex, headerCharacterSet) {
             @Override
             public void close() {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
index 44a709f..82e0b18 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
@@ -17,14 +17,13 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -37,13 +36,15 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestPublishKafka {
     private static final String TOPIC_NAME = "unit-test";
@@ -79,7 +80,7 @@ public class TestPublishKafka {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -98,7 +99,7 @@ public class TestPublishKafka {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 3);
 
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -113,7 +114,7 @@ public class TestPublishKafka {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_FAILURE, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -130,7 +131,7 @@ public class TestPublishKafka {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_FAILURE, 3);
 
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -152,7 +153,7 @@ public class TestPublishKafka {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 2);
 
-        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -191,7 +192,7 @@ public class TestPublishKafka {
         runner.assertTransferCount(PublishKafka_1_0.REL_SUCCESS, 0);
         runner.assertTransferCount(PublishKafka_1_0.REL_FAILURE, 4);
 
-        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
index 2441bff..2d283b9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
@@ -17,26 +17,6 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -54,6 +34,28 @@ import org.junit.Test;
 import org.mockito.AdditionalMatchers;
 import org.mockito.Mockito;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class TestPublishKafkaRecord_1_0 {
 
     private static final String TOPIC_NAME = "unit-test";
@@ -67,7 +69,7 @@ public class TestPublishKafkaRecord_1_0 {
         mockPool = mock(PublisherPool.class);
         mockLease = mock(PublisherLease.class);
         Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-            any(RecordSchema.class), any(String.class), any(String.class));
+            any(RecordSchema.class), any(String.class), any(String.class), nullable(Function.class));
 
         when(mockPool.obtainPublisher()).thenReturn(mockLease);
 
@@ -107,7 +109,7 @@ public class TestPublishKafkaRecord_1_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 1);
 
         verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -126,7 +128,7 @@ public class TestPublishKafkaRecord_1_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 3);
 
         verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -142,7 +144,7 @@ public class TestPublishKafkaRecord_1_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_FAILURE, 1);
 
         verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -160,7 +162,7 @@ public class TestPublishKafkaRecord_1_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_FAILURE, 3);
 
         verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -183,8 +185,9 @@ public class TestPublishKafkaRecord_1_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 2);
 
         verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(0)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+        verify(mockLease, times(0)).publish(
+            any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -214,7 +217,7 @@ public class TestPublishKafkaRecord_1_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 1);
 
         verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -249,7 +252,7 @@ public class TestPublishKafkaRecord_1_0 {
         runner.assertTransferCount(PublishKafkaRecord_1_0.REL_FAILURE, 4);
 
         verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 3f726a4..156af52 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -95,7 +95,7 @@ public class TestPublisherLease {
         };
 
         try {
-            lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic);
+            lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic, null);
             Assert.fail("Expected IOException");
         } catch (final IOException ioe) {
             // expected
@@ -134,7 +134,7 @@ public class TestPublisherLease {
             }
         }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
 
-        lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic);
+        lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic, null);
 
         assertEquals(1, poisonCount.get());
 
@@ -179,16 +179,16 @@ public class TestPublisherLease {
         final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);
 
         final byte[] flowFileContent = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
-        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic, null);
 
         final byte[] flowFileContent2 = new byte[0];
-        lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);
+        lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic, null);
 
         final byte[] flowFileContent3 = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new line
-        lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);
+        lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic, null);
 
         final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
-        lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);
+        lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic, null);
 
         assertEquals(0, poisonCount.get());
 
@@ -240,7 +240,7 @@ public class TestPublisherLease {
         final byte[] demarcatorBytes = null;
 
         final byte[] flowFileContent = new byte[0];
-        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic, null);
 
         assertEquals(0, poisonCount.get());
 
@@ -279,7 +279,7 @@ public class TestPublisherLease {
 
         Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(), eq(flowFile))).thenReturn(writer);
 
-        lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
+        lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic, null);
 
         verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
         verify(writer, times(2)).write(any(Record.class));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
index 8c934bc..7cb08e3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
@@ -97,6 +97,12 @@
             <artifactId>slf4j-simple</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-path</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
index 64ab4ce..a7b20f2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -16,11 +16,11 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.util.Map;
-
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.common.Cluster;
 
+import java.util.Map;
+
 /**
  * Collection of implementation of common Kafka {@link Partitioner}s.
  */
@@ -59,4 +59,40 @@ final public class Partitioners {
             return index++;
         }
     }
+
+    public static class RecordPathPartitioner implements Partitioner {
+        @Override
+        public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
+            // When this partitioner is used, it is always overridden by creating the ProducerRecord with the Partition directly specified. However, we must have a unique value
+            // to set in the Producer's config, so this class exists
+            return 0;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+        }
+    }
+
+
+    public static class ExpressionLanguagePartitioner implements Partitioner {
+        @Override
+        public int partition(final String topic, final Object key, final byte[] keyBytes, final Object value, final byte[] valueBytes, final Cluster cluster) {
+            // When this partitioner is used, it is always overridden by creating the ProducerRecord with the Partition directly specified. However, we must have a unique value
+            // to set in the Producer's config, so this class exists
+            return 0;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+        }
+    }
+
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index 628d4ad..fe502b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -28,9 +28,9 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
@@ -41,11 +41,16 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
@@ -60,11 +65,18 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.0 Producer API. "
     + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
@@ -74,7 +86,7 @@ import java.util.regex.Pattern;
     description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
     + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
     + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
-    expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
+    expressionLanguageScope = VARIABLE_REGISTRY)
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success.")
 @SeeAlso({PublishKafka_2_0.class, ConsumeKafka_2_0.class, ConsumeKafkaRecord_2_0.class})
@@ -98,80 +110,83 @@ public class PublishKafkaRecord_2_0 extends 
AbstractProcessor {
             + "the next Partition to Partition 2, and so on, wrapping as 
necessary.");
     static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
         "DefaultPartitioner", "Messages will be assigned to random 
partitions.");
+    static final AllowableValue RECORD_PATH_PARTITIONING = new 
AllowableValue(Partitioners.RecordPathPartitioner.class.getName(),
+        "RecordPath Partitioner", "Interprets the <Partition> property as a 
RecordPath that will be evaluated against each Record to determine which 
partition the Record will go to. All Records " +
+        "that have the same value for the given RecordPath will go to the same 
Partition.");
+    static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new 
AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), 
"Expression Language Partitioner",
+        "Interprets the <Partition> property as Expression Language that will 
be evaluated against each FlowFile. This Expression will be evaluated once 
against the FlowFile, " +
+            "so all Records in a given FlowFile will go to the same 
partition.");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", 
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex 
Encoded",
-        "The key is interpreted as arbitrary binary data that is encoded using 
hexadecimal characters with uppercase letters.");
 
-    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor TOPIC = new Builder()
         .name("topic")
         .displayName("Topic Name")
         .description("The name of the Kafka Topic to publish to.")
         .required(true)
         .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor RECORD_READER = new Builder()
         .name("record-reader")
         .displayName("Record Reader")
         .description("The Record Reader to use for incoming FlowFiles")
         .identifiesControllerService(RecordReaderFactory.class)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .required(true)
         .build();
 
-    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor RECORD_WRITER = new Builder()
         .name("record-writer")
         .displayName("Record Writer")
         .description("The Record Writer to use in order to serialize the data before sending to Kafka")
         .identifiesControllerService(RecordSetWriterFactory.class)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .required(true)
         .build();
 
-    static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MESSAGE_KEY_FIELD = new Builder()
         .name("message-key-field")
         .displayName("Message Key Field")
         .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .required(false)
         .build();
 
-    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor DELIVERY_GUARANTEE = new Builder()
         .name("acks")
         .displayName("Delivery Guarantee")
         .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
         .required(true)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
         .defaultValue(DELIVERY_BEST_EFFORT.getValue())
         .build();
 
-    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor METADATA_WAIT_TIME = new Builder()
         .name("max.block.ms")
         .displayName("Max Metadata Wait Time")
         .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
             + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
         .required(true)
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .defaultValue("5 sec")
         .build();
 
-    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor ACK_WAIT_TIME = new Builder()
        .name("ack.wait.time")
         .displayName("Acknowledgment Wait Time")
         .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
             + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .required(true)
         .defaultValue("5 secs")
         .build();
 
-    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MAX_REQUEST_SIZE = new Builder()
         .name("max.request.size")
         .displayName("Max Request Size")
         .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
@@ -180,16 +195,25 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
         .defaultValue("1 MB")
         .build();
 
-    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor PARTITION_CLASS = new Builder()
         .name("partitioner.class")
         .displayName("Partitioner class")
         .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
-        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, RECORD_PATH_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING)
         .defaultValue(RANDOM_PARTITIONING.getValue())
         .required(false)
         .build();
 
-    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor PARTITION = new Builder()
+        .name("partition")
+        .displayName("Partition")
+        .description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+
+    static final PropertyDescriptor COMPRESSION_CODEC = new Builder()
         .name("compression.type")
         .displayName("Compression Type")
         .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
@@ -199,37 +223,37 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
         .defaultValue("none")
         .build();
 
-    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new Builder()
         .name("attribute-name-regex")
         .displayName("Attributes to Send as Headers (Regex)")
         .description("A Regular Expression that is matched against all FlowFile attribute names. "
             + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
             + "If not specified, no FlowFile attributes will be added as headers.")
         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .required(false)
         .build();
-    static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor USE_TRANSACTIONS = new Builder()
         .name("use-transactions")
         .displayName("Use Transactions")
         .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
             + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
             + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
             + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .allowableValues("true", "false")
         .defaultValue("true")
         .required(true)
         .build();
-    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new Builder()
         .name("transactional-id-prefix")
         .displayName("Transactional Id Prefix")
         .description("When Use Transaction is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.")
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
         .required(false)
         .build();
-    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new Builder()
         .name("message-header-encoding")
         .displayName("Message Header Encoding")
         .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
@@ -253,6 +277,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
     private static final Set<Relationship> RELATIONSHIPS;
 
     private volatile PublisherPool publisherPool = null;
+    private final RecordPathCache recordPathCache = new RecordPathCache(25);
 
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -280,6 +305,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
         properties.add(ACK_WAIT_TIME);
         properties.add(METADATA_WAIT_TIME);
         properties.add(PARTITION_CLASS);
+        properties.add(PARTITION);
         properties.add(COMPRESSION_CODEC);
 
         PROPERTIES = Collections.unmodifiableList(properties);
@@ -302,12 +328,12 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
+        return new Builder()
             .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
             .name(propertyDescriptorName)
             .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
             .dynamic(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();
     }
 
@@ -329,6 +355,32 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
             }
         }
 
+        final String partitionClass = validationContext.getProperty(PARTITION_CLASS).getValue();
+        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
+            final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
+            if (rawRecordPath == null) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Partition")
+                    .valid(false)
+                    .explanation("The <Partition> property must be specified if using the RecordPath Partitioning class")
+                    .build());
+            } else if (!validationContext.isExpressionLanguagePresent(rawRecordPath)) {
+                final ValidationResult result = new RecordPathValidator().validate(PARTITION.getDisplayName(), rawRecordPath, validationContext);
+                if (result != null) {
+                    results.add(result);
+                }
+            }
+        } else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+            final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
+            if (rawRecordPath == null) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Partition")
+                    .valid(false)
+                    .explanation("The <Partition> property must be specified if using the Expression Language Partitioning class")
+                    .build());
+            }
+        }
+
         return results;
     }
 
@@ -418,6 +470,8 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
                 final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
                 final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
 
+                final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);
+
                 try {
                     session.read(flowFile, new InputStreamCallback() {
                         @Override
@@ -427,7 +481,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
                                 final RecordSet recordSet = reader.createRecordSet();
 
                                 final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
-                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
+                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
                             } catch (final SchemaNotFoundException | MalformedRecordException e) {
                                 throw new ProcessException(e);
                             }
@@ -464,4 +518,33 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
             }
         }
     }
+
+    private Function<Record, Integer> getPartitioner(final ProcessContext context, final FlowFile flowFile) {
+        final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
+        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
+            final String recordPath = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            final RecordPath compiled = recordPathCache.getCompiled(recordPath);
+
+            return record -> evaluateRecordPath(compiled, record);
+        } else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+            final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            final int hash = Objects.hashCode(partition);
+            return (record) -> hash;
+        }
+
+        return null;
+    }
+
+    private Integer evaluateRecordPath(final RecordPath recordPath, final Record record) {
+        final RecordPathResult result = recordPath.evaluate(record);
+        final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
+
+        result.getSelectedFields().forEach(fieldValue -> {
+            final Object value = fieldValue.getValue();
+            final long hash = Objects.hashCode(value);
+            accumulator.accumulate(hash);
+        });
+
+        return accumulator.intValue();
+    }
 }
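For context while reading the next file: evaluateRecordPath() above derives a partition hint by summing the hashes of the field values that the configured RecordPath selects, so Records with equal selected values always land on the same partition. A minimal, self-contained sketch of that computation in plain Java (the NiFi RecordPath API is stubbed out here; the class name and sample values are illustrative only):

    import java.util.List;
    import java.util.Objects;

    public class RecordPathPartitionSketch {

        // Sums the hashes of every field value the RecordPath selected,
        // mirroring evaluateRecordPath() above; equal selected values
        // always produce the same hint.
        static int partitionHint(final List<?> selectedFieldValues) {
            long sum = 0;
            for (final Object value : selectedFieldValues) {
                sum += Objects.hashCode(value); // null-safe: a null field hashes to 0
            }
            return (int) sum;
        }

        public static void main(final String[] args) {
            System.out.println(partitionHint(List.of(48))); // two records with age 48 ...
            System.out.println(partitionHint(List.of(48))); // ... yield identical hints
            System.out.println(partitionHint(List.of(13))); // a different age may differ
        }
    }
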
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index fdfc2e8..a42860b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -54,11 +54,14 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
+import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "2.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Kafka using the Kafka 2.0 Producer API."
     + "The messages to send may be individual FlowFiles or may be delimited, 
using a "
@@ -93,6 +96,9 @@ public class PublishKafka_2_0 extends AbstractProcessor {
             + "the next Partition to Partition 2, and so on, wrapping as 
necessary.");
     static final AllowableValue RANDOM_PARTITIONING = new 
AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
         "DefaultPartitioner", "Messages will be assigned to random 
partitions.");
+    static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new 
AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), 
"Expression Language Partitioner",
+        "Interprets the <Partition> property as Expression Language that will 
be evaluated against each FlowFile. This Expression will be evaluated once 
against the FlowFile, " +
+            "so all Records in a given FlowFile will go to the same 
partition.");
 
     static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", 
"UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
     static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex 
Encoded",
@@ -186,11 +192,20 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
         .displayName("Partitioner class")
         .description("Specifies which class to use to compute a partition id 
for a message. Corresponds to Kafka's 'partitioner.class' property.")
-        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, 
EXPRESSION_LANGUAGE_PARTITIONING)
         .defaultValue(RANDOM_PARTITIONING.getValue())
         .required(false)
         .build();
 
+    static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
+        .name("partition")
+        .displayName("Partition")
+        .description("Specifies which Partition Records will go to.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+        .build();
+
     static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
         .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
         .displayName("Compression Type")
@@ -272,6 +287,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         properties.add(ACK_WAIT_TIME);
         properties.add(METADATA_WAIT_TIME);
         properties.add(PARTITION_CLASS);
+        properties.add(PARTITION);
         properties.add(COMPRESSION_CODEC);
 
         PROPERTIES = Collections.unmodifiableList(properties);
@@ -321,6 +337,18 @@ public class PublishKafka_2_0 extends AbstractProcessor {
             }
         }
 
+        final String partitionClass = validationContext.getProperty(PARTITION_CLASS).getValue();
+        if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+            final String rawRecordPath = validationContext.getProperty(PARTITION).getValue();
+            if (rawRecordPath == null) {
+                results.add(new ValidationResult.Builder()
+                    .subject("Partition")
+                    .valid(false)
+                    .explanation("The <Partition> property must be specified if using the Expression Language Partitioning class")
+                    .build());
+            }
+        }
+
         return results;
     }
 
@@ -412,11 +440,12 @@ public class PublishKafka_2_0 extends AbstractProcessor {
                     demarcatorBytes = null;
                 }
 
+                final Integer partition = getPartition(context, flowFile);
                 session.read(flowFile, new InputStreamCallback() {
                     @Override
                     public void process(final InputStream rawIn) throws IOException {
                         try (final InputStream in = new BufferedInputStream(rawIn)) {
-                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
+                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
                         }
                     }
                 });
@@ -468,4 +497,16 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
         return DatatypeConverter.parseHexBinary(uninterpretedKey);
     }
+
+    private Integer getPartition(final ProcessContext context, final FlowFile flowFile) {
+        final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
+        if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
+            final String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            final int hash = Objects.hashCode(partition);
+            return hash;
+        }
+
+        return null;
+    }
+
 }
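A note on the Expression Language path just above: getPartition() evaluates the <Partition> property once per FlowFile and hashes the resulting string, so the value does not need to be numeric. A small sketch of that mapping, with a purely hypothetical attribute value standing in for the evaluated Expression:

    import java.util.Objects;

    public class ExpressionPartitionSketch {
        public static void main(final String[] args) {
            // Stand-in for context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile)
            // .getValue(), e.g. the result of an Expression such as ${customer.id}:
            final String evaluated = "customer-123";
            final int hint = Objects.hashCode(evaluated); // delegates to String.hashCode(), null-safe
            System.out.println(hint); // FlowFiles with equal values always map to the same hint
        }
    }
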
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index a2ddd81..f8587d9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -17,19 +17,6 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -48,6 +35,20 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.regex.Pattern;
+
 public class PublisherLease implements Closeable {
     private final ComponentLog logger;
     private final Producer<byte[], byte[]> producer;
@@ -111,7 +112,7 @@ public class PublisherLease implements Closeable {
         rollback();
     }
 
-    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
+    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic, final Integer partition) throws IOException {
         if (tracker == null) {
             tracker = new InFlightMessageTracker(logger);
         }
@@ -126,13 +127,13 @@ public class PublisherLease implements Closeable {
                 // Send FlowFile content as it is, to support sending 0 byte message.
                 messageContent = new byte[(int) flowFile.getSize()];
                 StreamUtils.fillBuffer(flowFileContent, messageContent);
-                publish(flowFile, messageKey, messageContent, topic, tracker);
+                publish(flowFile, messageKey, messageContent, topic, tracker, partition);
                 return;
             }
 
             try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
-                    publish(flowFile, messageKey, messageContent, topic, tracker);
+                    publish(flowFile, messageKey, messageContent, topic, tracker, partition);
 
                     if (tracker.isFailed(flowFile)) {
                         // If we have a failure, don't try to send anything else.
@@ -150,7 +151,7 @@ public class PublisherLease implements Closeable {
     }
 
     void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
-        final String messageKeyField, final String topic) throws IOException {
+                 final String messageKeyField, final String topic, final Function<Record, Integer> partitioner) throws IOException {
         if (tracker == null) {
             tracker = new InFlightMessageTracker(logger);
         }
@@ -176,7 +177,8 @@ public class PublisherLease implements Closeable {
                 final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
                 final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
 
-                publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker);
+                final Integer partition = partitioner == null ? null : partitioner.apply(record);
+                publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker, partition);
 
                 if (tracker.isFailed(flowFile)) {
                     // If we have a failure, don't try to send anything else.
@@ -217,14 +219,15 @@ public class PublisherLease implements Closeable {
         }
     }
 
-    protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
-        publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker);
+    protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) {
+        publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker, partition);
     }
 
-    protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes,
-        final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+    protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes, final byte[] messageKey, final byte[] messageContent,
+                           final String topic, final InFlightMessageTracker tracker, final Integer partition) {
 
-        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
+        final Integer moddedPartition = partition == null ? null : Math.abs(partition) % (producer.partitionsFor(topic).size());
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, moddedPartition, messageKey, messageContent);
         addHeaders(flowFile, additionalAttributes, record);
 
         producer.send(record, new Callback() {
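Before the next file: PublisherLease above wraps whatever hint the partitioner produced into the topic's actual partition range before building the ProducerRecord. A sketch of that wrapping under an assumed partition count (six partitions is illustrative; the real count comes from producer.partitionsFor(topic).size()):

    public class PartitionWrapSketch {
        public static void main(final String[] args) {
            final int partitionCount = 6;   // stand-in for producer.partitionsFor(topic).size()
            final int hint = -1520343741;   // any hash the configured partitioner might return
            final int wrapped = Math.abs(hint) % partitionCount;
            // wrapped lands in [0, partitionCount) for every input except
            // Integer.MIN_VALUE, whose Math.abs is itself negative.
            System.out.println(wrapped);
        }
    }
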
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
index ec07da9..0de5111 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -17,6 +17,10 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.nifi.logging.ComponentLog;
+
 import java.io.Closeable;
 import java.nio.charset.Charset;
 import java.util.HashMap;
@@ -26,10 +30,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.nifi.logging.ComponentLog;
-
 public class PublisherPool implements Closeable {
     private final ComponentLog logger;
     private final BlockingQueue<PublisherLease> publisherQueue;
@@ -77,7 +77,6 @@ public class PublisherPool implements Closeable {
         }
 
         final Producer<byte[], byte[]> producer = new KafkaProducer<>(properties);
-
         final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex, headerCharacterSet) {
             @Override
             public void close() {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
index ed5cf05..e80ce0d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
@@ -33,6 +34,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.AdditionalMatchers;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,13 +45,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -67,7 +73,7 @@ public class TestPublishKafkaRecord_2_0 {
         mockPool = mock(PublisherPool.class);
         mockLease = mock(PublisherLease.class);
         Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-            any(RecordSchema.class), any(String.class), any(String.class));
+            any(RecordSchema.class), any(String.class), any(String.class), nullable(Function.class));
 
         when(mockPool.obtainPublisher()).thenReturn(mockLease);
 
@@ -107,7 +113,7 @@ public class TestPublishKafkaRecord_2_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
 
         verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -126,7 +132,7 @@ public class TestPublishKafkaRecord_2_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 3);
 
         verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -142,7 +148,7 @@ public class TestPublishKafkaRecord_2_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 1);
 
         verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -160,7 +166,7 @@ public class TestPublishKafkaRecord_2_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 3);
 
         verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -183,8 +189,9 @@ public class TestPublishKafkaRecord_2_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 2);
 
         verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(0)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+        verify(mockLease, times(0)).publish(
+            any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class), any(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -214,7 +221,7 @@ public class TestPublishKafkaRecord_2_0 {
         runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
 
         verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -223,6 +230,61 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
     }
 
+    @Test
+    public void testRecordPathPartition() throws IOException {
+        runner.setProperty(PublishKafkaRecord_2_0.PARTITION_CLASS, PublishKafkaRecord_2_0.RECORD_PATH_PARTITIONING);
+        runner.setProperty(PublishKafkaRecord_2_0.PARTITION, "/age");
+
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 48\nJim Doe, 13"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 0);
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+
+        mockLease = mock(PublisherLease.class);
+
+        when(mockLease.complete()).thenReturn(result);
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+        final Map<Integer, List<Integer>> partitionsByAge = new HashMap<>();
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
+                final Function<Record, Integer> partitioner = invocationOnMock.getArgument(6, Function.class);
+                final RecordSet recordSet = invocationOnMock.getArgument(1, RecordSet.class);
+
+                Record record;
+                while ((record = recordSet.next()) != null) {
+                    final int partition = partitioner.apply(record);
+                    final Integer age = record.getAsInt("age");
+
+                    partitionsByAge.computeIfAbsent(age, k -> new ArrayList<>()).add(partition);
+                }
+
+                return null;
+            }
+        }).when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+            nullable(RecordSchema.class), nullable(String.class), any(String.class), nullable(Function.class));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+            nullable(RecordSchema.class), nullable(String.class), any(String.class), nullable(Function.class));
+
+        assertEquals(2, partitionsByAge.size()); // 2 ages
+
+        final List<Integer> partitionsForAge13 = partitionsByAge.get(13);
+        assertEquals(1, partitionsForAge13.size());
+
+        final List<Integer> partitionsForAge48 = partitionsByAge.get(48);
+        assertEquals(2, partitionsForAge48.size());
+        assertEquals(partitionsForAge48.get(0), partitionsForAge48.get(1));
+    }
+
 
     @Test
     public void testSomeSuccessSomeFailure() throws IOException {
@@ -249,7 +311,7 @@ public class TestPublishKafkaRecord_2_0 {
         runner.assertTransferCount(PublishKafkaRecord_2_0.REL_FAILURE, 4);
 
         verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
-                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME));
+                AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
index cd5fb96..14fb5d4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -40,6 +40,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -79,7 +80,7 @@ public class TestPublishKafka_2_0 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -98,7 +99,7 @@ public class TestPublishKafka_2_0 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 3);
 
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -113,7 +114,7 @@ public class TestPublishKafka_2_0 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 1);
 
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -130,7 +131,7 @@ public class TestPublishKafka_2_0 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 3);
 
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
     }
@@ -152,7 +153,7 @@ public class TestPublishKafka_2_0 {
         runner.run();
         runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 2);
 
-        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(0)).poison();
         verify(mockLease, times(1)).close();
@@ -191,7 +192,7 @@ public class TestPublishKafka_2_0 {
         runner.assertTransferCount(PublishKafka_2_0.REL_SUCCESS, 0);
         runner.assertTransferCount(PublishKafka_2_0.REL_FAILURE, 4);
 
-        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
         verify(mockLease, times(1)).complete();
         verify(mockLease, times(1)).close();
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 141402e..60d157c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -94,7 +94,7 @@ public class TestPublisherLease {
         };
 
         try {
-            lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic);
+            lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic, null);
             Assert.fail("Expected IOException");
         } catch (final IOException ioe) {
             // expected
@@ -133,7 +133,7 @@ public class TestPublisherLease {
             }
         }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
 
-        lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic);
+        lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic, null);
 
         assertEquals(1, poisonCount.get());
 
@@ -178,16 +178,16 @@ public class TestPublisherLease {
         final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);
 
         final byte[] flowFileContent = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
-        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic, null);
 
         final byte[] flowFileContent2 = new byte[0];
-        lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);
+        lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic, null);
 
         final byte[] flowFileContent3 = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new line
-        lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);
+        lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic, null);
 
         final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
-        lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);
+        lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic, null);
 
         assertEquals(0, poisonCount.get());
 
@@ -239,7 +239,7 @@ public class TestPublisherLease {
         final byte[] demarcatorBytes = null;
 
         final byte[] flowFileContent = new byte[0];
-        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic, null);
 
         assertEquals(0, poisonCount.get());
 
@@ -278,7 +278,7 @@ public class TestPublisherLease {
 
         Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(), eq(flowFile))).thenReturn(writer);
 
-        lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic);
+        lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic, null);
 
         verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
         verify(writer, times(2)).write(any(Record.class));
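Putting the pieces together, a hedged usage sketch that mirrors the new testRecordPathPartition test above (the topic name and RecordPath are illustrative; the constants and the TestRunner overloads are the ones exercised in this commit, and the fragment assumes a test in the same package):

    final TestRunner runner = TestRunners.newTestRunner(PublishKafkaRecord_2_0.class);
    runner.setProperty(PublishKafkaRecord_2_0.TOPIC, "users"); // hypothetical topic name
    runner.setProperty(PublishKafkaRecord_2_0.PARTITION_CLASS, PublishKafkaRecord_2_0.RECORD_PATH_PARTITIONING);
    runner.setProperty(PublishKafkaRecord_2_0.PARTITION, "/age"); // records with equal /age share a partition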
