This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 13e5dc4  NIFI-8435 Added Kudu Client Worker Count property
13e5dc4 is described below

commit 13e5dc4691c8d6f7fb8fafe6ed16caeb603cd315
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Apr 21 13:02:28 2021 -0500

    NIFI-8435 Added Kudu Client Worker Count property
    
    - Implemented custom ThreadPoolExecutor with maximum pool size based on Worker Count property
    - Refactored processing methods to ensure KuduSession is always closed
    - Added SystemResourceConsideration for Memory
    - Removed duplicate dependency on nifi-security-kerberos
    - Adjusted method naming to clarify functionality
    - Reverted addition of defaultAdminOperationTimeoutMs()
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #5020.
---
 .../nifi-kudu-bundle/nifi-kudu-processors/pom.xml  |   6 -
 .../processors/kudu/AbstractKuduProcessor.java     |  63 ++++++++-
 .../org/apache/nifi/processors/kudu/PutKudu.java   | 146 ++++++++++++---------
 .../apache/nifi/processors/kudu/MockPutKudu.java   |   4 +-
 4 files changed, 146 insertions(+), 73 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
index 7eb99b8..03fdc6d 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
@@ -90,12 +90,6 @@
             <version>1.14.0-SNAPSHOT</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-security-kerberos</artifactId>
-            <version>1.14.0-SNAPSHOT</version>
-        </dependency>
-
-        <dependency>
             <groupId>org.apache.kudu</groupId>
             <artifactId>kudu-test-utils</artifactId>
             <version>${kudu.version}</version>
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
index a3ef9da..8d5f2b4 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKuduProcessor.java
@@ -60,7 +60,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -123,6 +129,16 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    private static final int DEFAULT_WORKER_COUNT = 2 * 
Runtime.getRuntime().availableProcessors();
+    static final PropertyDescriptor WORKER_COUNT = new Builder()
+            .name("worker-count")
+            .displayName("Kudu Client Worker Count")
+            .description("The maximum number of worker threads handling Kudu 
client read and write operations. Defaults to the number of available 
processors multiplied by 2.")
+            .required(true)
+            .defaultValue(Integer.toString(DEFAULT_WORKER_COUNT))
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
     private volatile KuduClient kuduClient;
     private final ReadWriteLock kuduClientReadWriteLock = new 
ReentrantReadWriteLock();
     private final Lock kuduClientReadLock = kuduClientReadWriteLock.readLock();
@@ -184,10 +200,25 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
         final String masters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
         final int operationTimeout = 
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         final int adminOperationTimeout = 
context.getProperty(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        final int workerCount = context.getProperty(WORKER_COUNT).asInteger();
+
+        // Create Executor following approach of 
Executors.newCachedThreadPool() using worker count as maximum pool size
+        final int corePoolSize = 0;
+        final long threadKeepAliveTime = 60;
+        final Executor nioExecutor = new ThreadPoolExecutor(
+                corePoolSize,
+                workerCount,
+                threadKeepAliveTime,
+                TimeUnit.SECONDS,
+                new SynchronousQueue<>(),
+                new ClientThreadFactory(getIdentifier())
+        );
 
         return new KuduClient.KuduClientBuilder(masters)
                 .defaultOperationTimeoutMs(operationTimeout)
                 .defaultSocketReadTimeoutMs(adminOperationTimeout)
+                .workerCount(workerCount)
+                .nioExecutor(nioExecutor)
                 .build();
     }
 
@@ -292,7 +323,7 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
     }
 
     @VisibleForTesting
