This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 767435de8be46bc5f5a1ae3ea180569a3bcaf8a2
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Dec 1 14:05:08 2021 +0100

    MINIFICPP-1677 Add SASL options to Kafka processors
    
    - Add new security options: SASL_PLAIN and SASL_SSL
    - Add SASL mechanism property with GSSAPI and PLAIN support
    - Add Username and Password properties for PLAIN mechanism
    - Extract common Kafka authentication code to add the same authentication 
options to ConsumeKafka processor as PublishKafka
    - Add Kafka broker configuration and tests for SASL/PLAIN and SASL/SSL with 
username and password authentication
    
    Closes #1212
    Signed-off-by: Marton Szasz <[email protected]>
---
 PROCESSORS.md                                      |  15 +-
 docker/test/integration/features/kafka.feature     | 123 +++++++++++
 .../minifi/core/KafkaBrokerContainer.py            |   8 +-
 .../integration/resources/kafka_broker/Dockerfile  |   2 +-
 .../kafka_broker/conf/server-ssl.properties        | 155 --------------
 .../resources/kafka_broker/conf/server.properties  |  50 ++++-
 extensions/librdkafka/ConsumeKafka.cpp             |  76 ++-----
 extensions/librdkafka/ConsumeKafka.h               |  15 +-
 extensions/librdkafka/KafkaProcessorBase.cpp       | 135 ++++++++++++
 extensions/librdkafka/KafkaProcessorBase.h         |  69 ++++++
 extensions/librdkafka/PublishKafka.cpp             | 237 +++++----------------
 extensions/librdkafka/PublishKafka.h               |  25 ++-
 libminifi/include/utils/ProcessorConfigUtils.h     |  18 +-
 libminifi/src/utils/ProcessorConfigUtils.cpp       |  18 +-
 14 files changed, 501 insertions(+), 445 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8c85ec9..2a1bcbb 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -253,17 +253,23 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 |Headers To Add As Attributes|||A comma separated list to match against all 
message headers. Any message header whose name matches an item from the list 
will be added to the FlowFile as an Attribute. If not specified, no Header 
values will be added as FlowFile attributes. The behaviour on when multiple 
headers of the same name are present is set using the DuplicateHeaderHandling 
attribute.|
 |**Honor Transactions**|true||Specifies whether or not MiNiFi should honor 
transactional guarantees when communicating with Kafka. If false, the Processor 
will use an "isolation level" of read_uncomitted. This means that messages will 
be received as soon as they are written to Kafka but will be pulled, even if 
the producer cancels the transactions. If this value is true, MiNiFi will not 
receive any messages for which the producer's transaction was canceled, but 
this can result in some la [...]
 |**Kafka Brokers**|localhost:9092||A comma-separated list of known Kafka 
Brokers in the format <host>:<port>.<br/>**Supports Expression Language: true**|
+|Kerberos Keytab Path|||The path to the location on the local filesystem where 
the kerberos keytab is located. Read permission on the file is required.|
+|Kerberos Principal|||Keberos Principal|
+|Kerberos Service Name|||Kerberos Service Name|
 |**Key Attribute Encoding**|UTF-8|Hex<br>UTF-8<br>|FlowFiles that are emitted 
have an attribute named 'kafka.key'. This property dictates how the value of 
the attribute should be encoded.|
 |Max Poll Records|10000||Specifies the maximum number of records Kafka should 
return when polling each time the processor is triggered.|
 |**Max Poll Time**|4 seconds||Specifies the maximum amount of time the 
consumer can use for polling data from the brokers. Polling is a blocking 
operation, so the upper limit of this value is specified in 4 seconds.|
 |Message Demarcator|||Since KafkaConsumer receives messages in batches, you 
have an option to output FlowFiles which contains all Kafka messages in a 
single batch for a given topic and partition and this property allows you to 
provide a string (interpreted as UTF-8) to use for demarcating apart multiple 
Kafka messages. This is an optional property and if not provided each Kafka 
message received will result in a single FlowFile which time it is triggered. 
<br/>**Supports Expression Langua [...]
 |Message Header Encoding|UTF-8|Hex<br>UTF-8<br>|Any message header that is 
found on a Kafka message will be added to the outbound FlowFile as an 
attribute. This property indicates the Character Encoding to use for 
deserializing the headers.|
 |**Offset Reset**|latest|earliest<br>latest<br>none<br>|Allows you to manage 
the condition when there is no initial offset in Kafka or if the current offset 
does not exist any more on the server (e.g. because that data has been 
deleted). Corresponds to Kafka's 'auto.offset.reset' property.|
-|SSL Context Service|||SSL Context Service Name|
-|**Security Protocol**|plaintext|plaintext<br>ssl|Protocol used to communicate 
with brokers. Corresponds to Kafka's 'security.protocol' property.|
+|Password|||The password for the given username when the SASL Mechanism is 
sasl_plaintext|
+|SASL Mechanism|GSSAPI|GSSAPI<br/>PLAIN|The SASL mechanism to use for 
authentication. Corresponds to Kafka's 'sasl.mechanism' property.|
+|**Security 
Protocol**|plaintext|plaintext<br/>ssl<br/>sasl_plaintext<br/>sasl_ssl|Protocol 
used to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.|
 |Session Timeout|60 seconds||Client group session and failure detection 
timeout. The consumer sends periodic heartbeats to indicate its liveness to the 
broker. If no hearts are received by the broker for a group member within the 
session timeout, the broker will remove the consumer from the group and trigger 
a rebalance. The allowed range is configured with the broker configuration 
properties group.min.session.timeout.ms and group.max.session.timeout.ms.|
+|SSL Context Service|||SSL Context Service Name|
 |**Topic Name Format**|Names|Names<br>Patterns<br>|Specifies whether the 
Topic(s) provided are a comma separated list of names or a single regular 
expression. Using regular expressions does not automatically discover Kafka 
topics created after the processor started.|
 |**Topic Names**|||The name of the Kafka Topic(s) to pull from. Multiple topic 
names are supported as a comma separated list.<br/>**Supports Expression 
Language: true**|
+|Username|||The username when the SASL Mechanism is sasl_plaintext|
 ### Properties
 
 | Name | Description |
@@ -1187,16 +1193,19 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 |Kafka Key|||The key to use for the message. If not specified, the UUID of the 
flow file is used as the message key.<br/>**Supports Expression Language: 
true**|
 |Message Key Field|||DEPRECATED, does not work -- use Kafka Key instead|
 |Message Timeout|30 sec||The total time sending a message could take|
+|Password|||The password for the given username when the SASL Mechanism is 
sasl_plaintext|
 |Queue Buffering Max Time|||Delay to wait for messages in the producer queue 
to accumulate before constructing message batches|
 |Queue Max Buffer Size|||Maximum total message size sum allowed on the 
producer queue|
 |Queue Max Message|||Maximum number of messages allowed on the producer queue|
 |Request Timeout|10 sec||The ack timeout of the producer request|
+|SASL Mechanism|GSSAPI|GSSAPI<br/>PLAIN|The SASL mechanism to use for 
authentication. Corresponds to Kafka's 'sasl.mechanism' property.|
 |SSL Context Service|||SSL Context Service Name|
 |Security CA|||DEPRECATED in favor of SSL Context Service. File or directory 
path to CA certificate(s) for verifying the broker's key|
 |Security Cert|||DEPRECATED in favor of SSL Context Service. Path to client's 
public key (PEM) used for authentication|
 |Security Pass Phrase|||DEPRECATED in favor of SSL Context Service. Private 
key passphrase|
 |Security Private Key|||DEPRECATED in favor of SSL Context Service. Path to 
client's private key (PEM) used for authentication|
-|Security Protocol|||Protocol used to communicate with brokers|
+|**Security 
Protocol**|plaintext|plaintext<br/>ssl<br/>sasl_plaintext<br/>sasl_ssl|Protocol 
used to communicate with brokers. Corresponds to Kafka's 'security.protocol' 
property.|
+|Username|||The username when the SASL Mechanism is sasl_plaintext|
 |Target Batch Payload Size|512 KB||The target total payload size for a batch. 
