This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 16c527271b NIFI-11271: This closes #7795. Removed deprecated
Kerberos-related properties and updated to make use of KerberosUserService;
some code cleanup to bring up-to-date with Java 21 recommendations such as
Stream.toList() instead of Stream.collect(Collectors.toList()) and using
enhanced switch statements
16c527271b is described below
commit 16c527271bbfc2f83b251383880dcb78972ce7e4
Author: Mark Payne <[email protected]>
AuthorDate: Tue Sep 26 11:42:22 2023 -0400
NIFI-11271: This closes #7795. Removed deprecated Kerberos-related
properties and updated to make use of KerberosUserService; some code cleanup to
bring up-to-date with Java 21 recommendations such as Stream.toList() instead
of Stream.collect(Collectors.toList()) and using enhanced switch statements
Signed-off-by: Joseph Witt <[email protected]>
---
.../nifi-kudu-controller-service/pom.xml | 11 +-
.../nifi/controller/kudu/KuduLookupService.java | 164 +++++-------
.../processors/kudu/AbstractKuduProcessor.java | 296 +++++----------------
.../org/apache/nifi/processors/kudu/PutKudu.java | 88 +++---
.../apache/nifi/processors/kudu/MockPutKudu.java | 146 ++--------
.../apache/nifi/processors/kudu/TestPutKudu.java | 134 ++--------
6 files changed, 203 insertions(+), 636 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
index 1d23041702..36c48656be 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
@@ -63,11 +63,6 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-kerberos-credentials-service-api</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
@@ -88,6 +83,12 @@
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-user-service-api</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
index b208186fac..a53444340d 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -16,6 +16,18 @@
*/
package org.apache.nifi.controller.kudu;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.security.auth.login.LoginException;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
@@ -38,12 +50,11 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosAction;
-import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
@@ -52,20 +63,6 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
-import javax.security.auth.login.LoginException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
@CapabilityDescription("Lookup a record from Kudu Server associated with the
specified key. Binary columns are base64 encoded. Only one matched row will be
returned")
@Tags({"lookup", "enrich", "key", "value", "kudu"})
@@ -80,13 +77,12 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
- public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new
PropertyDescriptor.Builder()
- .name("kudu-lu-kerberos-credentials-service")
- .displayName("Kerberos Credentials Service")
- .description("Specifies the Kerberos Credentials to use for
authentication")
- .required(false)
- .identifiesControllerService(KerberosCredentialsService.class)
- .build();
+ public static final PropertyDescriptor KERBEROS_USER_SERVICE = new
PropertyDescriptor.Builder()
+ .name("Kerberos User Service")
+ .description("Specifies the Kerberos Credentials to use for
authentication")
+ .required(false)
+ .identifiesControllerService(KerberosUserService.class)
+ .build();
public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new
PropertyDescriptor.Builder()
.name("kudu-lu-operations-timeout-ms")
@@ -137,7 +133,6 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
protected List<PropertyDescriptor> properties;
- protected KerberosCredentialsService credentialsService;
private volatile KerberosUser kerberosUser;
protected String kuduMasters;
@@ -154,26 +149,22 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
protected void init(final ControllerServiceInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(KUDU_MASTERS);
- properties.add(KERBEROS_CREDENTIALS_SERVICE);
+ properties.add(KERBEROS_USER_SERVICE);
properties.add(KUDU_OPERATION_TIMEOUT_MS);
properties.add(KUDU_REPLICA_SELECTION);
properties.add(TABLE_NAME);
properties.add(RETURN_COLUMNS);
- addProperties(properties);
this.properties = Collections.unmodifiableList(properties);
}
- protected void addProperties(List<PropertyDescriptor> properties) {
- }
protected void createKuduClient(ConfigurationContext context) throws
LoginException {
final String kuduMasters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
- final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+ final KerberosUserService userService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
- if (credentialsService != null) {
- final String keytab = credentialsService.getKeytab();
- final String principal = credentialsService.getPrincipal();
- kerberosUser = loginKerberosUser(principal, keytab);
+ if (userService != null) {
+ kerberosUser = userService.createKerberosUser();
+ kerberosUser.login();
final KerberosAction<KuduClient> kerberosAction = new
KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context),
getLogger());
this.kuduClient = kerberosAction.execute();
@@ -182,14 +173,9 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
}
}
- protected KerberosUser loginKerberosUser(final String principal, final
String keytab) throws LoginException {
- final KerberosUser kerberosUser = new KerberosKeytabUser(principal,
keytab);
- kerberosUser.login();
- return kerberosUser;
- }
protected KuduClient buildClient(final String masters, final
ConfigurationContext context) {
- final Integer operationTimeout =
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final int operationTimeout =
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
return new KuduClient.KuduClientBuilder(masters)
.defaultOperationTimeoutMs(operationTimeout)
@@ -203,10 +189,8 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
*/
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws
InitializationException {
-
try {
kuduMasters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
- credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (kuduClient == null) {
getLogger().debug("Setting up Kudu connection...");
@@ -214,7 +198,7 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
createKuduClient(context);
getLogger().debug("Kudu connection successfully initialized");
}
- } catch(Exception ex){
+ } catch (final Exception ex) {
getLogger().error("Exception occurred while interacting with Kudu
due to " + ex.getMessage(), ex);
throw new InitializationException(ex);
}
@@ -228,15 +212,14 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
//Result Schema
resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames);
-
- } catch (KuduException e) {
+ } catch (final KuduException e) {
throw new IllegalArgumentException(e);
}
}
@Override
public Set<String> getRequiredKeys() {
- return new HashSet<>();
+ return Collections.emptySet();
}
@Override
@@ -246,21 +229,17 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
@Override
public Optional<Record> lookup(Map<String, Object> coordinates) {
- Optional<Record> record;
-
- if (kerberosUser != null) {
- final KerberosAction<Optional<Record>> kerberosAction = new
KerberosAction<>(kerberosUser, () -> getRecord(coordinates), getLogger());
- record = kerberosAction.execute();
+ if (kerberosUser == null) {
+ return getRecord(coordinates);
} else {
- record = getRecord(coordinates);
+ final KerberosAction<Optional<Record>> kerberosAction = new
KerberosAction<>(kerberosUser, () -> getRecord(coordinates), getLogger());
+ return kerberosAction.execute();
}
-
- return record;
}
private Optional<Record> getRecord(Map<String, Object> coordinates) {
//Scanner
- KuduScanner.KuduScannerBuilder builder =
kuduClient.newScannerBuilder(table);
+ final KuduScanner.KuduScannerBuilder builder =
kuduClient.newScannerBuilder(table);
builder.setProjectedColumnNames(columnNames);
builder.replicaSelection(replicaSelection);
@@ -272,20 +251,22 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key),
KuduPredicate.ComparisonOp.EQUAL, value))
);
- KuduScanner kuduScanner = builder.build();
+ final KuduScanner kuduScanner = builder.build();
//Run lookup
- for ( RowResult row : kuduScanner){
+ for (final RowResult row : kuduScanner) {
final Map<String, Object> values = new HashMap<>();
- for(String columnName : columnNames){
+ for (final String columnName : columnNames) {
Object object;
- if(row.getColumnType(columnName) == Type.BINARY){
+ if (row.getColumnType(columnName) == Type.BINARY) {
object =
Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName));
} else {
object = row.getObject(columnName);
}
+
values.put(columnName, object);
}
+
return Optional.of(new MapRecord(resultSchema, values));
}
@@ -293,8 +274,8 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
return Optional.empty();
}
- private List<String> getColumns(String columns){
- if(columns.equals("*")){
+ private List<String> getColumns(final String columns) {
+ if (columns.equals("*")) {
return tableSchema
.getColumns()
.stream().map(ColumnSchema::getName)
@@ -306,50 +287,31 @@ public class KuduLookupService extends
AbstractControllerService implements Reco
private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema,
List<String> columnNames){
final List<RecordField> fields = new ArrayList<>();
- for(String columnName : columnNames) {
- if(!kuduTableSchema.hasColumn(columnName)){
+ for (final String columnName : columnNames) {
+ if (!kuduTableSchema.hasColumn(columnName)) {
throw new IllegalArgumentException("Column not found in Kudu
table schema " + columnName);
}
- ColumnSchema cs = kuduTableSchema.getColumn(columnName);
- switch (cs.getType()) {
- case INT8:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.BYTE.getDataType()));
- break;
- case INT16:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.SHORT.getDataType()));
- break;
- case INT32:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.INT.getDataType()));
- break;
- case INT64:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.LONG.getDataType()));
- break;
- case DECIMAL:
- final ColumnTypeAttributes attributes =
cs.getTypeAttributes();
- fields.add(new RecordField(cs.getName(),
RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(),
attributes.getScale())));
- break;
- case UNIXTIME_MICROS:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.TIMESTAMP.getDataType()));
- break;
- case BINARY:
- case STRING:
- case VARCHAR:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.STRING.getDataType()));
- break;
- case DOUBLE:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.DOUBLE.getDataType()));
- break;
- case BOOL:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.BOOLEAN.getDataType()));
- break;
- case FLOAT:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.FLOAT.getDataType()));
- break;
- case DATE:
- fields.add(new RecordField(cs.getName(),
RecordFieldType.DATE.getDataType()));
- break;
- }
+
+ final ColumnSchema cs = kuduTableSchema.getColumn(columnName);
+ final ColumnTypeAttributes attributes = cs.getTypeAttributes();
+
+ final RecordField field = switch (cs.getType()) {
+ case INT8 -> new RecordField(cs.getName(),
RecordFieldType.BYTE.getDataType());
+ case INT16 -> new RecordField(cs.getName(),
RecordFieldType.SHORT.getDataType());
+ case INT32 -> new RecordField(cs.getName(),
RecordFieldType.INT.getDataType());
+ case INT64 -> new RecordField(cs.getName(),
RecordFieldType.LONG.getDataType());
+ case DECIMAL -> new RecordField(cs.getName(),
RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(),
attributes.getScale()));
+ case UNIXTIME_MICROS -> new RecordField(cs.getName(),
RecordFieldType.TIMESTAMP.getDataType());
+ case BINARY, STRING, VARCHAR -> new RecordField(cs.getName(),
RecordFieldType.STRING.getDataType());
+ case DOUBLE -> new RecordField(cs.getName(),
RecordFieldType.DOUBLE.getDataType());
+ case BOOL -> new RecordField(cs.getName(),
RecordFieldType.BOOLEAN.getDataType());
+ case FLOAT -> new RecordField(cs.getName(),
RecordFieldType.FLOAT.getDataType());
+ case DATE -> new RecordField(cs.getName(),
RecordFieldType.DATE.getDataType());
+ };
+
+ fields.add(field);
}
+
return new SimpleRecordSchema(fields);
}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index dd6801c7b4..54b71e5c29 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -17,6 +17,25 @@
package org.apache.nifi.processors.kudu;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
@@ -33,18 +52,12 @@ import org.apache.kudu.client.SessionConfiguration;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
-import org.apache.nifi.security.krb.KerberosKeytabUser;
-import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
@@ -53,29 +66,6 @@ import
org.apache.nifi.serialization.record.field.FieldConverter;
import
org.apache.nifi.serialization.record.field.ObjectTimestampFieldConverter;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.apache.nifi.util.StringUtils;
-
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.LocalDate;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Consumer;
public abstract class AbstractKuduProcessor extends AbstractProcessor {
@@ -87,14 +77,6 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
- static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new
Builder()
- .name("kerberos-credentials-service")
- .displayName("Kerberos Credentials Service")
- .description("Specifies the Kerberos Credentials to use for
authentication")
- .required(false)
- .identifiesControllerService(KerberosCredentialsService.class)
- .build();
-
static final PropertyDescriptor KERBEROS_USER_SERVICE = new
PropertyDescriptor.Builder()
.name("kerberos-user-service")
.displayName("Kerberos User Service")
@@ -103,25 +85,6 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
.required(false)
.build();
- static final PropertyDescriptor KERBEROS_PRINCIPAL = new
PropertyDescriptor.Builder()
- .name("kerberos-principal")
- .displayName("Kerberos Principal")
- .description("The principal to use when specifying the principal
and password directly in the processor for authenticating via Kerberos.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
- .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
- .build();
-
- static final PropertyDescriptor KERBEROS_PASSWORD = new
PropertyDescriptor.Builder()
- .name("kerberos-password")
- .displayName("Kerberos Password")
- .description("The password to use when specifying the principal
and password directly in the processor for authenticating via Kerberos.")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .sensitive(true)
- .build();
-
static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new Builder()
.name("kudu-operations-timeout-ms")
.displayName("Kudu Operation Timeout")
@@ -187,25 +150,13 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
protected void createKerberosUserAndOrKuduClient(ProcessContext context) {
final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
- if (kerberosUserService != null) {
- kerberosUser = kerberosUserService.createKerberosUser();
- kerberosUser.login();
- createKuduClient(context);
- } else {
- final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
- final String kerberosPrincipal =
context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
- final String kerberosPassword =
context.getProperty(KERBEROS_PASSWORD).getValue();
-
- if (credentialsService != null) {
- kerberosUser =
createKerberosKeytabUser(credentialsService.getPrincipal(),
credentialsService.getKeytab(), context);
- kerberosUser.login(); // login creates the kudu client as well
- } else if (!StringUtils.isBlank(kerberosPrincipal) &&
!StringUtils.isBlank(kerberosPassword)) {
- kerberosUser = createKerberosPasswordUser(kerberosPrincipal,
kerberosPassword, context);
- kerberosUser.login(); // login creates the kudu client as well
- } else {
- createKuduClient(context);
- }
+ if (kerberosUserService == null) {
+ return;
}
+
+ kerberosUser = kerberosUserService.createKerberosUser();
+ kerberosUser.login();
+ createKuduClient(context);
}
protected void createKuduClient(ProcessContext context) {
@@ -250,8 +201,8 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
);
return new KuduClient.KuduClientBuilder(masters)
+ .defaultAdminOperationTimeoutMs(adminOperationTimeout)
.defaultOperationTimeoutMs(operationTimeout)
- .defaultSocketReadTimeoutMs(adminOperationTimeout)
.saslProtocolName(saslProtocolName)
.workerCount(workerCount)
.nioExecutor(nioExecutor)
@@ -280,86 +231,6 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
}
}
- protected KerberosUser createKerberosKeytabUser(String principal, String
keytab, ProcessContext context) {
- return new KerberosKeytabUser(principal, keytab) {
- @Override
- public synchronized void login() {
- if (isLoggedIn()) {
- return;
- }
-
- super.login();
- createKuduClient(context);
- }
- };
- }
-
- protected KerberosUser createKerberosPasswordUser(String principal, String
password, ProcessContext context) {
- return new KerberosPasswordUser(principal, password) {
- @Override
- public synchronized void login() {
- if (isLoggedIn()) {
- return;
- }
-
- super.login();
- createKuduClient(context);
- }
- };
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext
context) {
- final List<ValidationResult> results = new ArrayList<>();
-
- final boolean kerberosPrincipalProvided =
!StringUtils.isBlank(context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue());
- final boolean kerberosPasswordProvided =
!StringUtils.isBlank(context.getProperty(KERBEROS_PASSWORD).getValue());
-
- if (kerberosPrincipalProvided && !kerberosPasswordProvided) {
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_PASSWORD.getDisplayName())
- .valid(false)
- .explanation("a password must be provided for the given
principal")
- .build());
- }
-
- if (kerberosPasswordProvided && !kerberosPrincipalProvided) {
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_PRINCIPAL.getDisplayName())
- .valid(false)
- .explanation("a principal must be provided for the given
password")
- .build());
- }
-
- final KerberosCredentialsService kerberosCredentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
- final KerberosUserService kerberosUserService =
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
-
- if (kerberosCredentialsService != null && (kerberosPrincipalProvided
|| kerberosPasswordProvided)) {
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
- .valid(false)
- .explanation("kerberos principal/password and kerberos
credential service cannot be configured at the same time")
- .build());
- }
-
- if (kerberosUserService != null && (kerberosPrincipalProvided ||
kerberosPasswordProvided)) {
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_USER_SERVICE.getDisplayName())
- .valid(false)
- .explanation("kerberos principal/password and kerberos
user service cannot be configured at the same time")
- .build());
- }
-
- if (kerberosUserService != null && kerberosCredentialsService != null)
{
- results.add(new ValidationResult.Builder()
- .subject(KERBEROS_USER_SERVICE.getDisplayName())
- .valid(false)
- .explanation("kerberos user service and kerberos
credentials service cannot be configured at the same time")
- .build());
- }
-
- return results;
- }
@OnStopped
public void shutdown() throws Exception {
@@ -407,51 +278,28 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
final Optional<DataType> fieldDataType =
record.getSchema().getDataType(recordFieldName);
final String dataTypeFormat =
fieldDataType.map(DataType::getFormat).orElse(null);
switch (colType) {
- case BOOL:
- row.addBoolean(columnIndex,
DataTypeUtils.toBoolean(value, recordFieldName));
- break;
- case INT8:
- row.addByte(columnIndex, DataTypeUtils.toByte(value,
recordFieldName));
- break;
- case INT16:
- row.addShort(columnIndex,
DataTypeUtils.toShort(value, recordFieldName));
- break;
- case INT32:
- row.addInt(columnIndex,
DataTypeUtils.toInteger(value, recordFieldName));
- break;
- case INT64:
- row.addLong(columnIndex, DataTypeUtils.toLong(value,
recordFieldName));
- break;
- case UNIXTIME_MICROS:
+ case BOOL -> row.addBoolean(columnIndex,
DataTypeUtils.toBoolean(value, recordFieldName));
+ case INT8 -> row.addByte(columnIndex,
DataTypeUtils.toByte(value, recordFieldName));
+ case INT16 -> row.addShort(columnIndex,
DataTypeUtils.toShort(value, recordFieldName));
+ case INT32 -> row.addInt(columnIndex,
DataTypeUtils.toInteger(value, recordFieldName));
+ case INT64 -> row.addLong(columnIndex,
DataTypeUtils.toLong(value, recordFieldName));
+ case UNIXTIME_MICROS -> {
final Optional<DataType> optionalDataType =
record.getSchema().getDataType(recordFieldName);
- final Optional<String> optionalPattern =
getTimestampPattern(optionalDataType);
+ final Optional<String> optionalPattern =
getTimestampPattern(optionalDataType.orElse(null));
final Timestamp timestamp =
TIMESTAMP_FIELD_CONVERTER.convertField(value, optionalPattern, recordFieldName);
row.addTimestamp(columnIndex, timestamp);
- break;
- case STRING:
- row.addString(columnIndex,
DataTypeUtils.toString(value, dataTypeFormat));
- break;
- case BINARY:
- row.addBinary(columnIndex,
DataTypeUtils.toString(value, dataTypeFormat).getBytes());
- break;
- case FLOAT:
- row.addFloat(columnIndex, DataTypeUtils.toFloat(value,
recordFieldName));
- break;
- case DOUBLE:
- row.addDouble(columnIndex,
DataTypeUtils.toDouble(value, recordFieldName));
- break;
- case DECIMAL:
- row.addDecimal(columnIndex, new
BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
- break;
- case VARCHAR:
- row.addVarchar(columnIndex,
DataTypeUtils.toString(value, dataTypeFormat));
- break;
- case DATE:
+ }
+ case STRING -> row.addString(columnIndex,
DataTypeUtils.toString(value, dataTypeFormat));
+ case BINARY -> row.addBinary(columnIndex,
DataTypeUtils.toString(value, dataTypeFormat).getBytes());
+ case FLOAT -> row.addFloat(columnIndex,
DataTypeUtils.toFloat(value, recordFieldName));
+ case DOUBLE -> row.addDouble(columnIndex,
DataTypeUtils.toDouble(value, recordFieldName));
+ case DECIMAL -> row.addDecimal(columnIndex, new
BigDecimal(DataTypeUtils.toString(value, dataTypeFormat)));
+ case VARCHAR -> row.addVarchar(columnIndex,
DataTypeUtils.toString(value, dataTypeFormat));
+ case DATE -> {
final String dateFormat = dataTypeFormat == null ?
RecordFieldType.DATE.getDefaultFormat() : dataTypeFormat;
row.addDate(columnIndex, getDate(value,
recordFieldName, dateFormat));
- break;
- default:
- throw new IllegalStateException(String.format("unknown
column type %s", colType));
+ }
+ default -> throw new
IllegalStateException(String.format("unknown column type %s", colType));
}
}
}
@@ -460,20 +308,15 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
/**
* Get Timestamp Pattern and override Timestamp Record Field pattern with
optional microsecond pattern
*
- * @param optionalDataType Optional Data Type
+ * @param dataType Data Type
* @return Optional Timestamp Pattern
*/
- private Optional<String> getTimestampPattern(final Optional<DataType>
optionalDataType) {
- String pattern = null;
- if (optionalDataType.isPresent()) {
- final DataType dataType = optionalDataType.get();
- if (RecordFieldType.TIMESTAMP == dataType.getFieldType()) {
- pattern = MICROSECOND_TIMESTAMP_PATTERN;
- } else {
- pattern = dataType.getFormat();
- }
+ private Optional<String> getTimestampPattern(final DataType dataType) {
+ if (dataType == null) {
+ return Optional.empty();
}
- return Optional.ofNullable(pattern);
+
+ return Optional.of(RecordFieldType.TIMESTAMP ==
dataType.getFieldType() ? MICROSECOND_TIMESTAMP_PATTERN : dataType.getFormat());
}
/**
@@ -493,33 +336,20 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
* Converts a NiFi DataType to its equivalent Kudu Type.
*/
private Type toKuduType(DataType nifiType) {
- switch (nifiType.getFieldType()) {
- case BOOLEAN:
- return Type.BOOL;
- case BYTE:
- return Type.INT8;
- case SHORT:
- return Type.INT16;
- case INT:
- return Type.INT32;
- case LONG:
- return Type.INT64;
- case FLOAT:
- return Type.FLOAT;
- case DOUBLE:
- return Type.DOUBLE;
- case DECIMAL:
- return Type.DECIMAL;
- case TIMESTAMP:
- return Type.UNIXTIME_MICROS;
- case CHAR:
- case STRING:
- return Type.STRING;
- case DATE:
- return Type.DATE;
- default:
- throw new IllegalArgumentException(String.format("unsupported
type %s", nifiType));
- }
+ return switch (nifiType.getFieldType()) {
+ case BOOLEAN -> Type.BOOL;
+ case BYTE -> Type.INT8;
+ case SHORT -> Type.INT16;
+ case INT -> Type.INT32;
+ case LONG -> Type.INT64;
+ case FLOAT -> Type.FLOAT;
+ case DOUBLE -> Type.DOUBLE;
+ case DECIMAL -> Type.DECIMAL;
+ case TIMESTAMP -> Type.UNIXTIME_MICROS;
+ case CHAR, STRING -> Type.STRING;
+ case DATE -> Type.DATE;
+ default -> throw new
IllegalArgumentException(String.format("unsupported type %s", nifiType));
+ };
}
private ColumnTypeAttributes getKuduTypeAttributes(final DataType
nifiType) {
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 7fba59fa17..3a3273559f 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -17,6 +17,19 @@
package org.apache.nifi.processors.kudu;
+import java.io.InputStream;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.security.auth.login.LoginException;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
@@ -61,23 +74,9 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSet;
-import javax.security.auth.login.LoginException;
-import java.io.InputStream;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
+import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
-import static org.apache.nifi.expression.ExpressionLanguageScope.ENVIRONMENT;
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@SupportsBatching
@@ -211,26 +210,27 @@ public class PutKudu extends AbstractKuduProcessor {
protected static final PropertyDescriptor FLUSH_MODE = new Builder()
.name("Flush Mode")
- .description("Set the new flush mode for a kudu session.\n" +
- "AUTO_FLUSH_SYNC: the call returns when the operation is
persisted, else it throws an exception.\n" +
- "AUTO_FLUSH_BACKGROUND: the call returns when the operation has
been added to the buffer. This call should normally perform only fast
in-memory" +
- " operations but it may have to wait when the buffer is full and
there's another buffer being flushed.\n" +
- "MANUAL_FLUSH: the call returns when the operation has been added
to the buffer, else it throws a KuduException if the buffer is full.")
+ .description("""
+ Set the new flush mode for a kudu session.
+ AUTO_FLUSH_SYNC: the call returns when the operation is
persisted, else it throws an exception.
+ AUTO_FLUSH_BACKGROUND: the call returns when the operation has
been added to the buffer. This call should normally perform only fast in-memory
+ operations but it may have to wait when the buffer is full and
there's another buffer being flushed.
+ MANUAL_FLUSH: the call returns when the operation has been
added to the buffer, else it throws a KuduException if the buffer is full.
+ """)
.allowableValues(SessionConfiguration.FlushMode.values())
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
protected static final PropertyDescriptor FLOWFILE_BATCH_SIZE = new
Builder()
.name("FlowFiles per Batch")
- .description("The maximum number of FlowFiles to process in a single
execution, between 1 - 100000. " +
+ .description("The maximum number of FlowFiles to process in a single
execution, between 1 and 100,000. " +
"Depending on your memory size, and data size per row set an
appropriate batch size " +
"for the number of FlowFiles to process per client connection
setup." +
"Gradually increase this number, only if your FlowFiles typically
contain a few records.")
.defaultValue("1")
.required(true)
- .addValidator(StandardValidators.createLongValidator(1, 100000, true))
+ .addValidator(StandardValidators.createLongValidator(1, 100_000, true))
.expressionLanguageSupported(ENVIRONMENT)
.build();
@@ -282,9 +282,6 @@ public class PutKudu extends AbstractKuduProcessor {
properties.add(TABLE_NAME);
properties.add(FAILURE_STRATEGY);
properties.add(KERBEROS_USER_SERVICE);
- properties.add(KERBEROS_CREDENTIALS_SERVICE);
- properties.add(KERBEROS_PRINCIPAL);
- properties.add(KERBEROS_PASSWORD);
properties.add(SKIP_HEAD_LINE);
properties.add(LOWERCASE_FIELD_NAMES);
properties.add(HANDLE_SCHEMA_DRIFT);
@@ -447,7 +444,7 @@ public class PutKudu extends AbstractKuduProcessor {
dataRecords = Collections.singletonList(record);
} else {
final RecordPathResult result =
dataRecordPath.evaluate(record);
- final List<FieldValue> fieldValues =
result.getSelectedFields().collect(Collectors.toList());
+ final List<FieldValue> fieldValues =
result.getSelectedFields().toList();
if (fieldValues.isEmpty()) {
throw new ProcessException("RecordPath " +
dataRecordPath.getPath() + " evaluated against Record yielded no results.");
}
@@ -530,7 +527,7 @@ public class PutKudu extends AbstractKuduProcessor {
recordFields = record.getSchema().getFields();
} else {
final RecordPathResult recordPathResult =
dataRecordPath.evaluate(record);
- final List<FieldValue> fieldValues =
recordPathResult.getSelectedFields().collect(Collectors.toList());
+ final List<FieldValue> fieldValues =
recordPathResult.getSelectedFields().toList();
recordFields = new ArrayList<>();
for (final FieldValue fieldValue : fieldValues) {
@@ -548,7 +545,7 @@ public class PutKudu extends AbstractKuduProcessor {
final List<RecordField> missing = recordFields.stream()
.filter(field -> !schema.hasColumn(lowercaseFields ?
field.getFieldName().toLowerCase() : field.getFieldName()))
- .collect(Collectors.toList());
+ .toList();
if (missing.isEmpty()) {
getLogger().debug("No schema drift detected for {}", flowFile);
@@ -649,10 +646,8 @@ public class PutKudu extends AbstractKuduProcessor {
boolean lowercaseFields, KuduTable
kuduTable) {
Operation operation;
switch (operationType) {
- case INSERT:
- operation = kuduTable.newInsert();
- break;
- case INSERT_IGNORE:
+ case INSERT -> operation = kuduTable.newInsert();
+ case INSERT_IGNORE -> {
// If the target Kudu cluster does not support ignore
operations use an insert.
// The legacy session based insert ignore will be used instead.
if (!supportsInsertIgnoreOp) {
@@ -660,24 +655,13 @@ public class PutKudu extends AbstractKuduProcessor {
} else {
operation = kuduTable.newInsertIgnore();
}
- break;
- case UPSERT:
- operation = kuduTable.newUpsert();
- break;
- case UPDATE:
- operation = kuduTable.newUpdate();
- break;
- case UPDATE_IGNORE:
- operation = kuduTable.newUpdateIgnore();
- break;
- case DELETE:
- operation = kuduTable.newDelete();
- break;
- case DELETE_IGNORE:
- operation = kuduTable.newDeleteIgnore();
- break;
- default:
- throw new
IllegalArgumentException(String.format("OperationType: %s not supported by
Kudu", operationType));
+ }
+ case UPSERT -> operation = kuduTable.newUpsert();
+ case UPDATE -> operation = kuduTable.newUpdate();
+ case UPDATE_IGNORE -> operation = kuduTable.newUpdateIgnore();
+ case DELETE -> operation = kuduTable.newDelete();
+ case DELETE_IGNORE -> operation = kuduTable.newDeleteIgnore();
+ default -> throw new
IllegalArgumentException(String.format("OperationType: %s not supported by
Kudu", operationType));
}
buildPartialRow(kuduTable.getSchema(), operation.getRow(), record,
fieldNames, ignoreNull, lowercaseFields);
return operation;
@@ -693,7 +677,7 @@ public class PutKudu extends AbstractKuduProcessor {
@Override
public OperationType apply(final Record record) {
final RecordPathResult recordPathResult =
recordPath.evaluate(record);
- final List<FieldValue> resultList =
recordPathResult.getSelectedFields().distinct().collect(Collectors.toList());
+ final List<FieldValue> resultList =
recordPathResult.getSelectedFields().distinct().toList();
if (resultList.isEmpty()) {
throw new ProcessException("Evaluated RecordPath " +
recordPath.getPath() + " against Record but got no results");
}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 1b807d7516..a1efdb71e9 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -17,47 +17,37 @@
package org.apache.nifi.processors.kudu;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import org.apache.kudu.Schema;
+import org.apache.kudu.client.Delete;
import org.apache.kudu.client.DeleteIgnore;
+import org.apache.kudu.client.Insert;
import org.apache.kudu.client.InsertIgnore;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Delete;
-import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.Update;
import org.apache.kudu.client.UpdateIgnore;
import org.apache.kudu.client.Upsert;
-import org.apache.kudu.client.Update;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.record.Record;
-import javax.security.auth.login.AppConfigurationEntry;
-import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MockPutKudu extends PutKudu {
- private KuduSession session;
- private LinkedList<Operation> opQueue;
+ private final KuduSession session;
+ private final LinkedList<Operation> opQueue;
// Atomic reference is used as the set and use of the schema are in
different thread
- private AtomicReference<Schema> tableSchema = new AtomicReference<>();
-
- private boolean loggedIn = false;
- private boolean loggedOut = false;
+ private final AtomicReference<Schema> tableSchema = new
AtomicReference<>();
public MockPutKudu() {
this(mock(KuduSession.class));
@@ -78,31 +68,15 @@ public class MockPutKudu extends PutKudu {
boolean lowercaseFields, KuduTable
kuduTable) {
Operation operation = opQueue.poll();
if (operation == null) {
- switch (operationType) {
- case INSERT:
- operation = mock(Insert.class);
- break;
- case INSERT_IGNORE:
- operation = mock(InsertIgnore.class);
- break;
- case UPSERT:
- operation = mock(Upsert.class);
- break;
- case UPDATE:
- operation = mock(Update.class);
- break;
- case UPDATE_IGNORE:
- operation = mock(UpdateIgnore.class);
- break;
- case DELETE:
- operation = mock(Delete.class);
- break;
- case DELETE_IGNORE:
- operation = mock(DeleteIgnore.class);
- break;
- default:
- throw new
IllegalArgumentException(String.format("OperationType: %s not supported by
Kudu", operationType));
- }
+ operation = switch (operationType) {
+ case INSERT -> mock(Insert.class);
+ case INSERT_IGNORE -> mock(InsertIgnore.class);
+ case UPSERT -> mock(Upsert.class);
+ case UPDATE -> mock(Update.class);
+ case UPDATE_IGNORE -> mock(UpdateIgnore.class);
+ case DELETE -> mock(Delete.class);
+ case DELETE_IGNORE -> mock(DeleteIgnore.class);
+ };
}
return operation;
}
@@ -140,86 +114,6 @@ public class MockPutKudu extends PutKudu {
actionOnKuduClient.accept(client);
}
- public boolean loggedIn() {
- return loggedIn;
- }
-
- public boolean loggedOut() {
- return loggedOut;
- }
-
- @Override
- protected KerberosUser createKerberosKeytabUser(String principal, String
keytab, ProcessContext context) {
- return createMockKerberosUser(principal);
- }
-
- @Override
- protected KerberosUser createKerberosPasswordUser(String principal, String
password, ProcessContext context) {
- return createMockKerberosUser(principal);
- }
-
- private KerberosUser createMockKerberosUser(final String principal) {
- return new KerberosUser() {
-
- @Override
- public void login() {
- loggedIn = true;
- }
-
- @Override
- public void logout() {
- loggedOut = true;
- }
-
- @Override
- public <T> T doAs(final PrivilegedAction<T> action) throws
IllegalStateException {
- return action.run();
- }
-
- @Override
- public <T> T doAs(PrivilegedAction<T> action, ClassLoader
contextClassLoader) throws IllegalStateException {
- return action.run();
- }
-
- @Override
- public <T> T doAs(final PrivilegedExceptionAction<T> action)
throws IllegalStateException, PrivilegedActionException {
- try {
- return action.run();
- } catch (Exception e) {
- throw new PrivilegedActionException(e);
- }
- }
-
- @Override
- public <T> T doAs(PrivilegedExceptionAction<T> action, ClassLoader
contextClassLoader) throws IllegalStateException, PrivilegedActionException {
- try {
- return action.run();
- } catch (Exception e) {
- throw new PrivilegedActionException(e);
- }
- }
-
- @Override
- public boolean checkTGTAndRelogin() {
- return true;
- }
-
- @Override
- public boolean isLoggedIn() {
- return loggedIn && !loggedOut;
- }
-
- @Override
- public String getPrincipal() {
- return principal;
- }
-
- @Override
- public AppConfigurationEntry getConfigurationEntry() {
- return new AppConfigurationEntry("LoginModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, Collections.emptyMap());
- }
- };
- }
@Override
protected KuduSession createKuduSession(final KuduClient client) {
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index eb72477a51..39247984f6 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -17,6 +17,20 @@
package org.apache.nifi.processors.kudu;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
@@ -29,11 +43,8 @@ import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowErrorsAndOverflowStatus;
import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
@@ -58,29 +69,11 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
-import java.io.IOException;
-import java.io.InputStream;
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -148,50 +141,6 @@ public class TestPutKudu {
testRunner.enableControllerService(readerFactory);
}
- @Test
- public void testCustomValidate() throws InitializationException {
- createRecordReader(1);
-
- testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
- testRunner.assertNotValid();
-
- testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL);
- testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
- testRunner.assertNotValid();
-
- testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
- testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
- testRunner.assertValid();
-
- final KerberosCredentialsService kerberosCredentialsService = new
MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
- testRunner.addControllerService("kerb", kerberosCredentialsService);
- testRunner.enableControllerService(kerberosCredentialsService);
- testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
- testRunner.assertNotValid();
-
- testRunner.removeProperty(PutKudu.KERBEROS_PRINCIPAL);
- testRunner.removeProperty(PutKudu.KERBEROS_PASSWORD);
- testRunner.assertValid();
-
- final KerberosUserService kerberosUserService =
enableKerberosUserService(testRunner);
- testRunner.setProperty(PutKudu.KERBEROS_USER_SERVICE,
kerberosUserService.getIdentifier());
- testRunner.assertNotValid();
-
- testRunner.removeProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE);
- testRunner.assertValid();
-
- testRunner.setProperty(PutKudu.KERBEROS_PRINCIPAL, "principal");
- testRunner.setProperty(PutKudu.KERBEROS_PASSWORD, "password");
- testRunner.assertNotValid();
- }
-
- private KerberosUserService enableKerberosUserService(final TestRunner
runner) throws InitializationException {
- final KerberosUserService kerberosUserService =
mock(KerberosUserService.class);
- when(kerberosUserService.getIdentifier()).thenReturn("userService1");
- runner.addControllerService(kerberosUserService.getIdentifier(),
kerberosUserService);
- runner.enableControllerService(kerberosUserService);
- return kerberosUserService;
- }
@Test
public void testWriteKuduWithDefaults() throws InitializationException {
@@ -221,39 +170,6 @@ public class TestPutKudu {
assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
}
- @Test
- public void testKerberosEnabled() throws InitializationException {
- createRecordReader(1);
-
- final KerberosCredentialsService kerberosCredentialsService = new
MockKerberosCredentialsService("unit-test-principal", "unit-test-keytab");
- testRunner.addControllerService("kerb", kerberosCredentialsService);
- testRunner.enableControllerService(kerberosCredentialsService);
-
- testRunner.setProperty(PutKudu.KERBEROS_CREDENTIALS_SERVICE, "kerb");
-
- testRunner.run(1, false);
-
- final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
- assertTrue(proc.loggedIn());
- assertFalse(proc.loggedOut());
-
- testRunner.run(1, true, false);
- assertTrue(proc.loggedOut());
- }
-
- @Test
- public void testInsecureClient() throws InitializationException {
- createRecordReader(1);
-
- testRunner.run(1, false);
-
- final MockPutKudu proc = (MockPutKudu) testRunner.getProcessor();
- assertFalse(proc.loggedIn());
- assertFalse(proc.loggedOut());
-
- testRunner.run(1, true, false);
- assertFalse(proc.loggedOut());
- }
@Test
@@ -751,7 +667,7 @@ public class TestPutKudu {
flowFileResponses = flowFileResponses.subList(sliceSize,
flowFileResponses.size());
List<OperationResponse> batch = new ArrayList<>();
- for (OperationResponse response :
slice.stream().flatMap(List::stream).collect(Collectors.toList())) {
+ for (OperationResponse response :
slice.stream().flatMap(List::stream).toList()) {
if (batch.size() == batchSize) {
flushes.add(batch);
batch = new ArrayList<>();
@@ -814,24 +730,4 @@ public class TestPutKudu {
public void testKuduPartialFailuresOnManualFlush() throws Exception {
testKuduPartialFailure(FlushMode.MANUAL_FLUSH);
}
-
- public static class MockKerberosCredentialsService extends
AbstractControllerService implements KerberosCredentialsService {
- private final String keytab;
- private final String principal;
-
- public MockKerberosCredentialsService(final String keytab, final
String principal) {
- this.keytab = keytab;
- this.principal = principal;
- }
-
- @Override
- public String getKeytab() {
- return keytab;
- }
-
- @Override
- public String getPrincipal() {
- return principal;
- }
- }
}