-    protected void buildPartialRow(Schema schema, PartialRow row, Record 
record, List<String> fieldNames, Boolean ignoreNull, Boolean lowercaseFields) {
+    protected void buildPartialRow(Schema schema, PartialRow row, Record 
record, List<String> fieldNames, boolean ignoreNull, boolean lowercaseFields) {
         for (String recordFieldName : fieldNames) {
             String colName = recordFieldName;
             if (lowercaseFields) {
@@ -431,4 +462,34 @@ public abstract class AbstractKuduProcessor extends 
AbstractProcessor {
 
         return alterTable;
     }
+
+    private static class ClientThreadFactory implements ThreadFactory {
+        private final ThreadFactory defaultThreadFactory = 
Executors.defaultThreadFactory();
+
+        private final AtomicInteger threadCount = new AtomicInteger();
+
+        private final String identifier;
+
+        private ClientThreadFactory(final String identifier) {
+            this.identifier = identifier;
+        }
+
+        /**
+         * Create new daemon Thread with custom name
+         *
+         * @param runnable Runnable
+         * @return Created Thread
+         */
+        @Override
+        public Thread newThread(final Runnable runnable) {
+            final Thread thread = defaultThreadFactory.newThread(runnable);
+            thread.setDaemon(true);
+            thread.setName(getName());
+            return thread;
+        }
+
+        private String getName() {
+            return String.format("PutKudu[%s]-client-%d", identifier, 
threadCount.getAndIncrement());
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 1fb9199..ba6b929 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -30,6 +30,8 @@ import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -78,6 +80,7 @@ import static 
org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIB
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static 
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
 
+@SystemResourceConsideration(resource = SystemResource.MEMORY)
 @EventDriven
 @SupportsBatching
 @RequiresInstanceClassLoading // Because of calls to 
UserGroupInformation.setConfiguration
@@ -296,6 +299,7 @@ public class PutKudu extends AbstractKuduProcessor {
         properties.add(IGNORE_NULL);
         properties.add(KUDU_OPERATION_TIMEOUT_MS);
         properties.add(KUDU_KEEP_ALIVE_PERIOD_TIMEOUT_MS);
+        properties.add(WORKER_COUNT);
         return properties;
     }
 
@@ -342,12 +346,12 @@ public class PutKudu extends AbstractKuduProcessor {
 
         final KerberosUser user = getKerberosUser();
         if (user == null) {
-            executeOnKuduClient(kuduClient -> trigger(context, session, 
flowFiles, kuduClient));
+            executeOnKuduClient(kuduClient -> processFlowFiles(context, 
session, flowFiles, kuduClient));
             return;
         }
 
         final PrivilegedExceptionAction<Void> privilegedAction = () -> {
-            executeOnKuduClient(kuduClient -> trigger(context, session, 
flowFiles, kuduClient));
+            executeOnKuduClient(kuduClient -> processFlowFiles(context, 
session, flowFiles, kuduClient));
             return null;
         };
 
@@ -355,25 +359,60 @@ public class PutKudu extends AbstractKuduProcessor {
         action.execute();
     }
 
-    private void trigger(final ProcessContext context, final ProcessSession 
session, final List<FlowFile> flowFiles, KuduClient kuduClient) throws 
ProcessException {
-        final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+    private void processFlowFiles(final ProcessContext context, final 
ProcessSession session, final List<FlowFile> flowFiles, final KuduClient 
kuduClient) {
+        final Map<FlowFile, Integer> processedRecords = new HashMap<>();
+        final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
+        final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
+        final List<RowError> pendingRowErrors = new ArrayList<>();
 
         final KuduSession kuduSession = createKuduSession(kuduClient);
+        try {
+            processRecords(flowFiles,
+                    processedRecords,
+                    flowFileFailures,
+                    operationFlowFileMap,
+                    pendingRowErrors,
+                    session,
+                    context,
+                    kuduClient,
+                    kuduSession);
+        } finally {
+            try {
+                flushKuduSession(kuduSession, true, pendingRowErrors);
+            } catch (final KuduException|RuntimeException e) {
+                getLogger().error("KuduSession.close() Failed", e);
+            }
+        }
 
-        final Map<FlowFile, Integer> numRecords = new HashMap<>();
-        final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
-        final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
+        if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || 
!flowFileFailures.isEmpty())) {
+            logFailures(pendingRowErrors, operationFlowFileMap);
+            session.rollback();
+            context.yield();
+        } else {
+            transferFlowFiles(flowFiles, processedRecords, flowFileFailures, 
operationFlowFileMap, pendingRowErrors, session);
+        }
+    }
 
-        int numBuffered = 0;
+    private void processRecords(final List<FlowFile> flowFiles,
+                                 final Map<FlowFile, Integer> processedRecords,
+                                 final Map<FlowFile, Object> flowFileFailures,
+                                 final Map<Operation, FlowFile> 
operationFlowFileMap,
+                                 final List<RowError> pendingRowErrors,
+                                 final ProcessSession session,
+                                 final ProcessContext context,
+                                 final KuduClient kuduClient,
+                                 final KuduSession kuduSession) {
+        final RecordReaderFactory recordReaderFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+        int bufferedRecords = 0;
         OperationType prevOperationType = OperationType.INSERT;
-        final List<RowError> pendingRowErrors = new ArrayList<>();
         for (FlowFile flowFile : flowFiles) {
             try (final InputStream in = session.read(flowFile);
-                final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+                 final RecordReader recordReader = 
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
 
                 final String tableName = getEvaluatedProperty(TABLE_NAME, 
context, flowFile);
-                final Boolean ignoreNull = 
Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
-                final Boolean lowercaseFields = 
Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
+                final boolean ignoreNull = 
Boolean.parseBoolean(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
+                final boolean lowercaseFields = 
Boolean.parseBoolean(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, 
flowFile));
                 final boolean handleSchemaDrift = 
Boolean.parseBoolean(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, 
flowFile));
 
                 final Function<Record, OperationType> operationTypeFunction;
@@ -391,11 +430,11 @@ public class PutKudu extends AbstractKuduProcessor {
                 if (handleSchemaDrift) {
                     final Schema schema = kuduTable.getSchema();
                     final List<RecordField> missing = 
recordReader.getSchema().getFields().stream()
-                        .filter(field -> !schema.hasColumn(lowercaseFields ? 
field.getFieldName().toLowerCase() : field.getFieldName()))
-                        .collect(Collectors.toList());
+                            .filter(field -> !schema.hasColumn(lowercaseFields 
? field.getFieldName().toLowerCase() : field.getFieldName()))
+                            .collect(Collectors.toList());
 
                     if (!missing.isEmpty()) {
-                        getLogger().info("adding {} columns to table '{}' to 
handle schema drift", new Object[]{missing.size(), tableName});
+                        getLogger().info("adding {} columns to table '{}' to 
handle schema drift", missing.size(), tableName);
 
                         // Add each column one at a time to avoid failing if 
some of the missing columns
                         // we created by a concurrent thread or application 
attempting to handle schema drift.
@@ -407,7 +446,7 @@ public class PutKudu extends AbstractKuduProcessor {
                                 // Ignore the exception if the column already 
exists due to concurrent
                                 // threads or applications attempting to 
handle schema drift.
                                 if (e.getStatus().isAlreadyPresent()) {
-                                    getLogger().info("Column already exists in 
table '{}' while handling schema drift", new Object[]{tableName});
+                                    getLogger().info("Column already exists in 
table '{}' while handling schema drift", tableName);
                                 } else {
                                     throw new ProcessException(e);
                                 }
@@ -437,7 +476,7 @@ public class PutKudu extends AbstractKuduProcessor {
                             final RecordFieldType fieldType = 
fieldValue.getField().getDataType().getFieldType();
                             if (fieldType != RecordFieldType.RECORD) {
                                 throw new ProcessException("RecordPath " + 
dataRecordPath.getPath() + " evaluated against Record expected to return one or 
more Records but encountered field of type" +
-                                    " " + fieldType);
+                                        " " + fieldType);
                             }
                         }
 
@@ -455,7 +494,7 @@ public class PutKudu extends AbstractKuduProcessor {
                         // This should be removed when the lowest supported 
version of Kudu supports
                         // ignore operations.
                         if (!supportsInsertIgnoreOp && prevOperationType != 
operationType
-                            && (prevOperationType == 
OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
+                                && (prevOperationType == 
OperationType.INSERT_IGNORE || operationType == OperationType.INSERT_IGNORE)) {
                             flushKuduSession(kuduSession, false, 
pendingRowErrors);
                             
kuduSession.setIgnoreAllDuplicateRows(operationType == 
OperationType.INSERT_IGNORE);
                         }
@@ -470,8 +509,8 @@ public class PutKudu extends AbstractKuduProcessor {
                         // Flush mutation buffer of KuduSession to avoid 
"MANUAL_FLUSH is enabled
                         // but the buffer is too big" error. This can happen 
when flush mode is
                         // MANUAL_FLUSH and a FlowFile has more than one 
records.
-                        if (numBuffered == batchSize && flushMode == 
SessionConfiguration.FlushMode.MANUAL_FLUSH) {
-                            numBuffered = 0;
+                        if (bufferedRecords == batchSize && flushMode == 
SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+                            bufferedRecords = 0;
                             flushKuduSession(kuduSession, false, 
pendingRowErrors);
                         }
 
@@ -484,8 +523,8 @@ public class PutKudu extends AbstractKuduProcessor {
                             break recordReaderLoop;
                         }
 
-                        numBuffered++;
-                        numRecords.merge(flowFile, 1, Integer::sum);
+                        bufferedRecords++;
+                        processedRecords.merge(flowFile, 1, Integer::sum);
                     }
 
                     record = recordSet.next();
@@ -495,58 +534,37 @@ public class PutKudu extends AbstractKuduProcessor {
                 flowFileFailures.put(flowFile, ex);
             }
         }
+    }
 
-        // If configured to rollback on failure, and there's at least one 
error, rollback the session and return.
-        if (isRollbackOnFailure() && (!pendingRowErrors.isEmpty() || 
!flowFileFailures.isEmpty())) {
-            logFailures(pendingRowErrors, operationFlowFileMap);
-            session.rollback();
-            context.yield();
-            return;
-        }
-
-        // If any data is buffered, flush it.
-        if (numBuffered > 0) {
-            try {
-                flushKuduSession(kuduSession, true, pendingRowErrors);
-            } catch (final Exception e) {
-                getLogger().error("Failed to flush/close Kudu Session", e);
-                for (final FlowFile flowFile : flowFiles) {
-                    session.transfer(flowFile, REL_FAILURE);
-                }
-
-                return;
-            }
-        }
-
-        // It's possible that there were no row errors when this was checked 
above, but flushing the Kudu session may have introduced
-        // one or more Row Errors. So we need to check again.
-        if (isRollbackOnFailure() && !pendingRowErrors.isEmpty()) {
-            logFailures(pendingRowErrors, operationFlowFileMap);
-            session.rollback();
-            context.yield();
-            return;
-        }
-
+    private void transferFlowFiles(final List<FlowFile> flowFiles,
+                                   final Map<FlowFile, Integer> 
processedRecords,
+                                   final Map<FlowFile, Object> 
flowFileFailures,
+                                   final Map<Operation, FlowFile> 
operationFlowFileMap,
+                                   final List<RowError> pendingRowErrors,
+                                   final ProcessSession session) {
         // Find RowErrors for each FlowFile
-        final Map<FlowFile, List<RowError>> flowFileRowErrors = 
pendingRowErrors.stream().collect(
-            Collectors.groupingBy(e -> 
operationFlowFileMap.get(e.getOperation())));
+        final Map<FlowFile, List<RowError>> flowFileRowErrors = 
pendingRowErrors.stream()
+                .filter(e -> operationFlowFileMap.get(e.getOperation()) != 
null)
+                .collect(
+                    Collectors.groupingBy(e -> 
operationFlowFileMap.get(e.getOperation()))
+                );
 
         long totalCount = 0L;
-        for (final FlowFile flowFile : flowFiles) {
-            final int count = numRecords.getOrDefault(flowFile, 0);
+        for (FlowFile flowFile : flowFiles) {
+            final int count = processedRecords.getOrDefault(flowFile, 0);
             totalCount += count;
             final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
 
             if (rowErrors != null) {
-                rowErrors.forEach(rowError -> getLogger().error("Failed to 
write due to {}", new Object[] {rowError.toString()}));
-                session.putAttribute(flowFile, RECORD_COUNT_ATTR, 
String.valueOf(count - rowErrors.size()));
+                rowErrors.forEach(rowError -> getLogger().error("Failed to 
write due to {}", rowError.toString()));
+                flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, 
Integer.toString(count - rowErrors.size()));
                 totalCount -= rowErrors.size(); // Don't include error rows in 
the the counter.
                 session.transfer(flowFile, REL_FAILURE);
             } else {
-                session.putAttribute(flowFile, RECORD_COUNT_ATTR, 
String.valueOf(count));
+                flowFile = session.putAttribute(flowFile, RECORD_COUNT_ATTR, 
String.valueOf(count));
 
                 if (flowFileFailures.containsKey(flowFile)) {
-                    getLogger().error("Failed to write due to {}", new 
Object[]{flowFileFailures.get(flowFile)});
+                    getLogger().error("Failed to write due to {}", 
flowFileFailures.get(flowFile));
                     session.transfer(flowFile, REL_FAILURE);
                 } else {
                     session.transfer(flowFile, REL_SUCCESS);
@@ -566,7 +584,7 @@ public class PutKudu extends AbstractKuduProcessor {
             final FlowFile flowFile = entry.getKey();
             final List<RowError> errors = entry.getValue();
 
-            getLogger().error("Could not write {} to Kudu due to: {}", new 
Object[] {flowFile, errors});
+            getLogger().error("Could not write {} to Kudu due to: {}", 
flowFile, errors);
         }
     }
 
@@ -586,8 +604,8 @@ public class PutKudu extends AbstractKuduProcessor {
     }
 
     protected Operation createKuduOperation(OperationType operationType, 
Record record,
-                                          List<String> fieldNames, Boolean 
ignoreNull,
-                                          Boolean lowercaseFields, KuduTable 
kuduTable) {
+                                          List<String> fieldNames, boolean 
ignoreNull,
+                                          boolean lowercaseFields, KuduTable 
kuduTable) {
         Operation operation;
         switch (operationType) {
             case INSERT:
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index 1aff599..f394c0c 100644
--- 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -72,8 +72,8 @@ public class MockPutKudu extends PutKudu {
 
     @Override
     protected Operation createKuduOperation(OperationType operationType, 
Record record,
-                                            List<String> fieldNames, Boolean 
ignoreNull,
-                                            Boolean lowercaseFields, KuduTable 
kuduTable) {
+                                            List<String> fieldNames, boolean 
ignoreNull,
+                                            boolean lowercaseFields, KuduTable 
kuduTable) {
         Operation operation = opQueue.poll();
         if (operation == null) {
             switch (operationType) {

Reply via email to