0 B means unlimited (Batch Size is still applied).|
 |**Topic Name**|||The Kafka Topic of interest<br/>**Supports Expression 
Language: true**|
 ### Relationships
diff --git a/docker/test/integration/features/kafka.feature 
b/docker/test/integration/features/kafka.feature
index ab4c34e..9825b55 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -87,6 +87,89 @@ Feature: Sending data to using Kafka streaming platform 
using PublishKafka
     # We fallback to the flowfile's uuid as message key if the Kafka Key 
property is not set
     And the Minifi logs match the following regex: "PublishKafka: Message Key 
\[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 
10 seconds
 
+  Scenario: A MiNiFi instance transfers data to a kafka broker through SASL 
Plain security protocol
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker 
instance
+    And these processor properties are set:
+      | processor name | property name          | property value               
          |
+      | PublishKafka   | Topic Name             | test                         
          |
+      | PublishKafka   | Request Timeout        | 10 sec                       
          |
+      | PublishKafka   | Message Timeout        | 12 sec                       
          |
+      | PublishKafka   | Known Brokers          | kafka-broker:9094            
          |
+      | PublishKafka   | Client Name            | LMN                          
          |
+      | PublishKafka   | Security Protocol      | sasl_plaintext               
          |
+      | PublishKafka   | SASL Mechanism         | PLAIN                        
          |
+      | PublishKafka   | Username               | alice                        
          |
+      | PublishKafka   | Password               | alice-secret                 
          |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected 
to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+
+  Scenario: PublishKafka sends can use SASL SSL connect with security 
properties
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker 
instance
+    And these processor properties are set:
+      | processor name | property name          | property value               
              |
+      | PublishKafka   | Client Name            | LMN                          
              |
+      | PublishKafka   | Known Brokers          | kafka-broker:9095            
              |
+      | PublishKafka   | Topic Name             | test                         
              |
+      | PublishKafka   | Batch Size             | 10                           
              |
+      | PublishKafka   | Compress Codec         | none                         
              |
+      | PublishKafka   | Delivery Guarantee     | 1                            
              |
+      | PublishKafka   | Request Timeout        | 10 sec                       
              |
+      | PublishKafka   | Message Timeout        | 12 sec                       
              |
+      | PublishKafka   | Security CA            | /tmp/resources/certs/ca-cert 
              |
+      | PublishKafka   | Security Cert          | 
/tmp/resources/certs/client_LMN_client.pem |
+      | PublishKafka   | Security Pass Phrase   | abcdefgh                     
              |
+      | PublishKafka   | Security Private Key   | 
/tmp/resources/certs/client_LMN_client.key |
+      | PublishKafka   | Security Protocol      | sasl_ssl                     
              |
+      | PublishKafka   | SASL Mechanism         | PLAIN                        
              |
+      | PublishKafka   | Username               | alice                        
              |
+      | PublishKafka   | Password               | alice-secret                 
              |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected 
to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+
+  Scenario: PublishKafka sends can use SASL SSL connect with SSL Context
+    Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker 
instance
+    And these processor properties are set:
+      | processor name | property name          | property value               
              |
+      | PublishKafka   | Client Name            | LMN                          
              |
+      | PublishKafka   | Known Brokers          | kafka-broker:9095            
              |
+      | PublishKafka   | Topic Name             | test                         
              |
+      | PublishKafka   | Batch Size             | 10                           
              |
+      | PublishKafka   | Compress Codec         | none                         
              |
+      | PublishKafka   | Delivery Guarantee     | 1                            
              |
+      | PublishKafka   | Request Timeout        | 10 sec                       
              |
+      | PublishKafka   | Message Timeout        | 12 sec                       
              |
+      | PublishKafka   | Security Protocol      | sasl_ssl                     
              |
+      | PublishKafka   | SASL Mechanism         | PLAIN                        
              |
+      | PublishKafka   | Username               | alice                        
              |
+      | PublishKafka   | Password               | alice-secret                 
              |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And an ssl context service set up for PublishKafka
+    And the "success" relationship of the GetFile processor is connected to 
the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected 
to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored 
directory in less than 60 seconds
+
   Scenario: PublishKafka sends can use SSL connect with SSL Context Service
     Given a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
@@ -327,3 +410,43 @@ Feature: Sending data to using Kafka streaming platform 
using PublishKafka
     And a message with content "Lewis Carroll" is published to the 
"ConsumeKafkaTest" topic using an ssl connection
 
     Then two flowfiles with the contents "Alice's Adventures in Wonderland" 
and "Lewis Carroll" are placed in the monitored directory in less than 60 
seconds
+
+  Scenario: ConsumeKafka receives data via SASL SSL
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And these processor properties are set:
+      | processor name | property name        | property value                 
            |
+      | ConsumeKafka   | Kafka Brokers        | kafka-broker:9095              
            |
+      | ConsumeKafka   | Security Protocol    | sasl_ssl                       
            |
+      | ConsumeKafka   | SASL Mechanism       | PLAIN                          
            |
+      | ConsumeKafka   | Username             | alice                          
            |
+      | ConsumeKafka   | Password             | alice-secret                   
            |
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "kafka-consumer-flow" flow
+    And an ssl context service set up for ConsumeKafka
+    And the "success" relationship of the ConsumeKafka processor is connected 
to the PutFile
+
+    And a kafka broker is set up in correspondence with the publisher flow
+
+    When all instances start up
+    And a message with content "Alice's Adventures in Wonderland" is published 
to the "ConsumeKafkaTest" topic using an ssl connection
+    And a message with content "Lewis Carroll" is published to the 
"ConsumeKafkaTest" topic using an ssl connection
+
+    Then two flowfiles with the contents "Alice's Adventures in Wonderland" 
and "Lewis Carroll" are placed in the monitored directory in less than 60 
seconds
+
+  Scenario: MiNiFi consumes data from a kafka topic via SASL PLAIN connection
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" 
in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected 
to the PutFile
+    And these processor properties are set:
+      | processor name | property name        | property value                 
            |
+      | ConsumeKafka   | Kafka Brokers        | kafka-broker:9094              
            |
+      | ConsumeKafka   | Security Protocol    | sasl_plaintext                 
            |
+      | ConsumeKafka   | SASL Mechanism       | PLAIN                          
            |
+      | ConsumeKafka   | Username             | alice                          
            |
+      | ConsumeKafka   | Password             | alice-secret                   
            |
+
+    And a kafka broker is set up in correspondence with the third-party kafka 
publisher
+
+    When all instances start up
+    And a message with content "some test message" is published to the 
"ConsumeKafkaTest" topic
+
+    Then at least one flowfile with the content "some test message" is placed 
in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/minifi/core/KafkaBrokerContainer.py 
b/docker/test/integration/minifi/core/KafkaBrokerContainer.py
index df215ef..64346c3 100644
--- a/docker/test/integration/minifi/core/KafkaBrokerContainer.py
+++ b/docker/test/integration/minifi/core/KafkaBrokerContainer.py
@@ -19,15 +19,15 @@ class KafkaBrokerContainer(Container):
             detach=True,
             name='kafka-broker',
             network=self.network.name,
-            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, 
'29093/tcp': 29093},
+            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, 
'29093/tcp': 29093, '9094/tcp': 9094, '29094/tcp': 29094, '9094/tcp': 9094, 
'29095/tcp': 29095},
             environment=[
                 "KAFKA_BROKER_ID=1",
                 "ALLOW_PLAINTEXT_LISTENER=yes",
                 "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true",
-                
"KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092",
-                
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL",
+                
"KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SASL_PLAINTEXT://kafka-broker:9094,SASL_SSL://kafka-broker:9095,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092,SASL_PLAINTEXT_HOST://0.0.0.0:29094,SASL_SSL_HOST://0.0.0.0:29095",
+                
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_PLAINTEXT_HOST:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL",
                 "KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SSL",
-                
"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093",
+                
"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093,SASL_PLAINTEXT://kafka-broker:9094,SASL_PLAINTEXT_HOST://localhost:29094,SASL_SSL://kafka-broker:9095,SASL_SSL_HOST://localhost:29095",
                 "KAFKA_HEAP_OPTS=-Xms512m -Xmx1g",
                 "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
                 "SSL_CLIENT_AUTH=none"])
