This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 16c527271b NIFI-11271: This closes #7795. Removed deprecated 
Kerberos-related properties and updated to make use of KerberosUserService; 
some code cleanup to bring up-to-date with Java 21 recommendations such as 
Stream.toList() instead of Stream.collect(Collectors.toList()) and using 
enhanced switch statements
16c527271b is described below

commit 16c527271bbfc2f83b251383880dcb78972ce7e4
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Tue Sep 26 11:42:22 2023 -0400

    NIFI-11271: This closes #7795. Removed deprecated Kerberos-related 
properties and updated to make use of KerberosUserService; some code cleanup to 
bring up-to-date with Java 21 recommendations such as Stream.toList() instead 
of Stream.collect(Collectors.toList()) and using enhanced switch statements
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../nifi-kudu-controller-service/pom.xml           |  11 +-
 .../nifi/controller/kudu/KuduLookupService.java    | 164 +++++-------
 .../processors/kudu/AbstractKuduProcessor.java     | 296 +++++----------------
 .../org/apache/nifi/processors/kudu/PutKudu.java   |  88 +++---
 .../apache/nifi/processors/kudu/MockPutKudu.java   | 146 ++--------
 .../apache/nifi/processors/kudu/TestPutKudu.java   | 134 ++--------
 6 files changed, 203 insertions(+), 636 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
index 1d23041702..36c48656be 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
@@ -63,11 +63,6 @@
             <version>2.0.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
@@ -88,6 +83,12 @@
             <artifactId>kudu-client</artifactId>
             <version>${kudu.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
index b208186fac..a53444340d 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -16,6 +16,18 @@
  */
 package org.apache.nifi.controller.kudu;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.security.auth.login.LoginException;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
@@ -38,12 +50,11 @@ import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.lookup.RecordLookupService;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.krb.KerberosAction;
-import org.apache.nifi.security.krb.KerberosKeytabUser;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
@@ -52,20 +63,6 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-import javax.security.auth.login.LoginException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
 
 @CapabilityDescription("Lookup a record from Kudu Server associated with the 
specified key. Binary columns are base64 encoded. Only one matched row will be 
returned")
 @Tags({"lookup", "enrich", "key", "value", "kudu"})
@@ -80,13 +77,12 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .build();
 
-    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("kudu-lu-kerberos-credentials-service")
-            .displayName("Kerberos Credentials Service")
-            .description("Specifies the Kerberos Credentials to use for 
authentication")
-            .required(false)
-            .identifiesControllerService(KerberosCredentialsService.class)
-            .build();
+    public static final PropertyDescriptor KERBEROS_USER_SERVICE = new 
PropertyDescriptor.Builder()
+        .name("Kerberos User Service")
+        .description("Specifies the Kerberos Credentials to use for 
authentication")
+        .required(false)
+        .identifiesControllerService(KerberosUserService.class)
+        .build();
 
     public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new 
PropertyDescriptor.Builder()
             .name("kudu-lu-operations-timeout-ms")
@@ -137,7 +133,6 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
 
     protected List<PropertyDescriptor> properties;
 
-    protected KerberosCredentialsService credentialsService;
     private volatile KerberosUser kerberosUser;
 
     protected String kuduMasters;
@@ -154,26 +149,22 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
     protected void init(final ControllerServiceInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(KUDU_MASTERS);
-        properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KERBEROS_USER_SERVICE);
         properties.add(KUDU_OPERATION_TIMEOUT_MS);
         properties.add(KUDU_REPLICA_SELECTION);
         properties.add(TABLE_NAME);
         properties.add(RETURN_COLUMNS);
-        addProperties(properties);
         this.properties = Collections.unmodifiableList(properties);
     }
 
-    protected void addProperties(List<PropertyDescriptor> properties) {
-    }
 
     protected void createKuduClient(ConfigurationContext context) throws 
LoginException {
         final String kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
-        final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+        final KerberosUserService userService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
 
-        if (credentialsService != null) {
-            final String keytab = credentialsService.getKeytab();
-            final String principal = credentialsService.getPrincipal();
-            kerberosUser = loginKerberosUser(principal, keytab);
+        if (userService != null) {
+            kerberosUser = userService.createKerberosUser();
+            kerberosUser.login();
 
             final KerberosAction<KuduClient> kerberosAction = new 
KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context), 
getLogger());
             this.kuduClient = kerberosAction.execute();
@@ -182,14 +173,9 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
         }
     }
 
-    protected KerberosUser loginKerberosUser(final String principal, final 
String keytab) throws LoginException {
-        final KerberosUser kerberosUser = new KerberosKeytabUser(principal, 
keytab);
-        kerberosUser.login();
-        return kerberosUser;
-    }
 
     protected KuduClient buildClient(final String masters, final 
ConfigurationContext context) {
-        final Integer operationTimeout = 
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int operationTimeout = 
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
 
         return new KuduClient.KuduClientBuilder(masters)
                 .defaultOperationTimeoutMs(operationTimeout)
@@ -203,10 +189,8 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
      */
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
-
         try {
             kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
-            credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
 
             if (kuduClient == null) {
                 getLogger().debug("Setting up Kudu connection...");
@@ -214,7 +198,7 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
                 createKuduClient(context);
                 getLogger().debug("Kudu connection successfully initialized");
             }
-        } catch(Exception ex){
+        } catch (final Exception ex) {
             getLogger().error("Exception occurred while interacting with Kudu 
due to " + ex.getMessage(), ex);
             throw new InitializationException(ex);
         }
@@ -228,15 +212,14 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
 
             //Result Schema
             resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames);
-
-        } catch (KuduException e) {
+        } catch (final KuduException e) {
             throw new IllegalArgumentException(e);
         }
     }
 
     @Override
     public Set<String> getRequiredKeys() {
-        return new HashSet<>();
+        return Collections.emptySet();
     }
 
     @Override
@@ -246,21 +229,17 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
 
     @Override
     public Optional<Record> lookup(Map<String, Object> coordinates) {
-        Optional<Record> record;
-
-        if (kerberosUser != null) {
-            final KerberosAction<Optional<Record>> kerberosAction = new 
KerberosAction<>(kerberosUser, () -> getRecord(coordinates), getLogger());
-            record = kerberosAction.execute();
+        if (kerberosUser == null) {
+            return getRecord(coordinates);
         } else {
-            record = getRecord(coordinates);
+            final KerberosAction<Optional<Record>> kerberosAction = new 
KerberosAction<>(kerberosUser, () -> getRecord(coordinates), getLogger());
+            return kerberosAction.execute();
         }
-
-        return record;
     }
 
     private Optional<Record> getRecord(Map<String, Object> coordinates) {
         //Scanner
-        KuduScanner.KuduScannerBuilder builder = 
kuduClient.newScannerBuilder(table);
+        final KuduScanner.KuduScannerBuilder builder = 
kuduClient.newScannerBuilder(table);
 
         builder.setProjectedColumnNames(columnNames);
         builder.replicaSelection(replicaSelection);
@@ -272,20 +251,22 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
                 
builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key),
 KuduPredicate.ComparisonOp.EQUAL, value))
         );
 
