This is an automated email from the ASF dual-hosted git repository.
jstorck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new fd3d69b NIFI-5949: Addressed problematic error handling logic in
PutKudu. Also removed AbstractKudu and moved all logic into PutKudu, as the
abstract class was clearly designed to be extended only by PutKudu and no other
processors exist for Kudu.
fd3d69b is described below
commit fd3d69bc9097e273756432a545eab7d15253d794
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jan 10 16:26:13 2019 -0500
NIFI-5949: Addressed problematic error handling logic in PutKudu. Also
removed AbstractKudu and moved all logic into PutKudu, as the abstract class
was clearly designed to be extended only by PutKudu and no other processors
exist for Kudu.
Added explicit .toString() call on rowError to avoid IllegalStateException
during logging of row errors.
This closes #3262.
---
.../nifi/controller/StandardProcessorNode.java | 6 +-
.../apache/nifi/processors/kudu/AbstractKudu.java | 304 ---------------------
.../org/apache/nifi/processors/kudu/PutKudu.java | 273 +++++++++++++++++-
.../apache/nifi/processors/kudu/MockPutKudu.java | 75 ++---
.../apache/nifi/processors/kudu/TestPutKudu.java | 31 +--
5 files changed, 315 insertions(+), 374 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index a68eaab..e59e7fd 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -77,6 +77,7 @@ import org.springframework.util.Assert;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
+import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@@ -1544,9 +1545,10 @@ public class StandardProcessorNode extends ProcessorNode
implements Connectable
} finally {
schedulingAgentCallback.onTaskComplete();
}
- } catch (final Exception e) {
+ } catch (Exception e) {
+ final Throwable cause = (e instanceof
InvocationTargetException) ? e.getCause() : e;
procLog.error("Failed to properly initialize Processor. If
still scheduled to run, NiFi will attempt to "
- + "initialize and run the Processor again after the
'Administrative Yield Duration' has elapsed. Failure is due to " + e, e);
+ + "initialize and run the Processor again after the
'Administrative Yield Duration' has elapsed. Failure is due to " + cause,
cause);
// If processor's task completed Exceptionally, then we want
to retry initiating the start (if Processor is still scheduled to run).
try (final NarCloseable nc =
NarCloseable.withComponentNarLoader(getExtensionManager(),
processor.getClass(), processor.getIdentifier())) {
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
deleted file mode 100644
index 8e78eb4..0000000
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kudu;
-
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.KuduSession;
-import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.Operation;
-import org.apache.kudu.client.OperationResponse;
-import org.apache.kudu.client.RowError;
-import org.apache.kudu.client.SessionConfiguration.FlushMode;
-import org.apache.kudu.client.Upsert;
-import org.apache.kudu.client.SessionConfiguration;
-
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.flowfile.FlowFile;
-
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.serialization.record.Record;
-
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-public abstract class AbstractKudu extends AbstractProcessor {
-
- protected static final PropertyDescriptor KUDU_MASTERS = new
PropertyDescriptor.Builder()
- .name("Kudu Masters")
- .description("List all kudu masters's ip with port (e.g. 7051),
comma separated")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- protected static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
- .name("Table Name")
- .description("The name of the Kudu Table to put data into")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
- .name("record-reader")
- .displayName("Record Reader")
- .description("The service for reading records from incoming flow
files.")
- .identifiesControllerService(RecordReaderFactory.class)
- .required(true)
- .build();
-
- protected static final PropertyDescriptor SKIP_HEAD_LINE = new
PropertyDescriptor.Builder()
- .name("Skip head line")
- .description("Deprecated. Used to ignore header lines, but this
should be handled by a RecordReader " +
- "(e.g. \"Treat First Line as Header\" property of
CSVReader)")
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- protected static final PropertyDescriptor INSERT_OPERATION = new
PropertyDescriptor.Builder()
- .name("Insert Operation")
- .description("Specify operationType for this processor.
Insert-Ignore will ignore duplicated rows")
- .allowableValues(OperationType.values())
- .defaultValue(OperationType.INSERT.toString())
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- protected static final PropertyDescriptor FLUSH_MODE = new
PropertyDescriptor.Builder()
- .name("Flush Mode")
- .description("Set the new flush mode for a kudu session.\n" +
- "AUTO_FLUSH_SYNC: the call returns when the operation is
persisted, else it throws an exception.\n" +
- "AUTO_FLUSH_BACKGROUND: the call returns when the
operation has been added to the buffer. This call should normally perform only
fast in-memory" +
- " operations but it may have to wait when the buffer is
full and there's another buffer being flushed.\n" +
- "MANUAL_FLUSH: the call returns when the operation has
been added to the buffer, else it throws a KuduException if the buffer is
full.")
- .allowableValues(SessionConfiguration.FlushMode.values())
-
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
- .required(true)
- .build();
-
- protected static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
- .name("Batch Size")
- .description("The maximum number of FlowFiles to process in a
single execution, between 1 - 100000. " +
- "Depending on your memory size, and data size per row set
an appropriate batch size. " +
- "Gradually increase this number to find out the best one
for best performances.")
- .defaultValue("100")
- .required(true)
- .addValidator(StandardValidators.createLongValidator(1, 100000,
true))
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .build();
-
- protected static final Relationship REL_SUCCESS = new
Relationship.Builder()
- .name("success")
- .description("A FlowFile is routed to this relationship after it
has been successfully stored in Kudu")
- .build();
- protected static final Relationship REL_FAILURE = new
Relationship.Builder()
- .name("failure")
- .description("A FlowFile is routed to this relationship if it
cannot be sent to Kudu")
- .build();
-
- public static final String RECORD_COUNT_ATTR = "record.count";
-
- protected String kuduMasters;
- protected String tableName;
- protected boolean skipHeadLine;
- protected OperationType operationType;
- protected SessionConfiguration.FlushMode flushMode;
- protected int batchSize = 100;
-
- protected KuduClient kuduClient;
- protected KuduTable kuduTable;
-
- @OnScheduled
- public void OnScheduled(final ProcessContext context) {
- try {
- tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
- kuduMasters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
- operationType =
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
- batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
- flushMode =
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
- skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
-
- if (kuduClient == null) {
- getLogger().debug("Setting up Kudu connection...");
- kuduClient = getKuduConnection(kuduMasters);
- kuduTable = this.getKuduTable(kuduClient, tableName);
- getLogger().debug("Kudu connection successfully initialized");
- }
- } catch(KuduException ex){
- getLogger().error("Exception occurred while interacting with Kudu
due to " + ex.getMessage(), ex);
- }
- }
-
- @OnStopped
- public final void closeClient() throws KuduException {
- if (kuduClient != null) {
- getLogger().info("Closing KuduClient");
- kuduClient.close();
- kuduClient = null;
- }
- }
-
- private Stream<RowError> flushKuduSession(final KuduSession kuduSession,
boolean close) throws Exception {
- List<OperationResponse> responses = close ? kuduSession.close() :
kuduSession.flush();
- Stream<RowError> rowErrors;
- if (kuduSession.getFlushMode() == FlushMode.AUTO_FLUSH_BACKGROUND) {
- rowErrors =
Stream.of(kuduSession.getPendingErrors().getRowErrors());
- } else {
- rowErrors = responses.stream()
- .filter(OperationResponse::hasRowError)
- .map(OperationResponse::getRowError);
- }
- return rowErrors;
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- final List<FlowFile> flowFiles = session.get(batchSize);
- if (flowFiles.isEmpty()) return;
-
- final KuduSession kuduSession = getKuduSession(kuduClient);
- final RecordReaderFactory recordReaderFactory =
-
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
-
- final Map<FlowFile, Integer> numRecords = new HashMap<>();
- final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
- final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
-
- int numBuffered = 0;
- Stream<RowError> pendingRowErrors = Stream.empty();
- for (FlowFile flowFile : flowFiles) {
- try (final InputStream in = session.read(flowFile);
- final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
- final List<String> fieldNames =
recordReader.getSchema().getFieldNames();
- final RecordSet recordSet = recordReader.createRecordSet();
-
- // Deprecated
- if (skipHeadLine) recordSet.next();
-
- Record record = recordSet.next();
- while (record != null) {
- Operation operation = operationType == OperationType.UPSERT
- ? upsertRecordToKudu(kuduTable, record, fieldNames)
- : insertRecordToKudu(kuduTable, record, fieldNames);
- // We keep track of mappings between Operations and their
origins,
- // so that we know which FlowFiles should be marked
failure after buffered flush.
- operationFlowFileMap.put(operation, flowFile);
-
- // Flush mutation buffer of KuduSession to avoid
"MANUAL_FLUSH is enabled
- // but the buffer is too big" error. This can happen when
flush mode is
- // MANUAL_FLUSH and a FlowFile has more than one records.
- if (numBuffered == batchSize && flushMode ==
FlushMode.MANUAL_FLUSH) {
- numBuffered = 0;
- pendingRowErrors = Stream.concat(pendingRowErrors,
flushKuduSession(kuduSession, false));
- }
-
- // OperationResponse is returned only when flush mode is
set to AUTO_FLUSH_SYNC
- OperationResponse response = kuduSession.apply(operation);
- if (response != null && response.hasRowError()) {
- // Stop processing the records on the first error.
- // Note that Kudu does not support rolling back of
previous operations.
- flowFileFailures.put(flowFile, response.getRowError());
- break;
- }
-
- numBuffered++;
- numRecords.merge(flowFile, 1, Integer::sum);
- record = recordSet.next();
- }
- } catch (Exception ex) {
- flowFileFailures.put(flowFile, ex);
- }
- }
-
- try {
- // Find RowErrors for each FlowFile
- Map<FlowFile, List<RowError>> flowFileRowErrors =
- Stream.concat(
- pendingRowErrors,
- numBuffered > 0 ? flushKuduSession(kuduSession, true) :
Stream.empty()
- ).collect(Collectors.groupingBy(e ->
operationFlowFileMap.get(e.getOperation())));
-
- flowFiles.forEach(ff -> {
- int count = numRecords.getOrDefault(ff, 0);
- List<RowError> rowErrors = flowFileRowErrors.get(ff);
- if (rowErrors != null) {
- rowErrors.forEach(rowError ->
- getLogger().error("Failed to write due to {}", new
Object[]{rowError}));
- session.putAttribute(ff, RECORD_COUNT_ATTR,
String.valueOf(count - rowErrors.size()));
- session.transfer(ff, REL_FAILURE);
- } else {
- session.putAttribute(ff, RECORD_COUNT_ATTR,
String.valueOf(count));
- if (flowFileFailures.containsKey(ff)) {
- getLogger().error("Failed to write due to {}", new
Object[]{flowFileFailures.get(ff)});
- session.transfer(ff, REL_FAILURE);
- } else {
- session.transfer(ff, REL_SUCCESS);
- session.getProvenanceReporter().send(ff, "Successfully
added FlowFile to Kudu");
- }
- }
- });
- } catch (Exception ex) {
- throw new ProcessException(ex);
- }
- }
-
- protected KuduClient getKuduConnection(String masters) {
- return new KuduClient.KuduClientBuilder(kuduMasters).build();
- }
-
- protected KuduTable getKuduTable(KuduClient client, String tableName)
throws KuduException {
- return client.openTable(tableName);
- }
-
- protected KuduSession getKuduSession(KuduClient client){
-
- KuduSession kuduSession = client.newSession();
-
- kuduSession.setMutationBufferSpace(batchSize);
- kuduSession.setFlushMode(flushMode);
-
- if(operationType == OperationType.INSERT_IGNORE){
- kuduSession.setIgnoreAllDuplicateRows(true);
- }
-
- return kuduSession;
- }
-
- protected abstract Insert insertRecordToKudu(final KuduTable table, final
Record record, final List<String> fields) throws Exception;
- protected abstract Upsert upsertRecordToKudu(final KuduTable table, final
Record record, final List<String> fields) throws Exception;
-}
-
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
index 108f5cf..9605315 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
@@ -17,31 +17,53 @@
package org.apache.nifi.processors.kudu;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Insert;
-import org.apache.kudu.client.Upsert;
-import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
-
+import org.apache.kudu.client.Operation;
+import org.apache.kudu.client.OperationResponse;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.RowError;
+import org.apache.kudu.client.SessionConfiguration;
+import org.apache.kudu.client.Upsert;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
-import com.google.common.annotations.VisibleForTesting;
-
+import java.io.InputStream;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
@EventDriven
@SupportsBatching
@@ -51,7 +73,89 @@ import java.util.Set;
"to the specified Kudu's table. The schema for the table must be
provided in the processor properties or from your source." +
" If any error occurs while reading records from the input, or writing
records to Kudu, the FlowFile will be routed to failure")
@WritesAttribute(attribute = "record.count", description = "Number of records
written to Kudu")
-public class PutKudu extends AbstractKudu {
+public class PutKudu extends AbstractProcessor {
+ protected static final PropertyDescriptor KUDU_MASTERS = new
PropertyDescriptor.Builder()
+ .name("Kudu Masters")
+ .description("List all kudu masters's ip with port (e.g. 7051), comma
separated")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ protected static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("Table Name")
+ .description("The name of the Kudu Table to put data into")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("The service for reading records from incoming flow
files.")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .build();
+
+ protected static final PropertyDescriptor SKIP_HEAD_LINE = new
PropertyDescriptor.Builder()
+ .name("Skip head line")
+ .description("Deprecated. Used to ignore header lines, but this should
be handled by a RecordReader " +
+ "(e.g. \"Treat First Line as Header\" property of CSVReader)")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor INSERT_OPERATION = new
PropertyDescriptor.Builder()
+ .name("Insert Operation")
+ .description("Specify operationType for this processor. Insert-Ignore
will ignore duplicated rows")
+ .allowableValues(OperationType.values())
+ .defaultValue(OperationType.INSERT.toString())
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ protected static final PropertyDescriptor FLUSH_MODE = new
PropertyDescriptor.Builder()
+ .name("Flush Mode")
+ .description("Set the new flush mode for a kudu session.\n" +
+ "AUTO_FLUSH_SYNC: the call returns when the operation is
persisted, else it throws an exception.\n" +
+ "AUTO_FLUSH_BACKGROUND: the call returns when the operation has
been added to the buffer. This call should normally perform only fast
in-memory" +
+ " operations but it may have to wait when the buffer is full and
there's another buffer being flushed.\n" +
+ "MANUAL_FLUSH: the call returns when the operation has been added
to the buffer, else it throws a KuduException if the buffer is full.")
+ .allowableValues(SessionConfiguration.FlushMode.values())
+
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND.toString())
+ .required(true)
+ .build();
+
+ protected static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("The maximum number of FlowFiles to process in a single
execution, between 1 - 100000. " +
+ "Depending on your memory size, and data size per row set an
appropriate batch size. " +
+ "Gradually increase this number to find out the best one for best
performances.")
+ .defaultValue("100")
+ .required(true)
+ .addValidator(StandardValidators.createLongValidator(1, 100000, true))
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ protected static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship after it has
been successfully stored in Kudu")
+ .build();
+ protected static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if it cannot
be sent to Kudu")
+ .build();
+
+ public static final String RECORD_COUNT_ATTR = "record.count";
+
+ protected OperationType operationType;
+ protected SessionConfiguration.FlushMode flushMode;
+ protected int batchSize = 100;
+
+ protected KuduClient kuduClient;
+ protected KuduTable kuduTable;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -75,14 +179,169 @@ public class PutKudu extends AbstractKudu {
return rels;
}
+
+ @OnScheduled
+ public void OnScheduled(final ProcessContext context) throws KuduException
{
+ final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+ final String kuduMasters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+ operationType =
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
+ batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+ flushMode =
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
+
+ getLogger().debug("Setting up Kudu connection...");
+ kuduClient = createClient(kuduMasters);
+ kuduTable = kuduClient.openTable(tableName);
+ getLogger().debug("Kudu connection successfully initialized");
+ }
+
+ protected KuduClient createClient(final String masters) {
+ return new KuduClient.KuduClientBuilder(masters).build();
+ }
+
+ @OnStopped
+ public final void closeClient() throws KuduException {
+ if (kuduClient != null) {
+ getLogger().debug("Closing KuduClient");
+ kuduClient.close();
+ kuduClient = null;
+ }
+ }
+
@Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final List<FlowFile> flowFiles = session.get(batchSize);
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ final KuduSession kuduSession = getKuduSession(kuduClient);
+ final RecordReaderFactory recordReaderFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+ final Map<FlowFile, Integer> numRecords = new HashMap<>();
+ final Map<FlowFile, Object> flowFileFailures = new HashMap<>();
+ final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>();
+
+ int numBuffered = 0;
+ final List<RowError> pendingRowErrors = new ArrayList<>();
+ for (FlowFile flowFile : flowFiles) {
+ try (final InputStream in = session.read(flowFile);
+ final RecordReader recordReader =
recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
+ final List<String> fieldNames =
recordReader.getSchema().getFieldNames();
+ final RecordSet recordSet = recordReader.createRecordSet();
+
+ Record record = recordSet.next();
+ while (record != null) {
+ Operation operation = operationType == OperationType.UPSERT
+ ? upsertRecordToKudu(kuduTable, record, fieldNames)
+ : insertRecordToKudu(kuduTable, record, fieldNames);
+
+ // We keep track of mappings between Operations and their
origins,
+ // so that we know which FlowFiles should be marked
failure after buffered flush.
+ operationFlowFileMap.put(operation, flowFile);
+
+ // Flush mutation buffer of KuduSession to avoid
"MANUAL_FLUSH is enabled
+ // but the buffer is too big" error. This can happen when
flush mode is
+ // MANUAL_FLUSH and a FlowFile has more than one records.
+ if (numBuffered == batchSize && flushMode ==
SessionConfiguration.FlushMode.MANUAL_FLUSH) {
+ numBuffered = 0;
+ flushKuduSession(kuduSession, false, pendingRowErrors);
+ }
+
+ // OperationResponse is returned only when flush mode is
set to AUTO_FLUSH_SYNC
+ OperationResponse response = kuduSession.apply(operation);
+ if (response != null && response.hasRowError()) {
+ // Stop processing the records on the first error.
+ // Note that Kudu does not support rolling back of
previous operations.
+ flowFileFailures.put(flowFile, response.getRowError());
+ break;
+ }
+
+ numBuffered++;
+ numRecords.merge(flowFile, 1, Integer::sum);
+ record = recordSet.next();
+ }
+ } catch (Exception ex) {
+ flowFileFailures.put(flowFile, ex);
+ }
+ }
+
+ if (numBuffered > 0) {
+ try {
+ flushKuduSession(kuduSession, true, pendingRowErrors);
+ } catch (final Exception e) {
+ getLogger().error("Failed to flush/close Kudu Session", e);
+ for (final FlowFile flowFile : flowFiles) {
+ session.transfer(flowFile, REL_FAILURE);
+ }
+
+ return;
+ }
+ }
+
+ // Find RowErrors for each FlowFile
+ final Map<FlowFile, List<RowError>> flowFileRowErrors =
pendingRowErrors.stream().collect(
+ Collectors.groupingBy(e ->
operationFlowFileMap.get(e.getOperation())));
+
+ long totalCount = 0L;
+ for (final FlowFile flowFile : flowFiles) {
+ final int count = numRecords.getOrDefault(flowFile, 0);
+ totalCount += count;
+ final List<RowError> rowErrors = flowFileRowErrors.get(flowFile);
+
+ if (rowErrors != null) {
+ rowErrors.forEach(rowError -> getLogger().error("Failed to
write due to {}", new Object[]{rowError.toString()}));
+ session.putAttribute(flowFile, RECORD_COUNT_ATTR,
String.valueOf(count - rowErrors.size()));
+ session.transfer(flowFile, REL_FAILURE);
+ } else {
+ session.putAttribute(flowFile, RECORD_COUNT_ATTR,
String.valueOf(count));
+
+ if (flowFileFailures.containsKey(flowFile)) {
+ getLogger().error("Failed to write due to {}", new
Object[]{flowFileFailures.get(flowFile)});
+ session.transfer(flowFile, REL_FAILURE);
+ } else {
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().send(flowFile,
"Successfully added FlowFile to Kudu");
+ }
+ }
+ }
+
+ session.adjustCounter("Records Inserted", totalCount, false);
+ }
+
+
+ protected KuduSession getKuduSession(final KuduClient client) {
+ final KuduSession kuduSession = client.newSession();
+ kuduSession.setMutationBufferSpace(batchSize);
+ kuduSession.setFlushMode(flushMode);
+
+ if (operationType == OperationType.INSERT_IGNORE) {
+ kuduSession.setIgnoreAllDuplicateRows(true);
+ }
+
+ return kuduSession;
+ }
+
+ private void flushKuduSession(final KuduSession kuduSession, boolean
close, final List<RowError> rowErrors) throws KuduException {
+ final List<OperationResponse> responses = close ? kuduSession.close()
: kuduSession.flush();
+
+ if (kuduSession.getFlushMode() ==
SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND) {
+
rowErrors.addAll(Arrays.asList(kuduSession.getPendingErrors().getRowErrors()));
+ } else {
+ responses.stream()
+ .filter(OperationResponse::hasRowError)
+ .map(OperationResponse::getRowError)
+ .forEach(rowErrors::add);
+ }
+ }
+
+
+
protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) throws IllegalStateException, Exception {
Upsert upsert = kuduTable.newUpsert();
this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record,
fieldNames);
return upsert;
}
- @Override
protected Insert insertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) throws IllegalStateException, Exception {
Insert insert = kuduTable.newInsert();
this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record,
fieldNames);
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
index f5a657b..f805be7 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java
@@ -17,61 +17,64 @@
package org.apache.nifi.processors.kudu;
+import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
-import org.apache.kudu.client.KuduException;
-import org.apache.kudu.client.Insert;
import org.apache.kudu.client.Upsert;
-
import org.apache.nifi.serialization.record.Record;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class MockPutKudu extends PutKudu { // Test double for PutKudu: overrides the client, session and operation factories with Mockito mocks or injected objects
- private KuduSession session;
- private LinkedList<Insert> insertQueue;
+ private KuduSession session; // session handed back by getKuduSession
+ private LinkedList<Insert> insertQueue; // Inserts pre-loaded via queue(); polled from the head by insertRecordToKudu
+
+ public MockPutKudu() { // default: delegates to the other constructor with a Mockito-mocked KuduSession
+ this(mock(KuduSession.class));
+ }
+
+ public MockPutKudu(KuduSession session) { // keeps the supplied session and starts with an empty insert queue
+ this.session = session;
+ this.insertQueue = new LinkedList<>();
+ }
- public MockPutKudu() {
- this(mock(KuduSession.class));
- }
+ public void queue(Insert... operations) { // appends the given Inserts to the end of the queue, in argument order
+ insertQueue.addAll(Arrays.asList(operations));
+ }
- public MockPutKudu(KuduSession session) {
- this.session = session;
- this.insertQueue = new LinkedList<>();
- }
+ @Override
+ protected Insert insertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) { // returns the next queued Insert, or a fresh Mockito mock when the queue is empty; arguments are not used
+ Insert insert = insertQueue.poll();
+ return insert != null ? insert : mock(Insert.class);
+ }
- public void queue(Insert... operations) {
- insertQueue.addAll(Arrays.asList(operations));
- }
+ @Override
+ protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) { // always returns a Mockito-mocked Upsert; arguments are not used
+ return mock(Upsert.class);
+ }
- @Override
- protected Insert insertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) {
- Insert insert = insertQueue.poll();
- return insert != null ? insert : mock(Insert.class);
- }
+ @Override
+ protected KuduClient createClient(final String masters) { // returns a mocked KuduClient whose openTable(<any string>) yields a mocked KuduTable; 'masters' is not used
+ final KuduClient client = mock(KuduClient.class);
- @Override
- protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record,
List<String> fieldNames) {
- return mock(Upsert.class);
- }
+ try {
+
when(client.openTable(anyString())).thenReturn(mock(KuduTable.class));
+ } catch (final Exception e) { // NOTE(review): exception is silently ignored; presumably the catch exists only because openTable declares a checked exception. Confirm, or rethrow as AssertionError so a broken stub fails loudly
- @Override
- protected KuduClient getKuduConnection(String masters) {
- return mock(KuduClient.class);
- }
+ }
- @Override
- protected KuduSession getKuduSession(KuduClient client){
- return session;
- }
+ return client;
+ }
- @Override
- protected KuduTable getKuduTable(KuduClient client, String tableName) throws
KuduException {
- return mock(KuduTable.class);
- }
+ @Override
+ protected KuduSession getKuduSession(KuduClient client) { // returns the session given at construction; 'client' is not used
+ return session;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
index 1263c0e..041b506 100644
---
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.kudu;
-import java.util.stream.Collectors;
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
@@ -41,14 +40,13 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.apache.nifi.util.Tuple;
import org.junit.After;
import org.junit.Assert;
@@ -59,23 +57,23 @@ import org.mockito.stubbing.OngoingStubbing;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Arrays;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
+import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
+import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK;
-import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL;
-import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION;
-
public class TestPutKudu {
public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table";
@@ -246,23 +244,6 @@ public class TestPutKudu {
testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1);
}
- @Test
- public void testSkipHeadLineTrue() throws InitializationException,
IOException {
- createRecordReader(100);
- testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, "true");
-
- final String filename = "testSkipHeadLineTrue-" +
System.currentTimeMillis();
-
- final Map<String,String> flowFileAttributes = new HashMap<>();
- flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
-
- testRunner.enqueue("trigger", flowFileAttributes);
- testRunner.run();
- testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1);
-
- MockFlowFile flowFiles =
testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
- flowFiles.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "99");
- }
@Test
public void testInsertManyFlowFiles() throws Exception {