diff --git a/docker/test/integration/resources/kafka_broker/Dockerfile 
b/docker/test/integration/resources/kafka_broker/Dockerfile
index 31472c9..9c332a1 100644
--- a/docker/test/integration/resources/kafka_broker/Dockerfile
+++ b/docker/test/integration/resources/kafka_broker/Dockerfile
@@ -1,3 +1,3 @@
 FROM wurstmeister/kafka:2.12-2.5.0
-ADD conf/server-ssl.properties $KAFKA_HOME/config/server.properties
+ADD conf/server.properties $KAFKA_HOME/config/server.properties
 ADD conf/ /usr/local/etc/kafka/
diff --git 
a/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties 
b/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
deleted file mode 100644
index c7158fa..0000000
--- a/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
+++ /dev/null
@@ -1,155 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# see kafka.server.KafkaConfig for additional details and defaults
-
-############################# Server Basics #############################
-
-# The id of the broker. This must be set to a unique integer for each broker.
-broker.id=0
-
-############################# Socket Server Settings 
#############################
-
-# The address the socket server listens on. It will get the value returned from
-# java.net.InetAddress.getCanonicalHostName() if not configured.
-#   FORMAT:
-#     listeners = listener_name://host_name:port
-#   EXAMPLE:
-#     listeners = PLAINTEXT://your.host.name:9092
-#listeners=PLAINTEXT://:9092
-
-# Hostname and port the broker will advertise to producers and consumers. If 
not set,
-# it uses the value for "listeners" if configured.  Otherwise, it will use the 
value
-# returned from java.net.InetAddress.getCanonicalHostName().
-#advertised.listeners=PLAINTEXT://your.host.name:9092
-
-# Maps listener names to security protocols, the default is for them to be the 
same. See the config documentation for more details
-#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
-
-# The number of threads that the server uses for receiving requests from the 
network and sending responses to the network
-num.network.threads=3
-
-# The number of threads that the server uses for processing requests, which 
may include disk I/O
-num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=102400
-
-# The maximum size of a request that the socket server will accept (protection 
against OOM)
-socket.request.max.bytes=104857600
-
-
-############################# Log Basics #############################
-
-# A comma separated list of directories under which to store log files
-log.dirs=/tmp/kafka-logs
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
-num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at 
startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs 
located in RAID array.
-num.recovery.threads.per.data.dir=1
-
-############################# Internal Topic Settings  
#############################
-# The replication factor for the group metadata internal topics 
"__consumer_offsets" and "__transaction_state"
-# For anything other than development testing, a value greater than 1 is 
recommended for to ensure availability such as 3.
-offsets.topic.replication.factor=1
-transaction.state.log.replication.factor=1
-transaction.state.log.min.isr=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only 
fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data 
to disk.
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data may be lost if you are not using 
replication.
-#    2. Latency: Very large flush intervals may lead to latency spikes when 
the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a 
small flush interval may lead to excessive seeks.
-# The settings below allow one to configure the flush policy to flush data 
after a period of time or
-# every N messages (or both). This can be done globally and overridden on a 
per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
-#log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
-#log.flush.interval.ms=1000
-
-############################# Log Retention Policy 
#############################
-
-# The following configurations control the disposal of log segments. The 
policy can
-# be set to delete segments after a period of time, or after a given size has 
accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. 
Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion due to age
-log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log 
unless the remaining
-# segments drop below log.retention.bytes. Functions independently of 
log.retention.hours.
-#log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log 
segment will be created.
-log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted 
according
-# to the retention policies
-log.retention.check.interval.ms=300000
-
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zookeeper.connect=localhost:2181
-
-# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
-
-
-############################# Group Coordinator Settings 
#############################
-
-# The following configuration specifies the time, in milliseconds, that the 
GroupCoordinator will delay the initial consumer rebalance.
-# The rebalance will be further delayed by the value of 
group.initial.rebalance.delay.ms as new members join the group, up to a maximum 
of max.poll.interval.ms.
-# The default value for this is 3 seconds.
-# We override this to 0 here as it makes for a better out-of-the-box 
experience for development and testing.
-# However, in production environments the default value of 3 seconds is more 
suitable as this will help to avoid unnecessary, and potentially expensive, 
rebalances during application startup.
-group.initial.rebalance.delay.ms=0
-
-listeners=SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093
-advertised.listeners=SSL://kafka-broker:9093,SSL_HOST://localhost:29093
-listener.security.protocol.map=SSL:SSL,SSL_HOST:SSL
-
-# SSL
-ssl.protocol = TLS
-ssl.enabled.protocols=TLSv1.2
-ssl.keystore.type = JKS
-ssl.keystore.location = 
/usr/local/etc/kafka/certs/broker_kafka-broker_server.keystore.jks
-# ssl.keystore.location = 
/usr/local/etc/kafka/certs/broker_localhost_server.keystore.jks
-ssl.keystore.password = abcdefgh
-ssl.key.password = abcdefgh
-ssl.truststore.type = JKS
-ssl.truststore.location = 
/usr/local/etc/kafka/certs/broker_kafka-broker_server.truststore.jks
-# ssl.truststore.location = 
/usr/local/etc/kafka/certs/broker_localhost_server.truststore.jks
-ssl.truststore.password = abcdefgh
-# To require authentication of clients use "require", else "none" or "request"
-ssl.client.auth = required
diff --git 
a/docker/test/integration/resources/kafka_broker/conf/server.properties 
b/docker/test/integration/resources/kafka_broker/conf/server.properties
index 20d9095..e752fca 100644
--- a/docker/test/integration/resources/kafka_broker/conf/server.properties
+++ b/docker/test/integration/resources/kafka_broker/conf/server.properties
@@ -22,7 +22,7 @@ broker.id=0
 
 ############################# Socket Server Settings 
#############################
 
-# The address the socket server listens on. It will get the value returned 
from 
+# The address the socket server listens on. It will get the value returned from
 # java.net.InetAddress.getCanonicalHostName() if not configured.
 #   FORMAT:
 #     listeners = listener_name://host_name:port
@@ -30,7 +30,7 @@ broker.id=0
 #     listeners = PLAINTEXT://your.host.name:9092
 #listeners=PLAINTEXT://:9092
 
-# Hostname and port the broker will advertise to producers and consumers. If 
not set, 
+# Hostname and port the broker will advertise to producers and consumers. If 
not set,
 # it uses the value for "listeners" if configured.  Otherwise, it will use the 
value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092
@@ -57,7 +57,7 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################
 
 # A comma separated list of directories under which to store log files
-log.dirs=/usr/local/var/lib/kafka-logs
+log.dirs=/tmp/kafka-logs
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
@@ -133,4 +133,46 @@ zookeeper.connection.timeout.ms=6000
 # The default value for this is 3 seconds.
 # We override this to 0 here as it makes for a better out-of-the-box 
experience for development and testing.
 # However, in production environments the default value of 3 seconds is more 
suitable as this will help to avoid unnecessary, and potentially expensive, 
rebalances during application startup.
-group.initial.rebalance.delay.ms=0
\ No newline at end of file
+group.initial.rebalance.delay.ms=0
+
+sasl.enabled.mechanisms=PLAIN
+sasl.mechanism.inter.broker.protocol=PLAIN
+confluent.metrics.reporter.sasl.mechanism=PLAIN
+listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
 required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_plaintext_host.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
 required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
 required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_ssl_host.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
 required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+# SSL