-        KuduScanner kuduScanner = builder.build();
+        final KuduScanner kuduScanner = builder.build();
 
         //Run lookup
-        for ( RowResult row : kuduScanner){
+        for (final RowResult row : kuduScanner) {
             final Map<String, Object> values = new HashMap<>();
-            for(String columnName : columnNames){
+            for (final String columnName : columnNames) {
                 Object object;
-                if(row.getColumnType(columnName) == Type.BINARY){
+                if (row.getColumnType(columnName) == Type.BINARY) {
                     object = 
Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName));
                 } else {
                     object = row.getObject(columnName);
                 }
+
                 values.put(columnName, object);
             }
+
             return Optional.of(new MapRecord(resultSchema, values));
         }
 
@@ -293,8 +274,8 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
         return Optional.empty();
     }
 
-    private List<String> getColumns(String columns){
-        if(columns.equals("*")){
+    private List<String> getColumns(final String columns) {
+        if (columns.equals("*")) {
             return tableSchema
                     .getColumns()
                     .stream().map(ColumnSchema::getName)
@@ -306,50 +287,31 @@ public class KuduLookupService extends 
AbstractControllerService implements Reco
 
     private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema, 
List<String> columnNames){
         final List<RecordField> fields = new ArrayList<>();
-        for(String columnName : columnNames) {
-            if(!kuduTableSchema.hasColumn(columnName)){
+        for (final String columnName : columnNames) {
+            if (!kuduTableSchema.hasColumn(columnName)) {
                 throw new IllegalArgumentException("Column not found in Kudu 
table schema " + columnName);
             }
-            ColumnSchema cs = kuduTableSchema.getColumn(columnName);
-            switch (cs.getType()) {
-                case INT8:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.BYTE.getDataType()));
-                    break;
-                case INT16:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.SHORT.getDataType()));
-                    break;
-                case INT32:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.INT.getDataType()));
-                    break;
-                case INT64:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.LONG.getDataType()));
-                    break;
-                case DECIMAL:
-                    final ColumnTypeAttributes attributes = 
cs.getTypeAttributes();
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), 
attributes.getScale())));
-                    break;
-                case UNIXTIME_MICROS:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.TIMESTAMP.getDataType()));
-                    break;
-                case BINARY:
-                case STRING:
-                case VARCHAR:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.STRING.getDataType()));
-                    break;
-                case DOUBLE:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.DOUBLE.getDataType()));
-                    break;
-                case BOOL:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.BOOLEAN.getDataType()));
-                    break;
-                case FLOAT:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.FLOAT.getDataType()));
-                    break;
-                case DATE:
-                    fields.add(new RecordField(cs.getName(), 
RecordFieldType.DATE.getDataType()));
-                    break;
-            }
+
+            final ColumnSchema cs = kuduTableSchema.getColumn(columnName);
+            final ColumnTypeAttributes attributes = cs.getTypeAttributes();
+
+            final RecordField field = switch (cs.getType()) {
+                case INT8 -> new RecordField(cs.getName(), 
RecordFieldType.BYTE.getDataType());
+                case INT16 -> new RecordField(cs.getName(), 
RecordFieldType.SHORT.getDataType());
+                case INT32 -> new RecordField(cs.getName(), 
RecordFieldType.INT.getDataType());
+                case INT64 -> new RecordField(cs.getName(), 
RecordFieldType.LONG.getDataType());
+                case DECIMAL -> new RecordField(cs.getName(), 
RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), 
attributes.getScale()));
+                case UNIXTIME_MICROS -> new RecordField(cs.getName(), 
RecordFieldType.TIMESTAMP.getDataType());
+                case BINARY, STRING, VARCHAR -> new RecordField(cs.getName(), 
RecordFieldType.STRING.getDataType());
+                case DOUBLE -> new RecordField(cs.getName(), 
RecordFieldType.DOUBLE.getDataType());
+                case BOOL -> new RecordField(cs.getName(), 
RecordFieldType.BOOLEAN.getDataType());
+                case FLOAT -> new RecordField(cs.getName(), 
RecordFieldType.FLOAT.getDataType());
+                case DATE -> new RecordField(cs.getName(), 
RecordFieldType.DATE.getDataType());
+            };
+
+            fields.add(field);
         }
