This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new eb5d172693 NIFI-10991 Add AWS MSK IAM support to Kafka processors
eb5d172693 is described below
commit eb5d172693fcaa24804d9e5d7ab3fd77193efc14
Author: Nandor Soma Abonyi <[email protected]>
AuthorDate: Wed Dec 21 13:39:52 2022 +0100
NIFI-10991 Add AWS MSK IAM support to Kafka processors
This closes #6846.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
nifi-assembly/NOTICE | 44 +++
.../src/main/resources/META-INF/NOTICE | 328 ++++++++++++++++++++-
.../nifi-kafka-2-0-processors/pom.xml | 13 +
.../kafka/pubsub/ConsumeKafkaRecord_2_0.java | 1 +
.../processors/kafka/pubsub/ConsumeKafka_2_0.java | 1 +
.../kafka/pubsub/PublishKafkaRecord_2_0.java | 1 +
.../processors/kafka/pubsub/PublishKafka_2_0.java | 1 +
.../src/main/resources/META-INF/NOTICE | 328 ++++++++++++++++++++-
.../nifi-kafka-2-6-processors/pom.xml | 13 +
.../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 1 +
.../processors/kafka/pubsub/ConsumeKafka_2_6.java | 1 +
.../kafka/pubsub/PublishKafkaRecord_2_6.java | 1 +
.../processors/kafka/pubsub/PublishKafka_2_6.java | 1 +
.../shared/component/KafkaClientComponent.java | 15 +-
.../shared/login/AwsMskIamLoginConfigProvider.java | 46 +++
.../login/DelegatingLoginConfigProvider.java | 1 +
.../KerberosUserServiceLoginConfigProvider.java | 49 +--
.../kafka/shared/login/LoginConfigBuilder.java | 74 +++++
.../shared/login/ScramLoginConfigProvider.java | 20 +-
.../kafka/shared/property/KafkaClientProperty.java | 2 +
.../nifi/kafka/shared/property/SaslMechanism.java | 15 +-
.../provider/StandardKafkaPropertyProvider.java | 17 +-
.../KafkaClientCustomValidationFunction.java | 20 ++
.../shared/login/LoginConfigBuilderTest.java} | 41 +--
nifi-nar-bundles/nifi-kafka-bundle/pom.xml | 9 +-
25 files changed, 950 insertions(+), 93 deletions(-)
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index f3529d5320..2da146f69d 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -199,6 +199,18 @@ The following binary components are provided under the Apache Software License v
This project contains annotations derived from JCIP-ANNOTATIONS
Copyright (c) 2005 Brian Goetz and Tim Peierls. See http://www.jcip.net
+ Apache HttpComponents Client
+ Copyright 1999-2020 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Apache HttpComponents Core
+ Copyright 2005-2020 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
(ASLv2) Apache Jakarta HttpClient
The following NOTICE information applies:
Apache Jakarta HttpClient
@@ -1134,6 +1146,23 @@ The following binary components are provided under the Apache Software License v
Since product implements StAX API, it has dependencies to StAX API
classes.
+ (ASLv2) AWS SDK for Java
+ The following NOTICE information applies:
+ AWS SDK for Java
+ Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ This product includes software developed by
+ Amazon Technologies, Inc (http://www.amazon.com/).
+
+ **********************
+ THIRD PARTY COMPONENTS
+ **********************
+ This software includes third party software subject to the following copyrights:
+ - XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
+ - PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+ The licenses for these third party components are included in LICENSE.txt
+
(ASLv2) AWS SDK for Java 2.0
The following NOTICE information applies:
Copyright 2010-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
@@ -1155,6 +1184,16 @@ The following binary components are provided under the Apache Software License v
- XML parsing and utility functions from JetS3t - Copyright 2006-2009 James Murty.
- PKCS#1 PEM encoded private key parsing and utility functions from oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+ (ASLv2) AWS EventStream for Java
+ The following NOTICE information applies:
+ AWS EventStream for Java
+ Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ (ASLv2) Amazon Ion Java
+ The following NOTICE information applies:
+ Amazon Ion Java
+ Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
(ASLv2) Apache Commons DBCP
The following NOTICE information applies:
Apache Commons DBCP
@@ -2392,6 +2431,11 @@ The following binary components are provided under the Apache Software License v
The Box SDK for Java
Copyright 2019 Box, Inc. All rights reserved.
+ (ASLv2) aws-msk-iam-auth
+ The following NOTICE information applies:
+ aws-msk-iam-auth
+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
************************
Common Development and Distribution License 1.1
************************
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE
index fc058deadf..5c63df88cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-nar/src/main/resources/META-INF/NOTICE
@@ -1,5 +1,5 @@
-nifi-kafka-1-0-nar
-Copyright 2014-2022 The Apache Software Foundation
+nifi-kafka-2-0-nar
+Copyright 2014-2023 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
@@ -81,3 +81,327 @@ The following binary components are provided under the Apache Software License v
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
+ (ASLv2) aws-msk-iam-auth
+ The following NOTICE information applies:
+ aws-msk-iam-auth
+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ (ASLv2) AWS SDK for Java
+ The following NOTICE information applies:
+ AWS SDK for Java
+ Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights
Reserved.
+
+ This product includes software developed by
+ Amazon Technologies, Inc (http://www.amazon.com/).
+
+ **********************
+ THIRD PARTY COMPONENTS
+ **********************
+ This software includes third party software subject to the following
copyrights:
+ - XML parsing and utility functions from JetS3t - Copyright 2006-2009
James Murty.
+ - PKCS#1 PEM encoded private key parsing and utility functions from
oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+ The licenses for these third party components are included in LICENSE.txt
+
+ (ASLv2) AWS EventStream for Java
+ The following NOTICE information applies:
+ AWS EventStream for Java
+ Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ (ASLv2) Apache HttpComponents
+ The following NOTICE information applies:
+ Apache HttpComponents Client
+ Copyright 1999-2020 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Apache HttpComponents Core
+ Copyright 2005-2020 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Amazon Ion Java
+ The following NOTICE information applies:
+ Amazon Ion Java
+ Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights
Reserved.
+
+ (ASLv2) Joda-Time
+ The following NOTICE information applies:
+ This product includes software developed by
+ Joda.org (http://www.joda.org/).
+
+ (ASLv2) The Netty Project
+ The following NOTICE information applies:
+ The Netty Project
+ =================
+
+ Please visit the Netty web site for more information:
+
+ * https://netty.io/
+
+ Copyright 2014 The Netty Project
+
+ The Netty Project licenses this file to you under the Apache License,
+ version 2.0 (the "License"); you may not use this file except in
compliance
+ with the License. You may obtain a copy of the License at:
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and limitations
+ under the License.
+
+ Also, please refer to each LICENSE.<component>.txt file, which is
located in
+ the 'license' directory of the distribution file, for the license terms
of the
+ components that this product depends on.
+
+
-------------------------------------------------------------------------------
+ This product contains the extensions to Java Collections Framework which
has
+ been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+ * LICENSE:
+ * license/LICENSE.jsr166y.txt (Public Domain)
+ * HOMEPAGE:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+ *
http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+ This product contains a modified version of Robert Harder's Public Domain
+ Base64 Encoder and Decoder, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.base64.txt (Public Domain)
+ * HOMEPAGE:
+ * http://iharder.sourceforge.net/current/java/base64/
+
+ This product contains a modified portion of 'Webbit', an event based
+ WebSocket and HTTP server, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.webbit.txt (BSD License)
+ * HOMEPAGE:
+ * https://github.com/joewalnes/webbit
+
+ This product contains a modified portion of 'SLF4J', a simple logging
+ facade for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.slf4j.txt (MIT License)
+ * HOMEPAGE:
+ * https://www.slf4j.org/
+
+ This product contains a modified portion of 'Apache Harmony', an open
source
+ Java SE, which can be obtained at:
+
+ * NOTICE:
+ * license/NOTICE.harmony.txt
+ * LICENSE:
+ * license/LICENSE.harmony.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://archive.apache.org/dist/harmony/
+
+ This product contains a modified portion of 'jbzip2', a Java bzip2
compression
+ and decompression library written by Matthew J. Francis. It can be
obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jbzip2.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jbzip2/
+
+ This product contains a modified portion of 'libdivsufsort', a C API
library to construct
+ the suffix array and the Burrows-Wheeler transformed string for any
input string of
+ a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.libdivsufsort.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/y-256/libdivsufsort
+
+ This product contains a modified portion of Nitsan Wakart's 'JCTools',
Java Concurrency Tools for the JVM,
+ which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jctools.txt (ASL2 License)
+ * HOMEPAGE:
+ * https://github.com/JCTools/JCTools
+
+ This product optionally depends on 'JZlib', a re-implementation of zlib
in
+ pure Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jzlib.txt (BSD style License)
+ * HOMEPAGE:
+ * http://www.jcraft.com/jzlib/
+
+ This product optionally depends on 'Compress-LZF', a Java library for
encoding and
+ decoding data in LZF format, written by Tatu Saloranta. It can be
obtained at:
+
+ * LICENSE:
+ * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/ning/compress
+
+ This product optionally depends on 'lz4', a LZ4 Java compression
+ and decompression library written by Adrien Grand. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lz4.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jpountz/lz4-java
+
+ This product optionally depends on 'lzma-java', a LZMA Java compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lzma-java.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jponge/lzma-java
+
+ This product optionally depends on 'zstd-jni', a zstd-jni Java
compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/luben/zstd-jni
+
+ This product contains a modified portion of 'jfastlz', a Java port of
FastLZ compression
+ and decompression library written by William Kinney. It can be obtained
at:
+
+ * LICENSE:
+ * license/LICENSE.jfastlz.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jfastlz/
+
+ This product contains a modified portion of and optionally depends on
'Protocol Buffers', Google's data
+ interchange format, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.protobuf.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/protobuf
+
+ This product optionally depends on 'Bouncy Castle Crypto APIs' to
generate
+ a temporary self-signed X.509 certificate when the JVM does not provide
the
+ equivalent functionality. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.bouncycastle.txt (MIT License)
+ * HOMEPAGE:
+ * https://www.bouncycastle.org/
+
+ This product optionally depends on 'Snappy', a compression library
produced
+ by Google Inc, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.snappy.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/snappy
+
+ This product optionally depends on 'JBoss Marshalling', an alternative
Java
+ serialization API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jboss-remoting/jboss-marshalling
+
+ This product optionally depends on 'Caliper', Google's micro-
+ benchmarking framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.caliper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/google/caliper
+
+ This product optionally depends on 'Apache Commons Logging', a logging
+ framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-logging.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/logging/
+
+ This product optionally depends on 'Apache Log4J', a logging framework,
which
+ can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.log4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://logging.apache.org/log4j/
+
+ This product optionally depends on 'Aalto XML', an ultra-high performance
+ non-blocking XML processor, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://wiki.fasterxml.com/AaltoHome
+
+ This product contains a modified version of 'HPACK', a Java
implementation of
+ the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.hpack.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/twitter/hpack
+
+ This product contains a modified version of 'HPACK', a Java
implementation of
+ the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained
at:
+
+ * LICENSE:
+ * license/LICENSE.hyper-hpack.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/python-hyper/hpack/
+
+ This product contains a modified version of 'HPACK', a Java
implementation of
+ the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be
obtained at:
+
+ * LICENSE:
+ * license/LICENSE.nghttp2-hpack.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/nghttp2/nghttp2/
+
+ This product contains a modified portion of 'Apache Commons Lang', a
Java library
+ provides utilities for the java.lang API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-lang.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/proper/commons-lang/
+
+
+ This product contains the Maven wrapper scripts from 'Maven Wrapper',
that provides an easy way to ensure a user has everything necessary to run the
Maven build.
+
+ * LICENSE:
+ * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/takari/maven-wrapper
+
+ This product contains the dnsinfo.h header file, that provides a way to
retrieve the system DNS configuration on MacOS.
+ This private header is also used by Apple's open source
+ mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
+
+ * LICENSE:
+ * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
+ * HOMEPAGE:
+ *
https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
+
+ This product optionally depends on 'Brotli4j', Brotli compression and
+ decompression for Java., which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.brotli4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/hyperxpro/Brotli4j
+
+************************
+Creative Commons Zero license version 1.0
+************************
+
+The following binary components are provided under the Creative Commons Zero
license version 1.0. See project link for details.
+
+ (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3
- http://www.reactive-streams.org/)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
index 35351ad649..63f45a8d42 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
@@ -135,5 +135,18 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>include-kafka-aws</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>software.amazon.msk</groupId>
+ <artifactId>aws-msk-iam-auth</artifactId>
+ <version>${aws-msk-iam-auth.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
</project>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index e0e0cb5894..c8a9d7d7ec 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -266,6 +266,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor implements KafkaCl
descriptors.add(SASL_USERNAME);
descriptors.add(SASL_PASSWORD);
descriptors.add(TOKEN_AUTHENTICATION);
+ descriptors.add(AWS_PROFILE_NAME);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(GROUP_ID);
descriptors.add(SEPARATE_BY_KEY);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index e28852cdc9..94bc7a7c11 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -249,6 +249,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor implements KafkaClientCo
descriptors.add(SASL_USERNAME);
descriptors.add(SASL_PASSWORD);
descriptors.add(TOKEN_AUTHENTICATION);
+ descriptors.add(AWS_PROFILE_NAME);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(TOPICS);
descriptors.add(TOPIC_TYPE);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index b4f398464c..340d308c04 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -307,6 +307,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor implements KafkaPu
properties.add(SASL_USERNAME);
properties.add(SASL_PASSWORD);
properties.add(TOKEN_AUTHENTICATION);
+ properties.add(AWS_PROFILE_NAME);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(MESSAGE_KEY_FIELD);
properties.add(MAX_REQUEST_SIZE);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index 9bc312fc9e..2c6bfe8b45 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -291,6 +291,7 @@ public class PublishKafka_2_0 extends AbstractProcessor implements KafkaPublishC
properties.add(SASL_USERNAME);
properties.add(SASL_PASSWORD);
properties.add(TOKEN_AUTHENTICATION);
+ properties.add(AWS_PROFILE_NAME);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(TOPIC);
properties.add(DELIVERY_GUARANTEE);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE
index 0df5bf90a5..08530c98b9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-nar/src/main/resources/META-INF/NOTICE
@@ -1,5 +1,5 @@
-nifi-kafka-2-5-nar
-Copyright 2014-2022 The Apache Software Foundation
+nifi-kafka-2-6-nar
+Copyright 2014-2023 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
@@ -81,3 +81,327 @@ The following binary components are provided under the Apache Software License v
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
+ (ASLv2) aws-msk-iam-auth
+ The following NOTICE information applies:
+ aws-msk-iam-auth
+ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ (ASLv2) AWS SDK for Java
+ The following NOTICE information applies:
+ AWS SDK for Java
+ Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights
Reserved.
+
+ This product includes software developed by
+ Amazon Technologies, Inc (http://www.amazon.com/).
+
+ **********************
+ THIRD PARTY COMPONENTS
+ **********************
+ This software includes third party software subject to the following
copyrights:
+ - XML parsing and utility functions from JetS3t - Copyright 2006-2009
James Murty.
+ - PKCS#1 PEM encoded private key parsing and utility functions from
oauth.googlecode.com - Copyright 1998-2010 AOL Inc.
+
+ The licenses for these third party components are included in LICENSE.txt
+
+ (ASLv2) AWS EventStream for Java
+ The following NOTICE information applies:
+ AWS EventStream for Java
+ Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+ (ASLv2) Apache HttpComponents
+ The following NOTICE information applies:
+ Apache HttpComponents Client
+ Copyright 1999-2020 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ Apache HttpComponents Core
+ Copyright 2005-2020 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Amazon Ion Java
+ The following NOTICE information applies:
+ Amazon Ion Java
+ Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights
Reserved.
+
+ (ASLv2) Joda-Time
+ The following NOTICE information applies:
+ This product includes software developed by
+ Joda.org (http://www.joda.org/).
+
+ (ASLv2) The Netty Project
+ The following NOTICE information applies:
+ The Netty Project
+ =================
+
+ Please visit the Netty web site for more information:
+
+ * https://netty.io/
+
+ Copyright 2014 The Netty Project
+
+ The Netty Project licenses this file to you under the Apache License,
+ version 2.0 (the "License"); you may not use this file except in
compliance
+ with the License. You may obtain a copy of the License at:
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and limitations
+ under the License.
+
+ Also, please refer to each LICENSE.<component>.txt file, which is
located in
+ the 'license' directory of the distribution file, for the license terms
of the
+ components that this product depends on.
+
+
-------------------------------------------------------------------------------
+ This product contains the extensions to Java Collections Framework which
has
+ been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+ * LICENSE:
+ * license/LICENSE.jsr166y.txt (Public Domain)
+ * HOMEPAGE:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+ *
http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+ This product contains a modified version of Robert Harder's Public Domain
+ Base64 Encoder and Decoder, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.base64.txt (Public Domain)
+ * HOMEPAGE:
+ * http://iharder.sourceforge.net/current/java/base64/
+
+ This product contains a modified portion of 'Webbit', an event based
+ WebSocket and HTTP server, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.webbit.txt (BSD License)
+ * HOMEPAGE:
+ * https://github.com/joewalnes/webbit
+
+ This product contains a modified portion of 'SLF4J', a simple logging
+ facade for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.slf4j.txt (MIT License)
+ * HOMEPAGE:
+ * https://www.slf4j.org/
+
+ This product contains a modified portion of 'Apache Harmony', an open
source
+ Java SE, which can be obtained at:
+
+ * NOTICE:
+ * license/NOTICE.harmony.txt
+ * LICENSE:
+ * license/LICENSE.harmony.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://archive.apache.org/dist/harmony/
+
+ This product contains a modified portion of 'jbzip2', a Java bzip2
compression
+ and decompression library written by Matthew J. Francis. It can be
obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jbzip2.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jbzip2/
+
+ This product contains a modified portion of 'libdivsufsort', a C API
library to construct
+ the suffix array and the Burrows-Wheeler transformed string for any
input string of
+ a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.libdivsufsort.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/y-256/libdivsufsort
+
+ This product contains a modified portion of Nitsan Wakart's 'JCTools',
Java Concurrency Tools for the JVM,
+ which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jctools.txt (ASL2 License)
+ * HOMEPAGE:
+ * https://github.com/JCTools/JCTools
+
+ This product optionally depends on 'JZlib', a re-implementation of zlib
in
+ pure Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jzlib.txt (BSD style License)
+ * HOMEPAGE:
+ * http://www.jcraft.com/jzlib/
+
+ This product optionally depends on 'Compress-LZF', a Java library for
encoding and
+ decoding data in LZF format, written by Tatu Saloranta. It can be
obtained at:
+
+ * LICENSE:
+ * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/ning/compress
+
+ This product optionally depends on 'lz4', a LZ4 Java compression
+ and decompression library written by Adrien Grand. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lz4.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jpountz/lz4-java
+
+ This product optionally depends on 'lzma-java', a LZMA Java compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lzma-java.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jponge/lzma-java
+
+ This product optionally depends on 'zstd-jni', a zstd-jni Java
compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/luben/zstd-jni
+
+ This product contains a modified portion of 'jfastlz', a Java port of
FastLZ compression
+ and decompression library written by William Kinney. It can be obtained
at:
+
+ * LICENSE:
+ * license/LICENSE.jfastlz.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jfastlz/
+
+ This product contains a modified portion of and optionally depends on
'Protocol Buffers', Google's data
+ interchange format, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.protobuf.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/protobuf
+
+ This product optionally depends on 'Bouncy Castle Crypto APIs' to
generate
+ a temporary self-signed X.509 certificate when the JVM does not provide
the
+ equivalent functionality. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.bouncycastle.txt (MIT License)
+ * HOMEPAGE:
+ * https://www.bouncycastle.org/
+
+ This product optionally depends on 'Snappy', a compression library
produced
+ by Google Inc, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.snappy.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/snappy
+
+ This product optionally depends on 'JBoss Marshalling', an alternative
Java
+ serialization API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jboss-remoting/jboss-marshalling
+
+ This product optionally depends on 'Caliper', Google's micro-
+ benchmarking framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.caliper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/google/caliper
+
+ This product optionally depends on 'Apache Commons Logging', a logging
+ framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-logging.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/logging/
+
+ This product optionally depends on 'Apache Log4J', a logging framework,
which
+ can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.log4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://logging.apache.org/log4j/
+
+ This product optionally depends on 'Aalto XML', an ultra-high performance
+ non-blocking XML processor, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://wiki.fasterxml.com/AaltoHome
+
+ This product contains a modified version of 'HPACK', a Java
implementation of
+ the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.hpack.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/twitter/hpack
+
+ This product contains a modified version of 'HPACK', a Java
implementation of
+ the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained
at:
+
+ * LICENSE:
+ * license/LICENSE.hyper-hpack.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/python-hyper/hpack/
+
+ This product contains a modified version of 'HPACK', a Java
implementation of
+ the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be
obtained at:
+
+ * LICENSE:
+ * license/LICENSE.nghttp2-hpack.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/nghttp2/nghttp2/
+
+ This product contains a modified portion of 'Apache Commons Lang', a
Java library
+ provides utilities for the java.lang API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-lang.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/proper/commons-lang/
+
+
+ This product contains the Maven wrapper scripts from 'Maven Wrapper',
that provides an easy way to ensure a user has everything necessary to run the
Maven build.
+
+ * LICENSE:
+ * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/takari/maven-wrapper
+
+ This product contains the dnsinfo.h header file, that provides a way to
retrieve the system DNS configuration on MacOS.
+ This private header is also used by Apple's open source
+ mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
+
+ * LICENSE:
+ * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
+ * HOMEPAGE:
+ *
https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
+
+ This product optionally depends on 'Brotli4j', Brotli compression and
+ decompression for Java., which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.brotli4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/hyperxpro/Brotli4j
+
+************************
+Creative Commons Zero license version 1.0
+************************
+
+The following binary components are provided under the Creative Commons Zero
license version 1.0. See project link for details.
+
+ (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3
- http://www.reactive-streams.org/)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
index 255bd06cbd..5c439e96cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
@@ -147,5 +147,18 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>include-kafka-aws</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>software.amazon.msk</groupId>
+ <artifactId>aws-msk-iam-auth</artifactId>
+ <version>${aws-msk-iam-auth.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
</project>
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index c121355cf3..025a830552 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -320,6 +320,7 @@ public class ConsumeKafkaRecord_2_6 extends
AbstractProcessor implements KafkaCl
descriptors.add(SASL_USERNAME);
descriptors.add(SASL_PASSWORD);
descriptors.add(TOKEN_AUTHENTICATION);
+ descriptors.add(AWS_PROFILE_NAME);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(SEPARATE_BY_KEY);
descriptors.add(AUTO_OFFSET_RESET);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 5976175b85..076cd474bb 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -274,6 +274,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor
implements KafkaClientCo
descriptors.add(SASL_USERNAME);
descriptors.add(SASL_PASSWORD);
descriptors.add(TOKEN_AUTHENTICATION);
+ descriptors.add(AWS_PROFILE_NAME);
descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(AUTO_OFFSET_RESET);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index 5a445beca2..8b886584c5 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -350,6 +350,7 @@ public class PublishKafkaRecord_2_6 extends
AbstractProcessor implements KafkaPu
properties.add(SASL_USERNAME);
properties.add(SASL_PASSWORD);
properties.add(TOKEN_AUTHENTICATION);
+ properties.add(AWS_PROFILE_NAME);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(MESSAGE_KEY_FIELD);
properties.add(MAX_REQUEST_SIZE);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index 4f94446570..2a08b1efb5 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -305,6 +305,7 @@ public class PublishKafka_2_6 extends AbstractProcessor
implements KafkaPublishC
properties.add(KERBEROS_KEYTAB);
properties.add(SASL_USERNAME);
properties.add(SASL_PASSWORD);
+ properties.add(AWS_PROFILE_NAME);
properties.add(TOKEN_AUTHENTICATION);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(KEY);
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
index 206bd479c9..d958997053 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java
@@ -58,7 +58,7 @@ public interface KafkaClientComponent {
.description("SASL mechanism used for authentication. Corresponds
to Kafka Client sasl.mechanism property")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .allowableValues(SaslMechanism.class)
+ .allowableValues(SaslMechanism.getAvailableSaslMechanisms())
.defaultValue(SaslMechanism.GSSAPI.getValue())
.build();
@@ -107,6 +107,19 @@ public interface KafkaClientComponent {
)
.build();
+ PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder()
+ .name("aws.profile.name")
+ .displayName("AWS Profile Name")
+ .description("The Amazon Web Services Profile to select when
multiple profiles are available.")
+ .dependsOn(
+ SASL_MECHANISM,
+ SaslMechanism.AWS_MSK_IAM
+ )
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
.displayName("SSL Context Service")
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
new file mode 100644
index 0000000000..a5adfc12d1
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/AwsMskIamLoginConfigProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.util.StringUtils;
+
+import static
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+
+/**
+ * SASL AWS MSK IAM Login Module implementation of configuration provider
+ */
+public class AwsMskIamLoginConfigProvider implements LoginConfigProvider {
+
+ private static final String MODULE_CLASS =
"software.amazon.msk.auth.iam.IAMLoginModule";
+
+ private static final String AWS_PROFILE_NAME_KEY = "awsProfileName";
+
+ @Override
+ public String getConfiguration(PropertyContext context) {
+ final String awsProfileName =
context.getProperty(KafkaClientComponent.AWS_PROFILE_NAME).evaluateAttributeExpressions().getValue();
+
+ final LoginConfigBuilder builder = new
LoginConfigBuilder(MODULE_CLASS, REQUIRED);
+
+ if (StringUtils.isNotBlank(awsProfileName)) {
+ builder.append(AWS_PROFILE_NAME_KEY, awsProfileName);
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
index 2be8274606..307d373e6c 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/DelegatingLoginConfigProvider.java
@@ -36,6 +36,7 @@ public class DelegatingLoginConfigProvider implements
LoginConfigProvider {
PROVIDERS.put(SaslMechanism.PLAIN, new PlainLoginConfigProvider());
PROVIDERS.put(SaslMechanism.SCRAM_SHA_256, SCRAM_PROVIDER);
PROVIDERS.put(SaslMechanism.SCRAM_SHA_512, SCRAM_PROVIDER);
+ PROVIDERS.put(SaslMechanism.AWS_MSK_IAM, new
AwsMskIamLoginConfigProvider());
}
/**
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java
index 257f98903e..e82207be91 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/KerberosUserServiceLoginConfigProvider.java
@@ -16,38 +16,20 @@
*/
package org.apache.nifi.kafka.shared.login;
-import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE;
-
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.security.krb.KerberosUser;
import javax.security.auth.login.AppConfigurationEntry;
-import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Objects;
+
+import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SELF_CONTAINED_KERBEROS_USER_SERVICE;
/**
* Kerberos User Service Login Module implementation of configuration provider
*/
public class KerberosUserServiceLoginConfigProvider implements
LoginConfigProvider {
- private static final String SPACE = " ";
-
- private static final String EQUALS = "=";
-
- private static final String DOUBLE_QUOTE = "\"";
-
- private static final String SEMI_COLON = ";";
-
- private static final Map<AppConfigurationEntry.LoginModuleControlFlag,
String> CONTROL_FLAGS = new LinkedHashMap<>();
-
- static {
-
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
"optional");
-
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
"required");
-
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE,
"requisite");
-
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT,
"sufficient");
- }
/**
* Get JAAS configuration using configured Kerberos credentials
@@ -61,32 +43,11 @@ public class KerberosUserServiceLoginConfigProvider
implements LoginConfigProvid
final KerberosUser kerberosUser =
kerberosUserService.createKerberosUser();
final AppConfigurationEntry configurationEntry =
kerberosUser.getConfigurationEntry();
- final StringBuilder builder = new StringBuilder();
-
- final String loginModuleName = configurationEntry.getLoginModuleName();
- builder.append(loginModuleName);
-
- final AppConfigurationEntry.LoginModuleControlFlag controlFlag =
configurationEntry.getControlFlag();
- final String moduleControlFlag =
Objects.requireNonNull(CONTROL_FLAGS.get(controlFlag), "Control Flag not
found");
- builder.append(SPACE);
- builder.append(moduleControlFlag);
+ final LoginConfigBuilder builder = new
LoginConfigBuilder(configurationEntry.getLoginModuleName(),
configurationEntry.getControlFlag());
final Map<String, ?> options = configurationEntry.getOptions();
- options.forEach((key, value) -> {
- builder.append(SPACE);
-
- builder.append(key);
- builder.append(EQUALS);
- if (value instanceof String) {
- builder.append(DOUBLE_QUOTE);
- builder.append(value);
- builder.append(DOUBLE_QUOTE);
- } else {
- builder.append(value);
- }
- });
+ options.forEach(builder::append);
- builder.append(SEMI_COLON);
- return builder.toString();
+ return builder.build();
}
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java
new file mode 100644
index 0000000000..4789f4fdcc
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.shared.login;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Helper class to build JAAS configuration
+ */
+public class LoginConfigBuilder {
+
+ private static final Map<AppConfigurationEntry.LoginModuleControlFlag,
String> CONTROL_FLAGS = new LinkedHashMap<>();
+
+ static {
+
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL,
"optional");
+
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
"required");
+
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE,
"requisite");
+
CONTROL_FLAGS.put(AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT,
"sufficient");
+ }
+
+ private static final String SPACE = " ";
+
+ private static final String EQUALS = "=";
+
+ private static final String DOUBLE_QUOTE = "\"";
+
+ private static final String SEMI_COLON = ";";
+
+ private final StringBuilder builder;
+
+ public LoginConfigBuilder(final String moduleClassName, final
AppConfigurationEntry.LoginModuleControlFlag controlFlag) {
+ final String moduleControlFlag =
Objects.requireNonNull(CONTROL_FLAGS.get(controlFlag), "Control Flag not
found");
+ this.builder = new
StringBuilder(moduleClassName).append(SPACE).append(moduleControlFlag);
+ }
+
+ public LoginConfigBuilder append(String key, Object value) {
+ builder.append(SPACE);
+
+ builder.append(key);
+ builder.append(EQUALS);
+ if (value instanceof String) {
+ builder.append(DOUBLE_QUOTE);
+ builder.append(value);
+ builder.append(DOUBLE_QUOTE);
+ } else {
+ builder.append(value);
+ }
+
+ return this;
+ }
+
+ public String build() {
+ builder.append(SEMI_COLON);
+ return builder.toString();
+ }
+
+}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java
index 48290a541b..d4e51e63a2 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/ScramLoginConfigProvider.java
@@ -19,17 +19,18 @@ package org.apache.nifi.kafka.shared.login;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import static
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+
/**
* SASL SCRAM Login Module implementation of configuration provider
*/
public class ScramLoginConfigProvider implements LoginConfigProvider {
private static final String MODULE_CLASS_NAME =
"org.apache.kafka.common.security.scram.ScramLoginModule";
- private static final String FORMAT = "%s required username=\"%s\"
password=\"%s\"";
-
- private static final String TOKEN_AUTH_ENABLED = "tokenauth=true";
+ private static final String USERNAME_KEY = "username";
+ private static final String PASSWORD_KEY = "password";
- private static final String SEMI_COLON = ";";
+ private static final String TOKEN_AUTH_KEY = "tokenauth";
/**
* Get JAAS configuration using configured username and password with
optional token authentication
@@ -39,20 +40,19 @@ public class ScramLoginConfigProvider implements
LoginConfigProvider {
*/
@Override
public String getConfiguration(final PropertyContext context) {
- final StringBuilder builder = new StringBuilder();
+ final LoginConfigBuilder builder = new
LoginConfigBuilder(MODULE_CLASS_NAME, REQUIRED);
final String username =
context.getProperty(KafkaClientComponent.SASL_USERNAME).evaluateAttributeExpressions().getValue();
final String password =
context.getProperty(KafkaClientComponent.SASL_PASSWORD).evaluateAttributeExpressions().getValue();
- final String moduleUsernamePassword = String.format(FORMAT,
MODULE_CLASS_NAME, username, password);
- builder.append(moduleUsernamePassword);
+ builder.append(USERNAME_KEY, username);
+ builder.append(PASSWORD_KEY, password);
final Boolean tokenAuthenticationEnabled =
context.getProperty(KafkaClientComponent.TOKEN_AUTHENTICATION).asBoolean();
if (Boolean.TRUE == tokenAuthenticationEnabled) {
- builder.append(TOKEN_AUTH_ENABLED);
+ builder.append(TOKEN_AUTH_KEY, Boolean.TRUE);
}
- builder.append(SEMI_COLON);
- return builder.toString();
+ return builder.build();
}
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
index 9ab885fb0b..c1316145a0 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
@@ -24,6 +24,8 @@ public enum KafkaClientProperty {
SASL_LOGIN_CLASS("sasl.login.class"),
+ SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"),
+
SSL_KEYSTORE_LOCATION("ssl.keystore.location"),
SSL_KEYSTORE_PASSWORD("ssl.keystore.password"),
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
index a9da7714e3..04e07a0dab 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/SaslMechanism.java
@@ -17,8 +17,10 @@
package org.apache.nifi.kafka.shared.property;
import org.apache.nifi.components.DescribedValue;
+import
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import java.util.Arrays;
+import java.util.EnumSet;
import java.util.Optional;
/**
@@ -31,7 +33,10 @@ public enum SaslMechanism implements DescribedValue {
SCRAM_SHA_256("SCRAM-SHA-256", "SCRAM-SHA-256", "Salted Challenge Response
Authentication Mechanism using SHA-512 with username and password"),
- SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response
Authentication Mechanism using SHA-256 with username and password");
+ SCRAM_SHA_512("SCRAM-SHA-512", "SCRAM-SHA-512", "Salted Challenge Response
Authentication Mechanism using SHA-512 with username and password"),
+
+ AWS_MSK_IAM("AWS_MSK_IAM", "AWS_MSK_IAM", "Allows to use AWS IAM for
authentication and authorization against Amazon MSK clusters that have AWS IAM
enabled " +
+ "as an authentication mechanism. The IAM credentials will be found
using the AWS Default Credentials Provider Chain.");
private final String value;
@@ -52,6 +57,14 @@ public enum SaslMechanism implements DescribedValue {
return foundSaslMechanism.orElseThrow(() -> new
IllegalArgumentException(String.format("SaslMechanism value [%s] not found",
value)));
}
+ public static EnumSet<SaslMechanism> getAvailableSaslMechanisms() {
+ if (StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) {
+ return EnumSet.allOf(SaslMechanism.class);
+ } else {
+ return EnumSet.complementOf(EnumSet.of(SaslMechanism.AWS_MSK_IAM));
+ }
+ }
+
@Override
public String getValue() {
return value;
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
index fd06acb6d0..3159442bf6 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java
@@ -19,6 +19,7 @@ package org.apache.nifi.kafka.shared.property.provider;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SASL_MECHANISM;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SECURITY_PROTOCOL;
import static
org.apache.nifi.kafka.shared.component.KafkaClientComponent.SSL_CONTEXT_SERVICE;
+import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_CLIENT_CALLBACK_HANDLER_CLASS;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_JAAS_CONFIG;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SASL_LOGIN_CLASS;
import static
org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
@@ -56,6 +57,8 @@ public class StandardKafkaPropertyProvider implements
KafkaPropertyProvider {
private static final String SASL_GSSAPI_CUSTOM_LOGIN_CLASS =
"org.apache.nifi.processors.kafka.pubsub.CustomKerberosLogin";
+ public static final String SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS
= "software.amazon.msk.auth.iam.IAMClientCallbackHandler";
+
private static final LoginConfigProvider LOGIN_CONFIG_PROVIDER = new
DelegatingLoginConfigProvider();
private final Set<String> clientPropertyNames;
@@ -86,6 +89,8 @@ public class StandardKafkaPropertyProvider implements
KafkaPropertyProvider {
final SaslMechanism saslMechanism =
SaslMechanism.getSaslMechanism(context.getProperty(SASL_MECHANISM).getValue());
if (SaslMechanism.GSSAPI == saslMechanism &&
isCustomKerberosLoginFound()) {
properties.put(SASL_LOGIN_CLASS.getProperty(),
SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+ } else if (SaslMechanism.AWS_MSK_IAM == saslMechanism &&
isAwsMskIamCallbackHandlerFound()) {
+
properties.put(SASL_CLIENT_CALLBACK_HANDLER_CLASS.getProperty(),
SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
}
}
}
@@ -160,9 +165,17 @@ public class StandardKafkaPropertyProvider implements
KafkaPropertyProvider {
}
}
- private boolean isCustomKerberosLoginFound() {
+ private static boolean isCustomKerberosLoginFound() {
+ return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+ }
+
+ public static boolean isAwsMskIamCallbackHandlerFound() {
+ return isClassFound(SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+ }
+
+ private static boolean isClassFound(final String className) {
try {
- Class.forName(SASL_GSSAPI_CUSTOM_LOGIN_CLASS);
+ Class.forName(className);
return true;
} catch (final ClassNotFoundException e) {
return false;
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
index 4927b8c1fe..e36264f2af 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java
@@ -23,6 +23,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.kafka.shared.property.KafkaClientProperty;
import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
+import
org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
@@ -74,6 +75,7 @@ public class KafkaClientCustomValidationFunction implements
Function<ValidationC
validateKerberosServices(validationContext, results);
validateKerberosCredentials(validationContext, results);
validateUsernamePassword(validationContext, results);
+ validateAwsMskIamMechanism(validationContext, results);
return results;
}
@@ -233,6 +235,24 @@ public class KafkaClientCustomValidationFunction
implements Function<ValidationC
}
}
+ private void validateAwsMskIamMechanism(final ValidationContext
validationContext, final Collection<ValidationResult> results) {
+ final PropertyValue saslMechanismProperty =
validationContext.getProperty(SASL_MECHANISM);
+ if (saslMechanismProperty.isSet()) {
+ final SaslMechanism saslMechanism =
SaslMechanism.getSaslMechanism(saslMechanismProperty.getValue());
+
+ if (SaslMechanism.AWS_MSK_IAM == saslMechanism &&
!StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) {
+ final String explanation = String.format("[%s] required class
not found: Kafka modules must be compiled with AWS MSK enabled",
+
StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS);
+
+ results.add(new ValidationResult.Builder()
+ .subject(SASL_MECHANISM.getDisplayName())
+ .valid(false)
+ .explanation(explanation)
+ .build());
+ }
+ }
+ }
+
private boolean isEmpty(final String string) {
return string == null || string.isEmpty();
}
diff --git
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java
similarity index 50%
copy from
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
copy to
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java
index 9ab885fb0b..f6cfe6bfd2 100644
---
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/KafkaClientProperty.java
+++
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/test/java/org/apache/nifi/kafka/shared/login/LoginConfigBuilderTest.java
@@ -14,37 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.kafka.shared.property;
+package org.apache.nifi.kafka.shared.login;
-/**
- * Enumeration of Kafka Client property names without reference to Kafka
libraries
- */
-public enum KafkaClientProperty {
- SASL_JAAS_CONFIG("sasl.jaas.config"),
-
- SASL_LOGIN_CLASS("sasl.login.class"),
-
- SSL_KEYSTORE_LOCATION("ssl.keystore.location"),
-
- SSL_KEYSTORE_PASSWORD("ssl.keystore.password"),
+import org.junit.jupiter.api.Test;
- SSL_KEYSTORE_TYPE("ssl.keystore.type"),
+import static
javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
- SSL_KEY_PASSWORD("ssl.key.password"),
+class LoginConfigBuilderTest {
- SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"),
+ @Test
+ void createExampleJaasConfigTest() {
+ String expectedConfig = "test.class.name required booleanFlag=true
numberFlag=1 stringFlag=\"string-flag\";";
- SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"),
-
- SSL_TRUSTSTORE_TYPE("ssl.truststore.type");
-
- private final String property;
-
- KafkaClientProperty(final String property) {
- this.property = property;
- }
+ LoginConfigBuilder builder = new LoginConfigBuilder("test.class.name",
REQUIRED);
+ builder.append("booleanFlag", Boolean.TRUE);
+ builder.append("numberFlag", 1);
+ builder.append("stringFlag", "string-flag");
- public String getProperty() {
- return property;
+ assertEquals(expectedConfig, builder.build());
}
-}
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
index d9080203ed..6761b04748 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
@@ -23,9 +23,10 @@
<packaging>pom</packaging>
<properties>
- <kafka1.0.version>1.0.2</kafka1.0.version>
- <kafka2.0.version>2.0.0</kafka2.0.version>
- <kafka2.6.version>2.6.3</kafka2.6.version>
+ <kafka1.0.version>1.0.2</kafka1.0.version>
+ <kafka2.0.version>2.0.0</kafka2.0.version>
+ <kafka2.6.version>2.6.3</kafka2.6.version>
+ <aws-msk-iam-auth.version>1.1.5</aws-msk-iam-auth.version>
</properties>
<modules>
@@ -44,7 +45,7 @@
<artifactId>nifi-kafka-1-0-processors</artifactId>
<version>1.20.0-SNAPSHOT</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-2-0-processors</artifactId>
<version>1.20.0-SNAPSHOT</version>