+ssl.protocol = TLS
+ssl.enabled.protocols=TLSv1.2
+ssl.keystore.type = JKS
+ssl.keystore.location = 
/usr/local/etc/kafka/certs/broker_kafka-broker_server.keystore.jks
+# ssl.keystore.location = 
/usr/local/etc/kafka/certs/broker_localhost_server.keystore.jks
+ssl.keystore.password = abcdefgh
+ssl.key.password = abcdefgh
+ssl.truststore.type = JKS
+ssl.truststore.location = 
/usr/local/etc/kafka/certs/broker_kafka-broker_server.truststore.jks
+# ssl.truststore.location = 
/usr/local/etc/kafka/certs/broker_localhost_server.truststore.jks
+ssl.truststore.password = abcdefgh
+# To require authentication of clients use "require", else "none" or "request"
+ssl.client.auth = required
diff --git a/extensions/librdkafka/ConsumeKafka.cpp 
b/extensions/librdkafka/ConsumeKafka.cpp
index e27051e..eef2fb4 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -65,13 +65,6 @@ core::Property 
ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty(
   ->isRequired(true)
   ->build());
 
-core::Property 
ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security 
Protocol")
-  ->withDescription("Protocol used to communicate with brokers. Corresponds to 
Kafka's 'security.protocol' property.")
-  ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT, 
SECURITY_PROTOCOL_SSL})
-  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
-  ->isRequired(true)
-  ->build());
-
 core::Property 
ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
   ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple 
topic names are supported as a comma separated list.")
   ->supportsExpressionLanguage(true)
@@ -168,18 +161,19 @@ core::Property 
ConsumeKafka::SessionTimeout(core::PropertyBuilder::createPropert
   ->withDefaultValue<core::TimePeriodValue>("60 seconds")
   ->build());
 