+
         return new SimpleRecordSchema(fields);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index dd6801c7b4..54b71e5c29 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -17,6 +17,25 @@
 
 package org.apache.nifi.processors.kudu;
 
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
@@ -33,18 +52,12 @@ import org.apache.kudu.client.SessionConfiguration;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.krb.KerberosAction;
-import org.apache.nifi.security.krb.KerberosKeytabUser;
-import org.apache.nifi.security.krb.KerberosPasswordUser;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
@@ -53,29 +66,6 @@ import 
org.apache.nifi.serialization.record.field.FieldConverter;
 import 
org.apache.nifi.serialization.record.field.ObjectTimestampFieldConverter;
 import org.apache.nifi.serialization.record.type.DecimalDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.apache.nifi.util.StringUtils;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.LocalDate;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
 
 public abstract class AbstractKuduProcessor extends AbstractProcessor {
 
@@ -87,14 +77,6 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
             .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
             .build();
 
-    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
Builder()
-            .name("kerberos-credentials-service")
-            .displayName("Kerberos Credentials Service")
-            .description("Specifies the Kerberos Credentials to use for 
authentication")
-            .required(false)
-            .identifiesControllerService(KerberosCredentialsService.class)
-            .build();
-
     static final PropertyDescriptor KERBEROS_USER_SERVICE = new 
PropertyDescriptor.Builder()
             .name("kerberos-user-service")
             .displayName("Kerberos User Service")
@@ -103,25 +85,6 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
             .required(false)
             .build();
 
-    static final PropertyDescriptor KERBEROS_PRINCIPAL = new 
PropertyDescriptor.Builder()
-            .name("kerberos-principal")
-            .displayName("Kerberos Principal")
-            .description("The principal to use when specifying the principal 
and password directly in the processor for authenticating via Kerberos.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
-            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
-            .build();
-
-    static final PropertyDescriptor KERBEROS_PASSWORD = new 
PropertyDescriptor.Builder()
-            .name("kerberos-password")
-            .displayName("Kerberos Password")
-            .description("The password to use when specifying the principal 
and password directly in the processor for authenticating via Kerberos.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
-
     static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder()
             .name("kudu-operations-timeout-ms")
             .displayName("Kudu Operation Timeout")
@@ -187,25 +150,13 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
 
     protected void createKerberosUserAndOrKuduClient(ProcessContext context) {
         final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
-        if (kerberosUserService != null) {
-            kerberosUser = kerberosUserService.createKerberosUser();
-            kerberosUser.login();
-            createKuduClient(context);
-        } else {
-            final KerberosCredentialsService credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-            final String kerberosPrincipal = 
context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
-            final String kerberosPassword = 
context.getProperty(KERBEROS_PASSWORD).getValue();
-
-            if (credentialsService != null) {
-                kerberosUser = 
createKerberosKeytabUser(credentialsService.getPrincipal(), 
credentialsService.getKeytab(), context);
-                kerberosUser.login(); // login creates the kudu client as well
-            } else if (!StringUtils.isBlank(kerberosPrincipal) && 
!StringUtils.isBlank(kerberosPassword)) {
-                kerberosUser = createKerberosPasswordUser(kerberosPrincipal, 
kerberosPassword, context);
-                kerberosUser.login(); // login creates the kudu client as well
-            } else {
-                createKuduClient(context);
-            }
+        if (kerberosUserService == null) {
+            return;
         }
+
+        kerberosUser = kerberosUserService.createKerberosUser();
+        kerberosUser.login();
+        createKuduClient(context);
     }
 
     protected void createKuduClient(ProcessContext context) {
@@ -250,8 +201,8 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
         );
 
         return new KuduClient.KuduClientBuilder(masters)
+            .defaultAdminOperationTimeoutMs(adminOperationTimeout)
                 .defaultOperationTimeoutMs(operationTimeout)
-                .defaultSocketReadTimeoutMs(adminOperationTimeout)
                 .saslProtocolName(saslProtocolName)
                 .workerCount(workerCount)
                 .nioExecutor(nioExecutor)
@@ -280,86 +231,6 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
         }
     }
 
-    protected KerberosUser createKerberosKeytabUser(String principal, String 
keytab, ProcessContext context) {
-        return new KerberosKeytabUser(principal, keytab) {
-            @Override
-            public synchronized void login() {
-                if (isLoggedIn()) {
-                    return;
-                }
-
-                super.login();
-                createKuduClient(context);
-            }
-        };
-    }
-
-    protected KerberosUser createKerberosPasswordUser(String principal, String 
password, ProcessContext context) {
-        return new KerberosPasswordUser(principal, password) {
-            @Override
-            public synchronized void login() {
-                if (isLoggedIn()) {
-                    return;
-                }
-
-                super.login();
-                createKuduClient(context);
-            }
-        };
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
-        final List<ValidationResult> results = new ArrayList<>();
-
-        final boolean kerberosPrincipalProvided = 
!StringUtils.isBlank(context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue());
-        final boolean kerberosPasswordProvided = 
!StringUtils.isBlank(context.getProperty(KERBEROS_PASSWORD).getValue());
-
-        if (kerberosPrincipalProvided && !kerberosPasswordProvided) {
-            results.add(new ValidationResult.Builder()
-                    .subject(KERBEROS_PASSWORD.getDisplayName())
-                    .valid(false)
-                    .explanation("a password must be provided for the given 
principal")
-                    .build());
-        }
-
-        if (kerberosPasswordProvided && !kerberosPrincipalProvided) {
-            results.add(new ValidationResult.Builder()
-                    .subject(KERBEROS_PRINCIPAL.getDisplayName())
-                    .valid(false)
-                    .explanation("a principal must be provided for the given 
password")
-                    .build());
-        }
-
-        final KerberosCredentialsService kerberosCredentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-        final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
-
-        if (kerberosCredentialsService != null && (kerberosPrincipalProvided 
|| kerberosPasswordProvided)) {
-            results.add(new ValidationResult.Builder()
-                    .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
-                    .valid(false)
-                    .explanation("kerberos principal/password and kerberos 
credential service cannot be configured at the same time")
-                    .build());
-        }
-
-        if (kerberosUserService != null && (kerberosPrincipalProvided || 
kerberosPasswordProvided)) {
-            results.add(new ValidationResult.Builder()
-                    .subject(KERBEROS_USER_SERVICE.getDisplayName())
-                    .valid(false)
-                    .explanation("kerberos principal/password and kerberos 
user service cannot be configured at the same time")
-                    .build());
-        }
-
-        if (kerberosUserService != null && kerberosCredentialsService != null) 
{
-            results.add(new ValidationResult.Builder()
-                    .subject(KERBEROS_USER_SERVICE.getDisplayName())
-                    .valid(false)
-                    .explanation("kerberos user service and kerberos 
credentials service cannot be configured at the same time")
-                    .build());
-        }
-
-        return results;
-    }
 
     @OnStopped
     public void shutdown() throws Exception {
@@ -407,51 +278,28 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
                 final Optional<DataType> fieldDataType = 
record.getSchema().getDataType(recordFieldName);
                 final String dataTypeFormat = 
fieldDataType.map(DataType::getFormat).orElse(null);
                 switch (colType) {
-                    case BOOL:
-                        row.addBoolean(columnIndex, 
DataTypeUtils.toBoolean(value, recordFieldName));
-                        break;
-                    case INT8:
-                        row.addByte(columnIndex, DataTypeUtils.toByte(value, 
recordFieldName));
-                        break;
-                    case INT16:
-                        row.addShort(columnIndex,  
DataTypeUtils.toShort(value, recordFieldName));
-                        break;
-                    case INT32:
-                        row.addInt(columnIndex,  
DataTypeUtils.toInteger(value, recordFieldName));
-                        break;
-                    case INT64:
-                        row.addLong(columnIndex,  DataTypeUtils.toLong(value, 
recordFieldName));
-                        break;
-                    case UNIXTIME_MICROS:
+                    case BOOL -> row.addBoolean(columnIndex, 
DataTypeUtils.toBoolean(value, recordFieldName));
+                    case INT8 -> row.addByte(columnIndex, 
DataTypeUtils.toByte(value, recordFieldName));
+                    case INT16 -> row.addShort(columnIndex, 
DataTypeUtils.toShort(value, recordFieldName));
+                    case INT32 -> row.addInt(columnIndex, 
DataTypeUtils.toInteger(value, recordFieldName));
+                    case INT64 -> row.addLong(columnIndex, 
DataTypeUtils.toLong(value, recordFieldName));
+                    case UNIXTIME_MICROS -> {
                         final Optional<DataType> optionalDataType = 
record.getSchema().getDataType(recordFieldName);
-                        final Optional<String> optionalPattern = 
getTimestampPattern(optionalDataType);
+                        final Optional<String> optionalPattern = 
getTimestampPattern(optionalDataType.orElse(null));
                         final Timestamp timestamp = 
TIMESTAMP_FIELD_CONVERTER.convertField(value, optionalPattern, recordFieldName);
                         row.addTimestamp(columnIndex, timestamp);
-                        break;
-                    case STRING:
-                        row.addString(columnIndex, 
DataTypeUtils.toString(value, dataTypeFormat));
-                        break;
-                    case BINARY:
-                        row.addBinary(columnIndex, 
DataTypeUtils.toString(value, dataTypeFormat).getBytes());
-                        break;
-                    case FLOAT:
-                        row.addFloat(columnIndex, DataTypeUtils.toFloat(value, 
recordFieldName));
-                        break;
-                    case DOUBLE:
-                        row.addDouble(columnIndex, 
DataTypeUtils.toDouble(value, recordFieldName));
-                        break;
-                    case DECIMAL:
-                        row.addDecimal(columnIndex, new 
BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
-                        break;
-                    case VARCHAR:
-                        row.addVarchar(columnIndex, 
DataTypeUtils.toString(value, dataTypeFormat));
-                        break;
-                    case DATE:
+                    }
+                    case STRING -> row.addString(columnIndex, 
DataTypeUtils.toString(value, dataTypeFormat));
+                    case BINARY -> row.addBinary(columnIndex, 
DataTypeUtils.toString(value, dataTypeFormat).getBytes());
+                    case FLOAT -> row.addFloat(columnIndex, 
DataTypeUtils.toFloat(value, recordFieldName));
+                    case DOUBLE -> row.addDouble(columnIndex, 
DataTypeUtils.toDouble(value, recordFieldName));
+                    case DECIMAL -> row.addDecimal(columnIndex, new 
BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
+                    case VARCHAR -> row.addVarchar(columnIndex, 
DataTypeUtils.toString(value, dataTypeFormat));
+                    case DATE -> {
                         final String dateFormat = dataTypeFormat == null ? 
RecordFieldType.DATE.getDefaultFormat() : dataTypeFormat;
                         row.addDate(columnIndex, getDate(value, 
recordFieldName, dateFormat));
-                        break;
-                    default:
-                        throw new IllegalStateException(String.format("unknown 
column type %s", colType));
+                    }
+                    default -> throw new 
IllegalStateException(String.format("unknown column type %s", colType));
                 }
             }
         }
@@ -460,20 +308,15 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
     /**
      * Get Timestamp Pattern and override Timestamp Record Field pattern with 
optional microsecond pattern
      *
-     * @param optionalDataType Optional Data Type
+     * @param dataType Data Type
      * @return Optional Timestamp Pattern
      */
-    private Optional<String> getTimestampPattern(final Optional<DataType> 
optionalDataType) {
-        String pattern = null;
-        if (optionalDataType.isPresent()) {
-            final DataType dataType = optionalDataType.get();
-            if (RecordFieldType.TIMESTAMP == dataType.getFieldType()) {
-                pattern = MICROSECOND_TIMESTAMP_PATTERN;
-            } else {
-                pattern = dataType.getFormat();
-            }
+    private Optional<String> getTimestampPattern(final DataType dataType) {
+        if (dataType == null) {
+            return Optional.empty();
         }
-        return Optional.ofNullable(pattern);
+
+        return Optional.of(RecordFieldType.TIMESTAMP == 
dataType.getFieldType() ? MICROSECOND_TIMESTAMP_PATTERN : dataType.getFormat());
     }
 
     /**
@@ -493,33 +336,20 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
      * Converts a NiFi DataType to it's equivalent Kudu Type.
      */
     private Type toKuduType(DataType nifiType) {
-        switch (nifiType.getFieldType()) {
-            case BOOLEAN:
-                return Type.BOOL;
-            case BYTE:
-                return Type.INT8;
-            case SHORT:
-                return Type.INT16;
-            case INT:
-                return Type.INT32;
-            case LONG:
-                return Type.INT64;
-            case FLOAT:
-                return Type.FLOAT;
-            case DOUBLE:
-                return Type.DOUBLE;
-            case DECIMAL:
-                return Type.DECIMAL;
-            case TIMESTAMP:
-                return Type.UNIXTIME_MICROS;
-            case CHAR:
-            case STRING:
-                return Type.STRING;
-            case DATE:
-                return Type.DATE;
-            default:
-                throw new IllegalArgumentException(String.format("unsupported 
type %s", nifiType));
-        }
+        return switch (nifiType.getFieldType()) {
+            case BOOLEAN -> Type.BOOL;
+            case BYTE -> Type.INT8;
+            case SHORT -> Type.INT16;
+            case INT -> Type.INT32;
+            case LONG -> Type.INT64;
+            case FLOAT -> Type.FLOAT;
+            case DOUBLE -> Type.DOUBLE;
+            case DECIMAL -> Type.DECIMAL;
+            case TIMESTAMP -> Type.UNIXTIME_MICROS;
+            case CHAR, STRING -> Type.STRING;
+            case DATE -> Type.DATE;
+            default -> throw new 
IllegalArgumentException(String.format("unsupported type %s", nifiType));
+        };
     }
 
     private ColumnTypeAttributes getKuduTypeAttributes(final DataType 
nifiType) {
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 7fba59fa17..3a3273559f 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -17,6 +17,19 @@
 
 package org.apache.nifi.processors.kudu;
 
+import java.io.InputStream;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.security.auth.login.LoginException;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
@@ -61,23 +74,9 @@ import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSet;
 
-import javax.security.auth.login.LoginException;
-import java.io.InputStream;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
+import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
 import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
-import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
 
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
 @SupportsBatching
@@ -211,26 +210,27 @@ public class PutKudu extends AbstractKuduProcessor {
 
     protected static final PropertyDescriptor FLUSH_MODE = new Builder()
         .name("Flush Mode")
-        .description("Set the new flush mode for a kudu session.\n" +
-            "AUTO_FLUSH_SYNC: the call returns when the operation is 
persisted, else it throws an exception.\n" +
-            "AUTO_FLUSH_BACKGROUND: the call returns when the operation has 
been added to the buffer. This call should normally perform only fast 
in-memory" +
-            " operations but it may have to wait when the buffer is full and 
there's another buffer being flushed.\n" +
-            "MANUAL_FLUSH: the call returns when the operation has been added 
to the buffer, else it throws a KuduException if the buffer is full.")
+        .description("""
+               Set the new flush mode for a kudu session.
+               AUTO_FLUSH_SYNC: the call returns when the operation is 
persisted, else it throws an exception.
+               AUTO_FLUSH_BACKGROUND: the call returns when the operation has 
been added to the buffer. This call should normally perform only fast in-memory
+               operations but it may have to wait when the buffer is full and 
there's another buffer being flushed.
+               "MANUAL_FLUSH: the call returns when the operation has been 
added to the buffer, else it throws a KuduException if the buffer is full.
+            """)
         .allowableValues(SessionConfiguration.FlushMode.values())
         
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .required(true)
         .build();
 
     protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new 
Builder()
         .name("FlowFiles per Batch")
-        .description("The maximum number of FlowFiles to process in a single 
execution, between 1 - 100000. " +
+        .description("The maximum number of FlowFiles to process in a single 
execution, between 1 and 100,000. " +
             "Depending on your memory size, and data size per row set an 
appropriate batch size " +
             "for the number of FlowFiles to process per client connection 
setup." +
             "Gradually increase this number, only if your FlowFiles typically 
contain a few records.")
         .defaultValue("1")
         .required(true)
-        .addValidator(StandardValidators.createLongValidator(1, 100000, true))
+        .addValidator(StandardValidators.createLongValidator(1, 100_000, true))
         .expressionLanguageSupported(ENVIRONMENT)
         .build();
 
@@ -282,9 +282,6 @@ public class PutKudu extends AbstractKuduProcessor {
         properties.add(TABLE_NAME);
         properties.add(FAILURE_STRATEGY);
         properties.add(KERBEROS_USER_SERVICE);
-        properties.add(KERBEROS_CREDENTIALS_SERVICE);
-        properties.add(KERBEROS_PRINCIPAL);
-        properties.add(KERBEROS_PASSWORD);
         properties.add(SKIP_HEAD_LINE);
         properties.add(LOWERCASE_FIELD_NAMES);
         properties.add(HANDLE_SCHEMA_DRIFT);
@@ -447,7 +444,7 @@ public class PutKudu extends AbstractKuduProcessor {
                         dataRecords = Collections.singletonList(record);
                     } else {
                         final RecordPathResult result = 
dataRecordPath.evaluate(record);
-                        final List<FieldValue> fieldValues = 
result.getSelectedFields().collect(Collectors.toList());
+                        final List<FieldValue> fieldValues = 
result.getSelectedFields().toList();
                         if (fieldValues.isEmpty()) {
                             throw new ProcessException("RecordPath " + 
dataRecordPath.getPath() + " evaluated against Record yielded no results.");
                         }
@@ -530,7 +527,7 @@ public class PutKudu extends AbstractKuduProcessor {
             recordFields = record.getSchema().getFields();
         } else {
             final RecordPathResult recordPathResult = 
dataRecordPath.evaluate(record);
-            final List<FieldValue> fieldValues =  
recordPathResult.getSelectedFields().collect(Collectors.toList());
+            final List<FieldValue> fieldValues = 
recordPathResult.getSelectedFields().toList();
 
             recordFields = new ArrayList<>();
             for (final FieldValue fieldValue : fieldValues) {
@@ -548,7 +545,7 @@ public class PutKudu extends AbstractKuduProcessor {
 
         final List<RecordField> missing = recordFields.stream()
             .filter(field -> !schema.hasColumn(lowercaseFields ? 
field.getFieldName().toLowerCase() : field.getFieldName()))
-            .collect(Collectors.toList());
+            .toList();
 
         if (missing.isEmpty()) {
             getLogger().debug("No schema drift detected for {}", flowFile);
@@ -649,10 +646,8 @@ public class PutKudu extends AbstractKuduProcessor {
                                           boolean lowercaseFields, KuduTable 
kuduTable) {
         Operation operation;
         switch (operationType) {
-            case INSERT:
-                operation = kuduTable.newInsert();
-                break;
-            case INSERT_IGNORE:
+            case INSERT -> operation = kuduTable.newInsert();
+            case INSERT_IGNORE -> {
                 // If the target Kudu cluster does not support ignore 
operations use an insert.
                 // The legacy session based insert ignore will be used instead.
                 if (!supportsInsertIgnoreOp) {
@@ -660,24 +655,13 @@ public class PutKudu extends AbstractKuduProcessor {
                 } else {
                     operation = kuduTable.newInsertIgnore();
                 }
-                break;
-            case UPSERT:
-                operation = kuduTable.newUpsert();
-                break;
-            case UPDATE:
-                operation = kuduTable.newUpdate();
-                break;
-            case UPDATE_IGNORE:
-                operation = kuduTable.newUpdateIgnore();
-                break;
-            case DELETE:
-                operation = kuduTable.newDelete();
-                break;
-            case DELETE_IGNORE:
-                operation = kuduTable.newDeleteIgnore();
-                break;
-            default:
-                throw new 
IllegalArgumentException(String.format("OperationType: %s not supported by 
Kudu", operationType));
+            }
+            case UPSERT -> operation = kuduTable.newUpsert();
+            case UPDATE -> operation = kuduTable.newUpdate();
+            case UPDATE_IGNORE -> operation = kuduTable.newUpdateIgnore();
+            case DELETE -> operation = kuduTable.newDelete();
+            case DELETE_IGNORE -> operation = kuduTable.newDeleteIgnore();
+            default -> throw new 
IllegalArgumentException(String.format("OperationType: %s not supported by 
Kudu", operationType));
         }
         buildPartialRow(kuduTable.getSchema(), operation.getRow(), record, 
fieldNames, ignoreNull, lowercaseFields);
         return operation;
@@ -693,7 +677,7 @@ public class PutKudu extends AbstractKuduProcessor {
         @Override
         public OperationType apply(final Record record) {
             final RecordPathResult recordPathResult = 
recordPath.evaluate(record);
-            final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+            final List<FieldValue> resultList = 
recordPathResult.getSelectedFields().distinct().toList();
             if (resultList.isEmpty()) {
                 throw new ProcessException("Evaluated RecordPath " + 
recordPath.getPath() + " against Record but got no results");
             }
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 1b807d7516..a1efdb71e9 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -17,47 +17,37 @@
 
 package org.apache.nifi.processors.kudu;
 
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import org.apache.kudu.Schema;
+import org.apache.kudu.client.Delete;
 import org.apache.kudu.client.DeleteIgnore;
+import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.InsertIgnore;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduSession;
 import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Delete;
-import org.apache.kudu.client.Insert;
 import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.Update;
 import org.apache.kudu.client.UpdateIgnore;
 import org.apache.kudu.client.Upsert;
-import org.apache.kudu.client.Update;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.serialization.record.Record;
 
-import javax.security.auth.login.AppConfigurationEntry;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class MockPutKudu extends PutKudu {
 
-    private KuduSession session;
-    private LinkedList<Operation> opQueue;
+    private final KuduSession session;
+    private final LinkedList<Operation> opQueue;
 
     // Atomic reference is used as the set and use of the schema are in 
different thread
-    private AtomicReference<Schema> tableSchema = new AtomicReference<>();
-
-    private boolean loggedIn = false;
-    private boolean loggedOut = false;
+    private final AtomicReference<Schema> tableSchema = new 
AtomicReference<>();
 
     public MockPutKudu() {
         this(mock(KuduSession.class));
@@ -78,31 +68,15 @@ public class MockPutKudu extends PutKudu {
                                             boolean lowercaseFields, KuduTable 
kuduTable) {
         Operation operation = opQueue.poll();
         if (operation == null) {
-            switch (operationType) {
-                case INSERT:
-                    operation = mock(Insert.class);
-                    break;
-                case INSERT_IGNORE:
-                    operation = mock(InsertIgnore.class);
-                    break;
-                case UPSERT:
-                    operation = mock(Upsert.class);
-                    break;
-                case UPDATE:
-                    operation = mock(Update.class);
-                    break;
-                case UPDATE_IGNORE:
-                    operation = mock(UpdateIgnore.class);
-                    break;
-                case DELETE:
-                    operation = mock(Delete.class);
-                    break;
-                case DELETE_IGNORE:
-                    operation = mock(DeleteIgnore.class);
-                    break;
-                default:
-                    throw new 
IllegalArgumentException(String.format("OperationType: %s not supported by 
Kudu", operationType));
-            }
+            operation = switch (operationType) {
+                case INSERT -> mock(Insert.class);
+                case INSERT_IGNORE -> mock(InsertIgnore.class);
+                case UPSERT -> mock(Upsert.class);
+                case UPDATE -> mock(Update.class);
+                case UPDATE_IGNORE -> mock(UpdateIgnore.class);
+                case DELETE -> mock(Delete.class);
+                case DELETE_IGNORE -> mock(DeleteIgnore.class);
+            };
         }
         return operation;
     }
@@ -140,86 +114,6 @@ public class MockPutKudu extends PutKudu {
         actionOnKuduClient.accept(client);
     }
 
-    public boolean loggedIn() {
-        return loggedIn;
-    }
-
-    public boolean loggedOut() {
-        return loggedOut;
-    }
-
-    @Override
-    protected KerberosUser createKerberosKeytabUser(String principal, String 
keytab, ProcessContext context) {
-        return createMockKerberosUser(principal);
-    }
-
-    @Override
-    protected KerberosUser createKerberosPasswordUser(String principal, String 
password, ProcessContext context) {
-        return createMockKerberosUser(principal);
-    }
-
-    private KerberosUser createMockKerberosUser(final String principal) {
-        return new KerberosUser() {
-
-            @Override
-            public void login() {
-                loggedIn = true;
-            }
-
-            @Override
-            public void logout() {
-                loggedOut = true;
-            }
-
-            @Override
-            public <T> T doAs(final PrivilegedAction<T> action) throws 
IllegalStateException {
-                return action.run();
-            }
-
-            @Override
-            public <T> T doAs(PrivilegedAction<T> action, ClassLoader 
contextClassLoader) throws IllegalStateException {
-                return action.run();
-            }
-
-            @Override
-            public <T> T doAs(final PrivilegedExceptionAction<T> action) 
throws IllegalStateException, PrivilegedActionException {
-                try {
-                    return action.run();
-                } catch (Exception e) {
-                    throw new PrivilegedActionException(e);
-                }
-            }
-
-            @Override
-            public <T> T doAs(PrivilegedExceptionAction<T> action, ClassLoader 
contextClassLoader) throws IllegalStateException, PrivilegedActionException {
-                try {
-                    return action.run();
-                } catch (Exception e) {
-                    throw new PrivilegedActionException(e);
-                }
-            }
-
-            @Override
-            public boolean checkTGTAndRelogin() {
-                return true;
-            }
-
-            @Override
-            public boolean isLoggedIn() {
-                return loggedIn && !loggedOut;
-            }
-
-            @Override
-            public String getPrincipal() {
-                return principal;
-            }
-
-            @Override
-            public AppConfigurationEntry getConfigurationEntry() {
-                return new AppConfigurationEntry("LoginModule", 
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, Collections.emptyMap());
-            }
-        };
-    }
 
     @Override
     protected KuduSession createKuduSession(final KuduClient client) {
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index eb72477a51..39247984f6 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -17,6 +17,20 @@
 
 package org.apache.nifi.processors.kudu;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.ColumnTypeAttributes;
 import org.apache.kudu.Schema;
@@ -29,11 +43,8 @@ import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RowError;
 import org.apache.kudu.client.RowErrorsAndOverflowStatus;
 import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
@@ -58,29 +69,11 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 import org.mockito.stubbing.OngoingStubbing;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
 import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -148,50 +141,6 @@ public class TestPutKudu {
         testRunner.enableControllerService(readerFactory);
     }
 
-    @Test
-    public void testCustomValidate() throws InitializationException {
-        createRecordReader(1);
-
-        testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
-        testRunner.assertNotValid();
-
-        testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL);
-        testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
-        testRunner.assertNotValid();
-
-        testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
-        testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
-        testRunner.assertValid();
-
-        final KerberosCredentialsService kerberosCredentialsService = new 
MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
-        testRunner.addControllerService("kerb", kerberosCredentialsService);
-        testRunner.enableControllerService(kerberosCredentialsService);
-        testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
-        testRunner.assertNotValid();
-
-        testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL);
-        testRunner.removeProperty(PutKudu.KERBEROS_PASSWORD);
-        testRunner.assertValid();
-
-        final KerberosUserService kerberosUserService = 
enableKerberosUserService(testRunner);
-        testRunner.setProperty(PutKudu.KERBEROS_USER_SERVICE, 
kerberosUserService.getIdentifier());
-        testRunner.assertNotValid();
-
-        testRunner.removeProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE);
-        testRunner.assertValid();
-
-        testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
-        testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
-        testRunner.assertNotValid();
-    }
-
-    private KerberosUserService enableKerberosUserService(final TestRunner 
runner) throws InitializationException {
-        final KerberosUserService kerberosUserService = 
mock(KerberosUserService.class);
-        when(kerberosUserService.getIdentifier()).thenReturn("userService1");
-        runner.addControllerService(kerberosUserService.getIdentifier(), 
kerberosUserService);
-        runner.enableControllerService(kerberosUserService);
-        return kerberosUserService;
-    }
 
     @Test
     public void testWriteKuduWithDefaults() throws InitializationException {
@@ -221,39 +170,6 @@ public class TestPutKudu {
         assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
     }
 
-    @Test
-    public void testKerberosEnabled() throws InitializationException {
-        createRecordReader(1);
-
-        final KerberosCredentialsService kerberosCredentialsService = new 
MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
-        testRunner.addControllerService("kerb", kerberosCredentialsService);
-        testRunner.enableControllerService(kerberosCredentialsService);
-
-        testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
-
-        testRunner.run(1, false);
-
-        final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
-        assertTrue(proc.loggedIn());
-        assertFalse(proc.loggedOut());
-
-        testRunner.run(1, true, false);
-        assertTrue(proc.loggedOut());
-    }
-
-    @Test
-    public void testInsecureClient() throws InitializationException {
-        createRecordReader(1);
-
-        testRunner.run(1, false);
-
-        final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
-        assertFalse(proc.loggedIn());
-        assertFalse(proc.loggedOut());
-
-        testRunner.run(1, true, false);
-        assertFalse(proc.loggedOut());
-    }
 
 
     @Test
@@ -751,7 +667,7 @@ public class TestPutKudu {
                     flowFileResponses = flowFileResponses.subList(sliceSize, 
flowFileResponses.size());
 
                     List<OperationResponse> batch = new ArrayList<>();
-                    for (OperationResponse response : 
slice.stream().flatMap(List::stream).collect(Collectors.toList())) {
+                    for (OperationResponse response : 
slice.stream().flatMap(List::stream).toList()) {
                         if (batch.size() == batchSize) {
                             flushes.add(batch);
                             batch = new ArrayList<>();
@@ -814,24 +730,4 @@ public class TestPutKudu {
     public void testKuduPartialFailuresOnManualFlush() throws Exception {
         testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
     }
-
-    public static class MockKerberosCredentialsService extends 
AbstractControllerService implements KerberosCredentialsService {
-        private final String keytab;
-        private final String principal;
-
-        public MockKerberosCredentialsService(final String keytab, final 
String principal) {
-            this.keytab = keytab;
-            this.principal = principal;
-        }
-
-        @Override
-        public String getKeytab() {
-            return keytab;
-        }
-
-        @Override
-        public String getPrincipal() {
-            return principal;
-        }
-    }
 }

Reply via email to