This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 13e5dc4 NIFI-8435 Added Kudu Client Worker Count property
13e5dc4 is described below
commit 13e5dc4691c8d6f7fb8fafe6ed16caeb603cd315
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Apr 21 13:02:28 2021 -0500
NIFI-8435 Added Kudu Client Worker Count property
- Implemented custom ThreadPoolExecutor with maximum pool size based on
Worker Count property
- Refactored processing methods to ensure KuduSession is always closed
- Added SystemResourceConsideration for Memory
- Removed duplicate dependency on nifi-security-kerberos
- Adjusted method naming to clarify functionality
- Reverted addition of defaultAdminOperationTimeoutMs()
Signed-off-by: Pierre Villard <[email protected]>
This closes #5020.
---
.../nifi-kudu-bundle/nifi-kudu-processors/pom.xml | 6 -
.../processors/kudu/AbstractKuduProcessor.java | 63 ++++++++-
.../org/apache/nifi/processors/kudu/PutKudu.java | 146 ++++++++++++---------
.../apache/nifi/processors/kudu/MockPutKudu.java | 4 +-
4 files changed, 146 insertions(+), 73 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 7eb99b8..03fdc6d 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -90,12 +90,6 @@
<version>1.14.0-SNAPSHOT</version>
</dependency>
<dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-security-kerberos</artifactId>
- <version>1.14.0-SNAPSHOT</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-test-utils</artifactId>
<version>${kudu.version}</version>
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index a3ef9da..8d5f2b4 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -60,7 +60,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -123,6 +129,16 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ private static final int DEFAULT_WORKER_COUNT = 2 *
Runtime.getRuntime().availableProcessors();
+ static final PropertyDescriptor WORKER_COUNT = new Builder()
+ .name("worker-count")
+ .displayName("Kudu Client Worker Count")
+ .description("The maximum number of worker threads handling Kudu
client read and write operations. Defaults to the number of available
processors multiplied by 2.")
+ .required(true)
+ .defaultValue(Integer.toString(DEFAULT_WORKER_COUNT))
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
private volatile KuduClient kuduClient;
private final ReadWriteLock kuduClientReadWriteLock = new
ReentrantReadWriteLock();
private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
@@ -184,10 +200,25 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
final String masters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
final int operationTimeout =
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int adminOperationTimeout =
context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final int workerCount = context.getProperty(WORKER_COUNT).asInteger();
+
+ // Create Executor following approach of
Executors.newCachedThreadPool() using worker count as maximum pool size
+ final int corePoolSize = 0;
+ final long threadKeepAliveTime = 60;
+ final Executor nioExecutor = new ThreadPoolExecutor(
+ corePoolSize,
+ workerCount,
+ threadKeepAliveTime,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new ClientThreadFactory(getIdentifier())
+ );
return new KuduClient.KuduClientBuilder(masters)
.defaultOperationTimeoutMs(operationTimeout)
.defaultSocketReadTimeoutMs(adminOperationTimeout)
+ .workerCount(workerCount)
+ .nioExecutor(nioExecutor)
.build();
}
@@ -292,7 +323,7 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
}
@VisibleForTesting
- protected void buildPartialRow(Schema schema, PartialRow row, Record
record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
+ protected void buildPartialRow(Schema schema, PartialRow row, Record
record, List<String> fieldNames, boolean ignoreNull, boolean lowercaseFields) {
for (String recordFieldName : fieldNames) {
String colName = recordFieldName;
if (lowercaseFields) {
@@ -431,4 +462,34 @@ public abstract class AbstractKuduProcessor extends
AbstractProcessor {
return alterTable;
}
+
+ private static class ClientThreadFactory implements ThreadFactory {
+ private final ThreadFactory defaultThreadFactory =
Executors.defaultThreadFactory();
+
+ private final AtomicInteger threadCount = new AtomicInteger();
+
+ private final String identifier;
+
+ private ClientThreadFactory(final String identifier) {
+ this.identifier = identifier;
+ }
+
+ /**
+ * Create new daemon Thread with custom name
+ *
+ * @param runnable Runnable
+ * @return Created Thread
+ */
+ @Override
+ public Thread newThread(final Runnable runnable) {
+ final Thread thread = defaultThreadFactory.newThread(runnable);
+ thread.setDaemon(true);
+ thread.setName(getName());
+ return thread;
+ }
+
+ private String getName() {
+ return String.format("PutKudu[%s]-client-%d", identifier,
threadCount.getAndIncrement());
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 1fb9199..ba6b929 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -30,6 +30,8 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -78,6 +80,7 @@ import static
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIB
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
@EventDriven
@SupportsBatching
@RequiresInstanceClassLoading // Because of calls to
UserGroupInformation.setConfiguration
@@ -296,6 +299,7 @@ public class PutKudu extends AbstractKuduProcessor {
properties.add(IGNORE_NULL);
properties.add(KUDU_OPERATION_TIMEOUT_MS);
properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
+ properties.add(WORKER_COUNT);
return properties;
}
@@ -342,12 +346,12 @@ public class PutKudu extends AbstractKuduProcessor {
final KerberosUser user = getKerberosUser();
if (user == null) {
- executeOnKuduClient(kuduClient -> trigger(context, session,
flowFiles, kuduClient));
+ executeOnKuduClient(kuduClient -> processFlowFiles(context,
session, flowFiles, kuduClient));
return;
}
final PrivilegedExceptionAction<Void> privilegedAction = () -> {
- executeOnKuduClient(kuduClient -> trigger(context, session,
flowFiles, kuduClient));
+ executeOnKuduClient(kuduClient -> processFlowFiles(context,
session, flowFiles, kuduClient));
return null;
};
@@ -355,25 +359,60 @@ public class PutKudu extends AbstractKuduProcessor {
action.execute();
}
- private void trigger(final ProcessContext context, final ProcessSession
session, final List<FlowFile> flowFiles, KuduClient kuduClient) throws
ProcessException {
- final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ private void processFlowFiles(final ProcessContext context, final
ProcessSession session, final List<FlowFile> flowFiles, final KuduClient
kuduClient) {
+ final Map<FlowFile, Integer> processedRecords = new HashMap<>();
+ final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
+ final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
+ final List<RowError> pendingRowErrors = new ArrayList<>();
final KuduSession kuduSession = createKuduSession(kuduClient);
+ try {
+ processRecords(flowFiles,
+ processedRecords,
+ flowFileFailures,
+ operationFlowFileMap,
+ pendingRowErrors,
+ session,
+ context,
+ kuduClient,
+ kuduSession);
+ } finally {
+ try {
+ flushKuduSession(kuduSession, true, pendingRowErrors);
+ } catch (final KuduException|RuntimeException e) {
+ getLogger().error("KuduSession.close() Failed", e);
+ }
+ }
- final Map<FlowFile, Integer> numRecords = new HashMap<>();
- final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
- final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
+ if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() ||
!flowFileFailures.isEmpty())) {
+ logFailures(pendingRowErrors, operationFlowFileMap);
+ session.rollback();
+ context.yield();
+ } else {
+ transferFlowFiles(flowFiles, processedRecords, flowFileFailures,
operationFlowFileMap, pendingRowErrors, session);
+ }
+ }
- int numBuffered = 0;
+ private void processRecords(final List<FlowFile> flowFiles,
+ final Map<FlowFile, Integer> processedRecords,
+ final Map<FlowFile, Object> flowFileFailures,
+ final Map<Operation, FlowFile>
operationFlowFileMap,
+ final List<RowError> pendingRowErrors,
+ final ProcessSession session,
+ final ProcessContext context,
+ final KuduClient kuduClient,
+ final KuduSession kuduSession) {
+ final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+ int bufferedRecords = 0;
OperationType prevOperationType = OperationType.INSERT;
- final List<RowError> pendingRowErrors = new ArrayList<>();
for (FlowFile flowFile : flowFiles) {
try (final InputStream in = session.read(flowFile);
- final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+ final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final String tableName = getEvaluatedProperty(TABLE_NAME,
context, flowFile);
- final Boolean ignoreNull =
Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
- final Boolean lowercaseFields =
Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+ final boolean ignoreNull =
Boolean.parseBoolean(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+ final boolean lowercaseFields =
Boolean.parseBoolean(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context,
flowFile));
final boolean handleSchemaDrift =
Boolean.parseBoolean(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context,
flowFile));
final Function<Record, OperationType> operationTypeFunction;
@@ -391,11 +430,11 @@ public class PutKudu extends AbstractKuduProcessor {
if (handleSchemaDrift) {
final Schema schema = kuduTable.getSchema();
final List<RecordField> missing =
recordReader.getSchema().getFields().stream()
- .filter(field -> !schema.hasColumn(lowercaseFields ?
field.getFieldName().toLowerCase() : field.getFieldName()))
- .collect(Collectors.toList());
+ .filter(field -> !schema.hasColumn(lowercaseFields
? field.getFieldName().toLowerCase() : field.getFieldName()))
+ .collect(Collectors.toList());
if (!missing.isEmpty()) {
- getLogger().info("adding {} columns to table '{}' to
handle schema drift", new Object[]{missing.size(), tableName});
+ getLogger().info("adding {} columns to table '{}' to
handle schema drift", missing.size(), tableName);
// Add each column one at a time to avoid failing if
some of the missing columns
// were created by a concurrent thread or application
attempting to handle schema drift.
@@ -407,7 +446,7 @@ public class PutKudu extends AbstractKuduProcessor {
// Ignore the exception if the column already
exists due to concurrent
// threads or applications attempting to
handle schema drift.
if (e.getStatus().isAlreadyPresent()) {
- getLogger().info("Column already exists in
table '{}' while handling schema drift", new Object[]{tableName});
+ getLogger().info("Column already exists in
table '{}' while handling schema drift", tableName);
} else {
throw new ProcessException(e);
}
@@ -437,7 +476,7 @@ public class PutKudu extends AbstractKuduProcessor {
final RecordFieldType fieldType =
fieldValue.getField().getDataType().getFieldType();
if (fieldType != RecordFieldType.RECORD) {
throw new ProcessException("RecordPath " +
dataRecordPath.getPath() + " evaluated against Record expected to return one or
more Records but encountered field of type" +
- " " + fieldType);
+ " " + fieldType);
}
}
@@ -455,7 +494,7 @@ public class PutKudu extends AbstractKuduProcessor {
// This should be removed when the lowest supported
version of Kudu supports
// ignore operations.
if (!supportsInsertIgnoreOp && prevOperationType !=
operationType
- && (prevOperationType ==
OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
+ && (prevOperationType ==
OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
flushKuduSession(kuduSession, false,
pendingRowErrors);
kuduSession.setIgnoreAllDuplicateRows(operationType ==
OperationType.INSERT_IGNORE);
}
@@ -470,8 +509,8 @@ public class PutKudu extends AbstractKuduProcessor {
// Flush mutation buffer of KuduSession to avoid
"MANUAL_FLUSH is enabled
// but the buffer is too big" error. This can happen
when flush mode is
// MANUAL_FLUSH and a FlowFile has more than one
records.
- if (numBuffered == batchSize && flushMode ==
SessionConfiguration.FlushMode.MANUAL_FLUSH) {
- numBuffered = 0;
+ if (bufferedRecords == batchSize && flushMode ==
SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+ bufferedRecords = 0;
flushKuduSession(kuduSession, false,
pendingRowErrors);
}
@@ -484,8 +523,8 @@ public class PutKudu extends AbstractKuduProcessor {
break recordReaderLoop;
}
- numBuffered++;
- numRecords.merge(flowFile, 1, Integer::sum);
+ bufferedRecords++;
+ processedRecords.merge(flowFile, 1, Integer::sum);
}
record = recordSet.next();
@@ -495,58 +534,37 @@ public class PutKudu extends AbstractKuduProcessor {
flowFileFailures.put(flowFile, ex);
}
}
+ }
- // If configured to rollback on failure, and there's at least one
error, rollback the session and return.
- if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() ||
!flowFileFailures.isEmpty())) {
- logFailures(pendingRowErrors, operationFlowFileMap);
- session.rollback();
- context.yield();
- return;
- }
-
- // If any data is buffered, flush it.
- if (numBuffered > 0) {
- try {
- flushKuduSession(kuduSession, true, pendingRowErrors);
- } catch (final Exception e) {
- getLogger().error("Failed to flush/close Kudu Session", e);
- for (final FlowFile flowFile : flowFiles) {
- session.transfer(flowFile, REL_FAILURE);
- }
-
- return;
- }
- }
-
- // It's possible that there were no row errors when this was checked
above, but flushing the Kudu session may have introduced
- // one or more Row Errors. So we need to check again.
- if (isRollbackOnFailure() && !pendingRowErrors.isEmpty()) {
- logFailures(pendingRowErrors, operationFlowFileMap);
- session.rollback();
- context.yield();
- return;
- }
-
+ private void transferFlowFiles(final List<FlowFile> flowFiles,
+ final Map<FlowFile, Integer>
processedRecords,
+ final Map<FlowFile, Object>
flowFileFailures,
+ final Map<Operation, FlowFile>
operationFlowFileMap,
+ final List<RowError> pendingRowErrors,
+ final ProcessSession session) {
// Find RowErrors for each FlowFile
- final Map<FlowFile, List<RowError>> flowFileRowErrors =
pendingRowErrors.stream().collect(
- Collectors.groupingBy(e ->
operationFlowFileMap.get(e.getOperation())));
+ final Map<FlowFile, List<RowError>> flowFileRowErrors =
pendingRowErrors.stream()
+ .filter(e -> operationFlowFileMap.get(e.getOperation()) !=
null)
+ .collect(
+ Collectors.groupingBy(e ->
operationFlowFileMap.get(e.getOperation()))
+ );
long totalCount = 0L;
- for (final FlowFile flowFile : flowFiles) {
- final int count = numRecords.getOrDefault(flowFile, 0);
+ for (FlowFile flowFile : flowFiles) {
+ final int count = processedRecords.getOrDefault(flowFile, 0);
totalCount += count;
final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
if (rowErrors != null) {
- rowErrors.forEach(rowError -> getLogger().error("Failed to
write due to {}", new Object[] {rowError.toString()}));
- session.putAttribute(flowFile, RECORD_COUNT_ATTR,
String.valueOf(count - rowErrors.size()));
+ rowErrors.forEach(rowError -> getLogger().error("Failed to
write due to {}", rowError.toString()));
+ flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR,
Integer.toString(count - rowErrors.size()));
totalCount -= rowErrors.size(); // Don't include error rows in
the counter.
session.transfer(flowFile, REL_FAILURE);
} else {
- session.putAttribute(flowFile, RECORD_COUNT_ATTR,
String.valueOf(count));
+ flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR,
String.valueOf(count));
if (flowFileFailures.containsKey(flowFile)) {
- getLogger().error("Failed to write due to {}", new
Object[]{flowFileFailures.get(flowFile)});
+ getLogger().error("Failed to write due to {}",
flowFileFailures.get(flowFile));
session.transfer(flowFile, REL_FAILURE);
} else {
session.transfer(flowFile, REL_SUCCESS);
@@ -566,7 +584,7 @@ public class PutKudu extends AbstractKuduProcessor {
final FlowFile flowFile = entry.getKey();
final List<RowError> errors = entry.getValue();
- getLogger().error("Could not write {} to Kudu due to: {}", new
Object[] {flowFile, errors});
+ getLogger().error("Could not write {} to Kudu due to: {}",
flowFile, errors);
}
}
@@ -586,8 +604,8 @@ public class PutKudu extends AbstractKuduProcessor {
}
protected Operation createKuduOperation(OperationType operationType,
Record record,
- List<String> fieldNames, Boolean
ignoreNull,
- Boolean lowercaseFields, KuduTable
kuduTable) {
+ List<String> fieldNames, boolean
ignoreNull,
+ boolean lowercaseFields, KuduTable
kuduTable) {
Operation operation;
switch (operationType) {
case INSERT:
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 1aff599..f394c0c 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -72,8 +72,8 @@ public class MockPutKudu extends PutKudu {
@Override
protected Operation createKuduOperation(OperationType operationType,
Record record,
- List<String> fieldNames, Boolean
ignoreNull,
- Boolean lowercaseFields, KuduTable
kuduTable) {
+ List<String> fieldNames, boolean
ignoreNull,
+ boolean lowercaseFields, KuduTable
kuduTable) {
Operation operation = opQueue.poll();
if (operation == null) {
switch (operationType) {