-core::Property ConsumeKafka::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")
-        ->withDescription("SSL Context Service Name")
-        ->asType<minifi::controllers::SSLContextService>()
-        ->build());
-
 const core::Relationship ConsumeKafka::Success("success", "Incoming Kafka 
messages as flowfiles. Depending on the demarcation strategy, this can be one 
or multiple flowfiles per message.");
 
 void ConsumeKafka::initialize() {
   setSupportedProperties({
-    KafkaBrokers,
     SecurityProtocol,
+    SSLContextService,
+    KerberosServiceName,
+    KerberosPrincipal,
+    KerberosKeytabPath,
+    SASLMechanism,
+    Username,
+    Password,
+    KafkaBrokers,
     TopicNames,
     TopicNameFormat,
     HonorTransactions,
@@ -192,8 +186,7 @@ void ConsumeKafka::initialize() {
     DuplicateHeaderHandling,
     MaxPollRecords,
     MaxPollTime,
-    SessionTimeout,
-    SSLContextService
+    SessionTimeout
   });
   setSupportedRelationships({
     Success,
@@ -203,40 +196,22 @@ void ConsumeKafka::initialize() {
 void ConsumeKafka::onSchedule(core::ProcessContext* context, 
core::ProcessSessionFactory* /* sessionFactory */) {
   gsl_Expects(context);
   // Required properties
-  kafka_brokers_                = utils::getRequiredPropertyOrThrow(context, 
KafkaBrokers.getName());
-  security_protocol_            = utils::getRequiredPropertyOrThrow(context, 
SecurityProtocol.getName());
-  topic_names_                  = 
utils::listFromRequiredCommaSeparatedProperty(context, TopicNames.getName());
-  topic_name_format_            = utils::getRequiredPropertyOrThrow(context, 
TopicNameFormat.getName());
-  honor_transactions_           = utils::parseBooleanPropertyOrThrow(context, 
HonorTransactions.getName());
-  group_id_                     = utils::getRequiredPropertyOrThrow(context, 
GroupID.getName());
-  offset_reset_                 = utils::getRequiredPropertyOrThrow(context, 
OffsetReset.getName());
-  key_attribute_encoding_       = utils::getRequiredPropertyOrThrow(context, 
KeyAttributeEncoding.getName());
-  max_poll_time_milliseconds_   = utils::parseTimePropertyMSOrThrow(context, 
MaxPollTime.getName());
-  session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(context, 
SessionTimeout.getName());
+  kafka_brokers_                = utils::getRequiredPropertyOrThrow(*context, 
KafkaBrokers.getName());
+  topic_names_                  = 
utils::listFromRequiredCommaSeparatedProperty(*context, TopicNames.getName());
+  topic_name_format_            = utils::getRequiredPropertyOrThrow(*context, 
TopicNameFormat.getName());
+  honor_transactions_           = utils::parseBooleanPropertyOrThrow(*context, 
HonorTransactions.getName());
+  group_id_                     = utils::getRequiredPropertyOrThrow(*context, 
GroupID.getName());
+  offset_reset_                 = utils::getRequiredPropertyOrThrow(*context, 
OffsetReset.getName());
+  key_attribute_encoding_       = utils::getRequiredPropertyOrThrow(*context, 
KeyAttributeEncoding.getName());
+  max_poll_time_milliseconds_   = utils::parseTimePropertyMSOrThrow(*context, 
MaxPollTime.getName());
+  session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(*context, 
SessionTimeout.getName());
 
   // Optional properties
   context->getProperty(MessageDemarcator.getName(), message_demarcator_);
   context->getProperty(MessageHeaderEncoding.getName(), 
message_header_encoding_);
   context->getProperty(DuplicateHeaderHandling.getName(), 
duplicate_header_handling_);
 
-  std::string ssl_service_name;
-  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
-  if (context->getProperty(SSLContextService.getName(), ssl_service_name) && 
!ssl_service_name.empty()) {
-    std::shared_ptr<core::controller::ControllerService> service = 
context->getControllerService(ssl_service_name);
-    if (service) {
-      ssl_service = 
std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-      ssl_data_.ca_loc = ssl_service->getCACertificate();
-      ssl_data_.cert_loc = ssl_service->getCertificateFile();
-      ssl_data_.key_loc = ssl_service->getPrivateKeyFile();
-      ssl_data_.key_pw = ssl_service->getPassphrase();
-    } else {
-      logger_->log_warn("SSL Context Service property is set to '%s', but the 
controller service could not be found.", ssl_service_name);
-    }
-  } else if (security_protocol_ == SECURITY_PROTOCOL_SSL) {
-    logger_->log_warn("Security protocol is set to %s, but no valid SSL 
Context Service property is set.", SECURITY_PROTOCOL_SSL);
-  }
-
-  headers_to_add_as_attributes_ = 
utils::listFromCommaSeparatedProperty(context, 
HeadersToAddAsAttributes.getName());
+  headers_to_add_as_attributes_ = 
utils::listFromCommaSeparatedProperty(*context, 
HeadersToAddAsAttributes.getName());
   max_poll_records_ = 
gsl::narrow<std::size_t>(utils::getOptionalUintProperty(*context, 
MaxPollRecords.getName()).value_or(DEFAULT_MAX_POLL_RECORDS));
 
   if (!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, 
key_attribute_encoding_) && 
!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, 
key_attribute_encoding_)) {
@@ -324,7 +299,7 @@ void 
ConsumeKafka::extend_config_from_dynamic_properties(const core::ProcessCont
   }
 }
 
-void ConsumeKafka::configure_new_connection(const core::ProcessContext& 
context) {
+void ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
   using utils::setKafkaConfigurationField;
 
   conf_ = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };
@@ -342,18 +317,11 @@ void ConsumeKafka::configure_new_connection(const 
core::ProcessContext& context)
   // logger_->log_info("Enabling all debug logs for kafka consumer.");
   // setKafkaConfigurationField(*conf_, "debug", "all");
 
+  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
+
   setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
   setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
   setKafkaConfigurationField(*conf_, "auto.offset.reset", offset_reset_);
-
-  if (security_protocol_ == SECURITY_PROTOCOL_SSL) {
-    setKafkaConfigurationField(*conf_, "security.protocol", "ssl");
-    setKafkaConfigurationField(*conf_, "ssl.ca.location", ssl_data_.ca_loc);
-    setKafkaConfigurationField(*conf_, "ssl.certificate.location", 
ssl_data_.cert_loc);
-    setKafkaConfigurationField(*conf_, "ssl.key.location", ssl_data_.key_loc);
-    setKafkaConfigurationField(*conf_, "ssl.key.password", ssl_data_.key_pw);
-  }
-
   setKafkaConfigurationField(*conf_, "enable.auto.commit", "false");
   setKafkaConfigurationField(*conf_, "enable.auto.offset.store", "false");
   setKafkaConfigurationField(*conf_, "isolation.level", honor_transactions_ ? 
"read_committed" : "read_uncommitted");
diff --git a/extensions/librdkafka/ConsumeKafka.h 
b/extensions/librdkafka/ConsumeKafka.h
index 14a23d0..e8610e0 100644
--- a/extensions/librdkafka/ConsumeKafka.h
+++ b/extensions/librdkafka/ConsumeKafka.h
@@ -23,7 +23,7 @@
 #include <utility>
 #include <vector>
 
-#include "core/Processor.h"
+#include "KafkaProcessorBase.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "rdkafka.h"
 #include "rdkafka_utils.h"
@@ -35,13 +35,12 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-class ConsumeKafka : public core::Processor {
+class ConsumeKafka : public KafkaProcessorBase {
  public:
   EXTENSIONAPI static constexpr char const* ProcessorName = "ConsumeKafka";
 
   // Supported Properties
   EXTENSIONAPI static core::Property KafkaBrokers;
-  EXTENSIONAPI static core::Property SecurityProtocol;
   EXTENSIONAPI static core::Property TopicNames;
   EXTENSIONAPI static core::Property TopicNameFormat;
   EXTENSIONAPI static core::Property HonorTransactions;
@@ -55,7 +54,6 @@ class ConsumeKafka : public core::Processor {
   EXTENSIONAPI static core::Property MaxPollRecords;
   EXTENSIONAPI static core::Property MaxPollTime;
   EXTENSIONAPI static core::Property SessionTimeout;
-  EXTENSIONAPI static core::Property SSLContextService;
 
   // Supported Relationships
   EXTENSIONAPI static const core::Relationship Success;
@@ -98,7 +96,7 @@ class ConsumeKafka : public core::Processor {
   static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{ 60000 
};
 
   explicit ConsumeKafka(const std::string& name, const utils::Identifier& uuid 
= utils::Identifier()) :
-      Processor(name, uuid) {}
+      KafkaProcessorBase(name, uuid, 
core::logging::LoggerFactory<ConsumeKafka>::getLogger()) {}
 
   ~ConsumeKafka() override = default;
 
@@ -126,7 +124,7 @@ class ConsumeKafka : public core::Processor {
  private:
   void create_topic_partition_list();
   void extend_config_from_dynamic_properties(const core::ProcessContext& 
context);
-  void configure_new_connection(const core::ProcessContext& context);
+  void configure_new_connection(core::ProcessContext& context);
   std::string extract_message(const rd_kafka_message_t& rkmessage) const;
   std::vector<std::unique_ptr<rd_kafka_message_t, 
utils::rd_kafka_message_deleter>> poll_kafka_messages();
   utils::KafkaEncoding key_attr_encoding_attr_to_enum() const;
@@ -155,7 +153,6 @@ class ConsumeKafka : public core::Processor {
   }
 
   std::string kafka_brokers_;
-  std::string security_protocol_;
   std::vector<std::string> topic_names_;
   std::string topic_name_format_;
   bool honor_transactions_;
@@ -170,8 +167,6 @@ class ConsumeKafka : public core::Processor {
   std::chrono::milliseconds max_poll_time_milliseconds_;
   std::chrono::milliseconds session_timeout_milliseconds_;
 
-  utils::SSL_data ssl_data_;
-
   std::unique_ptr<rd_kafka_t, utils::rd_kafka_consumer_deleter> consumer_;
   std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf_;
   std::unique_ptr<rd_kafka_topic_partition_list_t, 
utils::rd_kafka_topic_partition_list_deleter> kf_topic_partition_list_;
@@ -181,8 +176,6 @@ class ConsumeKafka : public core::Processor {
   std::vector<std::unique_ptr<rd_kafka_message_t, 
utils::rd_kafka_message_deleter>> pending_messages_;
 
   std::mutex do_not_call_on_trigger_concurrently_;
-
-  std::shared_ptr<core::logging::Logger> 
logger_{core::logging::LoggerFactory<ConsumeKafka>::getLogger()};
 };
 
 }  // namespace processors
diff --git a/extensions/librdkafka/KafkaProcessorBase.cpp 
b/extensions/librdkafka/KafkaProcessorBase.cpp
new file mode 100644
index 0000000..d54404c
--- /dev/null
+++ b/extensions/librdkafka/KafkaProcessorBase.cpp
@@ -0,0 +1,135 @@
+/**
+ * @file KafkaProcessorBase.cpp
+ * KafkaProcessorBase class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "KafkaProcessorBase.h"
+
+#include "rdkafka_utils.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property KafkaProcessorBase::SecurityProtocol(
+        core::PropertyBuilder::createProperty("Security Protocol")
+        ->withDescription("Protocol used to communicate with brokers. 
Corresponds to Kafka's 'security.protocol' property.")
+        
->withDefaultValue<std::string>(toString(SecurityProtocolOption::PLAINTEXT))
+        ->withAllowableValues<std::string>(SecurityProtocolOption::values())
+        ->isRequired(true)
+        ->build());
+const core::Property KafkaProcessorBase::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+        ->withDescription("SSL Context Service Name")
+        ->asType<minifi::controllers::SSLContextService>()
+        ->build());
+const core::Property KafkaProcessorBase::KerberosServiceName(
+    core::PropertyBuilder::createProperty("Kerberos Service Name")
+        ->withDescription("Kerberos Service Name")
+        ->build());
+const core::Property KafkaProcessorBase::KerberosPrincipal(
+    core::PropertyBuilder::createProperty("Kerberos Principal")
+        ->withDescription("Keberos Principal")
+        ->build());
+const core::Property KafkaProcessorBase::KerberosKeytabPath(
+    core::PropertyBuilder::createProperty("Kerberos Keytab Path")
+        ->withDescription("The path to the location on the local filesystem 
where the kerberos keytab is located. Read permission on the file is required.")
+        ->build());
+const core::Property KafkaProcessorBase::SASLMechanism(
+        core::PropertyBuilder::createProperty("SASL Mechanism")
+        ->withDescription("The SASL mechanism to use for authentication. 
Corresponds to Kafka's 'sasl.mechanism' property.")
+        ->withDefaultValue<std::string>(toString(SASLMechanismOption::GSSAPI))
+        ->withAllowableValues<std::string>(SASLMechanismOption::values())
+        ->isRequired(true)
+        ->build());
+const core::Property KafkaProcessorBase::Username(
+    core::PropertyBuilder::createProperty("Username")
+        ->withDescription("The username when the SASL Mechanism is 
sasl_plaintext")
+        ->build());
+const core::Property KafkaProcessorBase::Password(
+    core::PropertyBuilder::createProperty("Password")
+        ->withDescription("The password for the given username when the SASL 
Mechanism is sasl_plaintext")
+        ->build());
+
+std::optional<utils::SSL_data> 
KafkaProcessorBase::getSslData(core::ProcessContext& context) const {
+  std::string ssl_service_name;
+  if (context.getProperty(SSLContextService.getName(), ssl_service_name) && 
!ssl_service_name.empty()) {
+    std::shared_ptr<core::controller::ControllerService> service = 
context.getControllerService(ssl_service_name);
+    if (service) {
+      auto ssl_service = 
std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+      utils::SSL_data ssl_data;
+      ssl_data.ca_loc = ssl_service->getCACertificate();
+      ssl_data.cert_loc = ssl_service->getCertificateFile();
+      ssl_data.key_loc = ssl_service->getPrivateKeyFile();
+      ssl_data.key_pw = ssl_service->getPassphrase();
+      return ssl_data;
+    } else {
+      logger_->log_warn("SSL Context Service property is set to '%s', but the 
controller service could not be found.", ssl_service_name);
+      return std::nullopt;
+    }
+  } else if (security_protocol_ == SecurityProtocolOption::SSL || 
security_protocol_ == SecurityProtocolOption::SASL_SSL) {
+    logger_->log_warn("Security protocol is set to %s, but no valid SSL 
Context Service property is set.", security_protocol_.toString());
+  }
+
+  return std::nullopt;
+}
+
+void 
KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& 
context, gsl::not_null<rd_kafka_conf_t*> config) {
+  security_protocol_ = 
utils::getRequiredPropertyOrThrow<SecurityProtocolOption>(context, 
SecurityProtocol.getName());
+  utils::setKafkaConfigurationField(*config, "security.protocol", 
security_protocol_.toString());
+  logger_->log_debug("Kafka security.protocol [%s]", 
security_protocol_.toString());
+  if (security_protocol_ == SecurityProtocolOption::SSL || security_protocol_ 
== SecurityProtocolOption::SASL_SSL) {
+    auto ssl_data = getSslData(context);
+    if (ssl_data) {
+      if (ssl_data->ca_loc.empty() && ssl_data->cert_loc.empty() && 
ssl_data->key_loc.empty() && ssl_data->key_pw.empty()) {
+        logger_->log_warn("Security protocol is set to %s, but no valid 
security parameters are set in the properties or in the SSL Context Service.", 
security_protocol_.toString());
+      } else {
+        utils::setKafkaConfigurationField(*config, "ssl.ca.location", 
ssl_data->ca_loc);
+        logger_->log_debug("Kafka ssl.ca.location [%s]", ssl_data->ca_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.certificate.location", 
ssl_data->cert_loc);
+        logger_->log_debug("Kafka ssl.certificate.location [%s]", 
ssl_data->cert_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.key.location", 
ssl_data->key_loc);
+        logger_->log_debug("Kafka ssl.key.location [%s]", ssl_data->key_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.key.password", 
ssl_data->key_pw);
+        logger_->log_debug("Kafka ssl.key.password was set");
+      }
+    }
+  }
+
+  auto sasl_mechanism = 
utils::getRequiredPropertyOrThrow<SASLMechanismOption>(context, 
SASLMechanism.getName());
+  utils::setKafkaConfigurationField(*config, "sasl.mechanism", 
sasl_mechanism.toString());
+  logger_->log_debug("Kafka sasl.mechanism [%s]", sasl_mechanism.toString());
+
+  auto setKafkaConfigIfNotEmpty = [this, &context, config](const std::string& 
property_name, const std::string& kafka_config_name, bool log_value = true) {
+    std::string value;
+    if (context.getProperty(property_name, value) && !value.empty()) {
+      utils::setKafkaConfigurationField(*config, kafka_config_name, value);
+      if (log_value) {
+        logger_->log_debug("Kafka %s [%s]", kafka_config_name, value);
+      } else {
+        logger_->log_debug("Kafka %s was set", kafka_config_name);
+      }
+    }
+  };
+
+  setKafkaConfigIfNotEmpty(KerberosServiceName.getName(), 
"sasl.kerberos.service.name");
+  setKafkaConfigIfNotEmpty(KerberosPrincipal.getName(), 
"sasl.kerberos.principal");
+  setKafkaConfigIfNotEmpty(KerberosKeytabPath.getName(), 
"sasl.kerberos.keytab");
+  setKafkaConfigIfNotEmpty(Username.getName(), "sasl.username");
+  setKafkaConfigIfNotEmpty(Password.getName(), "sasl.password", false);
+}
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/librdkafka/KafkaProcessorBase.h 
b/extensions/librdkafka/KafkaProcessorBase.h
new file mode 100644
index 0000000..6879de2
--- /dev/null
+++ b/extensions/librdkafka/KafkaProcessorBase.h
@@ -0,0 +1,69 @@
+/**
+ * @file KafkaProcessorBase.h
+ * KafkaProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <memory>
+#include <string>
+
+#include "core/Processor.h"
+#include "rdkafka_utils.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// PublishKafka Class
+class KafkaProcessorBase : public core::Processor {
+ public:
+  EXTENSIONAPI static const core::Property SSLContextService;
+  EXTENSIONAPI static const core::Property SecurityProtocol;
+  EXTENSIONAPI static const core::Property KerberosServiceName;
+  EXTENSIONAPI static const core::Property KerberosPrincipal;
+  EXTENSIONAPI static const core::Property KerberosKeytabPath;
+  EXTENSIONAPI static const core::Property SASLMechanism;
+  EXTENSIONAPI static const core::Property Username;
+  EXTENSIONAPI static const core::Property Password;
+
+  SMART_ENUM(SecurityProtocolOption,
+    (PLAINTEXT, "plaintext"),
+    (SSL, "ssl"),
+    (SASL_PLAIN, "sasl_plaintext"),
+    (SASL_SSL, "sasl_ssl")
+  )
+
+  SMART_ENUM(SASLMechanismOption,
+    (GSSAPI, "GSSAPI"),
+    (PLAIN, "PLAIN")
+  )
+
+  KafkaProcessorBase(const std::string& name, const utils::Identifier& uuid, 
std::shared_ptr<core::logging::Logger> logger)
+      : core::Processor(name, uuid),
+        logger_(logger) {
+  }
+
+ protected:
+  virtual std::optional<utils::SSL_data> getSslData(core::ProcessContext& 
context) const;
+  void setKafkaAuthenticationParameters(core::ProcessContext& context, 
gsl::not_null<rd_kafka_conf_t*> config);
+
+  SecurityProtocolOption security_protocol_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/librdkafka/PublishKafka.cpp 
b/extensions/librdkafka/PublishKafka.cpp
index cae955f..f021bed 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -41,23 +41,9 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-#define COMPRESSION_CODEC_NONE "none"
-#define COMPRESSION_CODEC_GZIP "gzip"
-#define COMPRESSION_CODEC_SNAPPY "snappy"
-#define ROUND_ROBIN_PARTITIONING "Round Robin"
-#define RANDOM_PARTITIONING "Random Robin"
-#define USER_DEFINED_PARTITIONING "User-Defined"
-#define DELIVERY_REPLICATED "all"
-#define DELIVERY_ONE_NODE "1"
-#define DELIVERY_BEST_EFFORT "0"
-#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define SECURITY_PROTOCOL_SSL "ssl"
-#define KAFKA_KEY_ATTRIBUTE "kafka.key"
-
 const core::Property PublishKafka::SeedBrokers(
     core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A 
comma-separated list of known Kafka Brokers in the format <host>:<port>")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
-
 const core::Property PublishKafka::Topic(
     core::PropertyBuilder::createProperty("Topic Name")->withDescription("The 
Kafka Topic of interest")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
@@ -68,7 +54,6 @@ const core::Property PublishKafka::DeliveryGuarantee(
                                                                                
  "-1 or all (block until message is committed by all in sync replicas) "
                                                                                
  "or any concrete number of nodes.")
         
->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
-
 const core::Property PublishKafka::MaxMessageSize(
     core::PropertyBuilder::createProperty("Max Request 
Size")->withDescription("Maximum Kafka protocol request message size")
         ->isRequired(false)->build());
@@ -118,33 +103,14 @@ const core::Property PublishKafka::CompressCodec(
         ->withAllowableValues<std::string>({COMPRESSION_CODEC_NONE, 
COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY})
         ->withDescription("compression codec to use for compressing message 
sets")
         ->build());
-
 const core::Property PublishKafka::MaxFlowSegSize(
     core::PropertyBuilder::createProperty("Max Flow Segment 
Size")->withDescription("Maximum flow content payload segment size for the 
kafka record. 0 B means unlimited.")
         ->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 
B")->build());
-const core::Property PublishKafka::SecurityProtocol(
-        core::PropertyBuilder::createProperty("Security Protocol")
-        ->withDescription("Protocol used to communicate with brokers")
-        ->withDefaultValue<std::string>(SECURITY_PROTOCOL_PLAINTEXT)
-        ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT, 
SECURITY_PROTOCOL_SSL})
-        ->isRequired(true)
-        ->build());
-
-const core::Property PublishKafka::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")
-        ->withDescription("SSL Context Service Name")
-        ->asType<minifi::controllers::SSLContextService>()
-        ->build());
 
 const core::Property PublishKafka::SecurityCA("Security CA", "DEPRECATED in 
favor of SSL Context Service. File or directory path to CA certificate(s) for 
verifying the broker's key", "");
 const core::Property PublishKafka::SecurityCert("Security Cert", "DEPRECATED 
in favor of SSL Context Service.Path to client's public key (PEM) used for 
authentication", "");
 const core::Property PublishKafka::SecurityPrivateKey("Security Private Key", 
"DEPRECATED in favor of SSL Context Service.Path to client's private key (PEM) 
used for authentication", "");
 const core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass 
Phrase", "DEPRECATED in favor of SSL Context Service.Private key passphrase", 
"");
-const core::Property PublishKafka::KerberosServiceName("Kerberos Service 
Name", "Kerberos Service Name", "");
-const core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", 
"Keberos Principal", "");
-const core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
-                                                "The path to the location on 
the local filesystem where the kerberos keytab is located. Read permission on 
the file is required.", "");
-
 const core::Property PublishKafka::KafkaKey(
     core::PropertyBuilder::createProperty("Kafka Key")
         ->withDescription("The key to use for the message. If not specified, 
the UUID of the flow file is used as the message key.")
@@ -475,41 +441,44 @@ void messageDeliveryCallback(rd_kafka_t* rk, const 
rd_kafka_message_t* rkmessage
 
 void PublishKafka::initialize() {
   // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(SeedBrokers);
-  properties.insert(Topic);
-  properties.insert(DeliveryGuarantee);
-  properties.insert(MaxMessageSize);
-  properties.insert(RequestTimeOut);
-  properties.insert(MessageTimeOut);
-  properties.insert(ClientName);
-  properties.insert(AttributeNameRegex);
-  properties.insert(BatchSize);
-  properties.insert(TargetBatchPayloadSize);
-  properties.insert(QueueBufferMaxTime);
-  properties.insert(QueueBufferMaxSize);
-  properties.insert(QueueBufferMaxMessage);
-  properties.insert(CompressCodec);
-  properties.insert(MaxFlowSegSize);
-  properties.insert(SecurityProtocol);
-  properties.insert(SSLContextService);
-  properties.insert(SecurityCA);
-  properties.insert(SecurityCert);
-  properties.insert(SecurityPrivateKey);
-  properties.insert(SecurityPrivateKeyPassWord);
-  properties.insert(KerberosServiceName);
-  properties.insert(KerberosPrincipal);
-  properties.insert(KerberosKeytabPath);
-  properties.insert(KafkaKey);
-  properties.insert(MessageKeyField);
-  properties.insert(DebugContexts);
-  properties.insert(FailEmptyFlowFiles);
-  setSupportedProperties(properties);
+  setSupportedProperties({
+    SeedBrokers,
+    Topic,
+    DeliveryGuarantee,
+    MaxMessageSize,
+    RequestTimeOut,
+    MessageTimeOut,
+    ClientName,
+    AttributeNameRegex,
+    BatchSize,
+    TargetBatchPayloadSize,
+    QueueBufferMaxTime,
+    QueueBufferMaxSize,
+    QueueBufferMaxMessage,
+    CompressCodec,
+    MaxFlowSegSize,
+    SecurityProtocol,
+    SSLContextService,
+    SecurityCA,
+    SecurityCert,
+    SecurityPrivateKey,
+    SecurityPrivateKeyPassWord,
+    KerberosServiceName,
+    KerberosPrincipal,
+    KerberosKeytabPath,
+    KafkaKey,
+    MessageKeyField,
+    DebugContexts,
+    FailEmptyFlowFiles,
+    SASLMechanism,
+    Username,
+    Password
+  });
   // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Failure);
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
+  setSupportedRelationships({
+    Success,
+    Failure
+  });
 }
 
 void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory>& 
/*sessionFactory*/) {
@@ -622,33 +591,7 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptr<core::ProcessCon
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context->getProperty(KerberosServiceName.getName(), value) && 
!value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.service.name", 
value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
-  value = "";
-  if (context->getProperty(KerberosPrincipal.getName(), value) && 
!value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.principal", 
value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
-  value = "";
-  if (context->getProperty(KerberosKeytabPath.getName(), value) && 
!value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.keytab", 
value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
+
   value = "";
   if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty()) 
{
     result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", 
value.c_str(), errstr.data(), errstr.size());
@@ -709,97 +652,8 @@ bool PublishKafka::configureNewConnection(const 
std::shared_ptr<core::ProcessCon
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context->getProperty(SecurityProtocol.getName(), value) && 
!value.empty()) {
-    if (value == SECURITY_PROTOCOL_SSL) {
-      result = rd_kafka_conf_set(conf_.get(), "security.protocol", 
value.c_str(), errstr.data(), errstr.size());
-      logger_->log_debug("PublishKafka: security.protocol [%s]", value);
-      if (result != RD_KAFKA_CONF_OK) {
-        auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-
-      std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
-      if (context->getProperty(SSLContextService.getName(), value) && 
!value.empty()) {
-        std::shared_ptr<core::controller::ControllerService> service = 
context->getControllerService(value);
-        if (service) {
-          ssl_service = 
std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-        } else {
-          logger_->log_warn("SSL Context Service property is set to '%s', but 
the controller service could not be found.", value);
-        }
-      }
-
-      std::string security_ca;
-      if (ssl_service) {
-        security_ca = ssl_service->getCACertificate();
-      } else {
-        context->getProperty(SecurityCA.getName(), security_ca);
-      }
-
-      std::string security_cert;
-      if (ssl_service) {
-        security_cert = ssl_service->getCertificateFile();
-      } else {
-        context->getProperty(SecurityCert.getName(), security_cert);
-      }
-
-      std::string security_private_key;
-      if (ssl_service) {
-        security_private_key = ssl_service->getPrivateKeyFile();
-      } else {
-        context->getProperty(SecurityPrivateKey.getName(), 
security_private_key);
-      }
-
-      std::string security_private_key_password;
-      if (ssl_service) {
-        security_private_key_password = ssl_service->getPassphrase();
-      } else {
-        context->getProperty(SecurityPrivateKeyPassWord.getName(), 
security_private_key_password);
-      }
-
-      if (!security_ca.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.ca.location", 
security_ca.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.ca.location [%s]", security_ca);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_cert.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.certificate.location", 
security_cert.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", 
security_cert);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_private_key.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.key.location", 
security_private_key.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.key.location [%s]", 
security_private_key);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_private_key_password.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.key.password", 
security_private_key_password.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.key.password [%s]", 
security_private_key_password);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, 
errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
 
-      if (security_ca.empty() && security_cert.empty() && 
security_private_key.empty() && security_private_key_password.empty()) {
-        logger_->log_warn("Security protocol is set to %s, but no valid 
security parameters are set in the properties or in the SSL Context Service.", 
SECURITY_PROTOCOL_SSL);
-      }
-    } else if (value == SECURITY_PROTOCOL_PLAINTEXT) {
-      // Do nothing
-    } else {
-      auto error_msg = utils::StringUtils::join_pack("PublishKafka: unknown 
Security Protocol: ", value);
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
+  setKafkaAuthenticationParameters(*context, gsl::make_not_null(conf_.get()));
 
   // Add all of the dynamic properties as librdkafka configurations
   const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
@@ -919,6 +773,19 @@ bool PublishKafka::createNewTopic(const 
std::shared_ptr<core::ProcessContext> &c
   return true;
 }
 
+std::optional<utils::SSL_data> PublishKafka::getSslData(core::ProcessContext& 
context) const {
+  if (auto result = KafkaProcessorBase::getSslData(context); result) {
+    return result;
+  }
+
+  utils::SSL_data ssl_data;
+  context.getProperty(SecurityCA.getName(), ssl_data.ca_loc);
+  context.getProperty(SecurityCert.getName(), ssl_data.cert_loc);
+  context.getProperty(SecurityPrivateKey.getName(), ssl_data.key_loc);
+  context.getProperty(SecurityPrivateKeyPassWord.getName(), ssl_data.key_pw);
+  return ssl_data;
+}
+
 void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSession> &session) {
   // Check whether we have been interrupted
   if (interrupted_) {
diff --git a/extensions/librdkafka/PublishKafka.h 
b/extensions/librdkafka/PublishKafka.h
index c2a8197..3108c54 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -32,9 +32,9 @@
 #include <vector>
 #include <regex>
 
+#include "KafkaProcessorBase.h"
 #include "utils/GeneralUtils.h"
 #include "FlowFileRecord.h"
-#include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
 #include "core/Property.h"
@@ -51,7 +51,7 @@ namespace minifi {
 namespace processors {
 
 // PublishKafka Class
-class PublishKafka : public core::Processor {
+class PublishKafka : public KafkaProcessorBase {
  public:
   static constexpr char const* ProcessorName = "PublishKafka";
 
@@ -71,15 +71,10 @@ class PublishKafka : public core::Processor {
   EXTENSIONAPI static const core::Property QueueBufferMaxMessage;
   EXTENSIONAPI static const core::Property CompressCodec;
   EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property SSLContextService;
-  EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassWord;
-  EXTENSIONAPI static const core::Property KerberosServiceName;
-  EXTENSIONAPI static const core::Property KerberosPrincipal;
-  EXTENSIONAPI static const core::Property KerberosKeytabPath;
   EXTENSIONAPI static const core::Property KafkaKey;
   EXTENSIONAPI static const core::Property MessageKeyField;
   EXTENSIONAPI static const core::Property DebugContexts;
@@ -89,8 +84,19 @@ class PublishKafka : public core::Processor {
   EXTENSIONAPI static const core::Relationship Failure;
   EXTENSIONAPI static const core::Relationship Success;
 
+  static constexpr const char* COMPRESSION_CODEC_NONE = "none";
+  static constexpr const char* COMPRESSION_CODEC_GZIP = "gzip";
+  static constexpr const char* COMPRESSION_CODEC_SNAPPY = "snappy";
+  static constexpr const char* ROUND_ROBIN_PARTITIONING = "Round Robin";
+  static constexpr const char* RANDOM_PARTITIONING = "Random Robin";
+  static constexpr const char* USER_DEFINED_PARTITIONING = "User-Defined";
+  static constexpr const char* DELIVERY_REPLICATED = "all";
+  static constexpr const char* DELIVERY_ONE_NODE = "1";
+  static constexpr const char* DELIVERY_BEST_EFFORT = "0";
+  static constexpr const char* KAFKA_KEY_ATTRIBUTE = "kafka.key";
+
   explicit PublishKafka(const std::string& name, const utils::Identifier& uuid 
= {})
-      : core::Processor(name, uuid) {
+      : KafkaProcessorBase(name, uuid, 
core::logging::LoggerFactory<PublishKafka>::getLogger()) {
   }
 
   ~PublishKafka() override = default;
@@ -113,14 +119,13 @@ class PublishKafka : public core::Processor {
  protected:
   bool configureNewConnection(const std::shared_ptr<core::ProcessContext> 
&context);
   bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, 
const std::string& topic_name, const std::shared_ptr<core::FlowFile>& 
flow_file);
+  std::optional<utils::SSL_data> getSslData(core::ProcessContext& context) 
const override;
 
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
   }
 
-  std::shared_ptr<core::logging::Logger> 
logger_{core::logging::LoggerFactory<PublishKafka>::getLogger()};
-
   KafkaConnectionKey key_;
   std::unique_ptr<KafkaConnection> conn_;
   std::mutex connection_mutex_;
diff --git a/libminifi/include/utils/ProcessorConfigUtils.h 
b/libminifi/include/utils/ProcessorConfigUtils.h
index acf7fd5..9f28de8 100644
--- a/libminifi/include/utils/ProcessorConfigUtils.h
+++ b/libminifi/include/utils/ProcessorConfigUtils.h
@@ -30,11 +30,19 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, 
const std::string& property_name);
-std::vector<std::string> listFromCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name);
-std::vector<std::string> listFromRequiredCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name);
-bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const 
std::string& property_name);
-std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* 
context, const std::string& property_name);
+template<typename PropertyType = std::string>
+PropertyType getRequiredPropertyOrThrow(const core::ProcessContext& context, 
const std::string& property_name) {
+  PropertyType value;
+  if (!context.getProperty(property_name, value)) {
+    throw std::runtime_error(property_name + " property missing or invalid");
+  }
+  return value;
+}
+
+std::vector<std::string> listFromCommaSeparatedProperty(const 
core::ProcessContext& context, const std::string& property_name);
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const 
core::ProcessContext& context, const std::string& property_name);
+bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, const 
std::string& property_name);
+std::chrono::milliseconds parseTimePropertyMSOrThrow(const 
core::ProcessContext& context, const std::string& property_name);
 std::optional<uint64_t> getOptionalUintProperty(const core::ProcessContext& 
context, const std::string& property_name);
 std::string parsePropertyWithAllowableValuesOrThrow(const 
core::ProcessContext& context, const std::string& property_name, const 
std::set<std::string>& allowable_values);
 
diff --git a/libminifi/src/utils/ProcessorConfigUtils.cpp 
b/libminifi/src/utils/ProcessorConfigUtils.cpp
index a423d87..93b4ec8 100644
--- a/libminifi/src/utils/ProcessorConfigUtils.cpp
+++ b/libminifi/src/utils/ProcessorConfigUtils.cpp
@@ -28,25 +28,17 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, 
const std::string& property_name) {
-  std::string value;
-  if (!context->getProperty(property_name, value)) {
-    throw std::runtime_error(property_name + " property missing or invalid");
-  }
-  return value;
-}
-
-std::vector<std::string> listFromCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+std::vector<std::string> listFromCommaSeparatedProperty(const 
core::ProcessContext& context, const std::string& property_name) {
   std::string property_string;
-  context->getProperty(property_name, property_string);
+  context.getProperty(property_name, property_string);
   return utils::StringUtils::splitAndTrim(property_string, ",");
 }
 
-std::vector<std::string> listFromRequiredCommaSeparatedProperty(const 
core::ProcessContext* context, const std::string& property_name) {
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const 
core::ProcessContext& context, const std::string& property_name) {
   return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, 
property_name), ",");
 }
 
-bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const 
std::string& property_name) {
+bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, const 
std::string& property_name) {
   const std::string value_str = getRequiredPropertyOrThrow(context, 
property_name);
   const auto maybe_value = utils::StringUtils::toBool(value_str);
   if (!maybe_value) {
@@ -55,7 +47,7 @@ bool parseBooleanPropertyOrThrow(core::ProcessContext* 
context, const std::strin
   return maybe_value.value();
 }
 
-std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* 
context, const std::string& property_name) {
+std::chrono::milliseconds parseTimePropertyMSOrThrow(const 
core::ProcessContext& context, const std::string& property_name) {
   core::TimeUnit unit;
   uint64_t time_value_ms;
   const std::string value_str = getRequiredPropertyOrThrow(context, 
property_name);

Reply via email to