Repository: nifi Updated Branches: refs/heads/master 0a44bad76 -> 02ba4cf2c
NIFI-5064 - Fixes and improvements to PutKudu processor Signed-off-by: Pierre Villard <[email protected]> This closes #2621. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/02ba4cf2 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/02ba4cf2 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/02ba4cf2 Branch: refs/heads/master Commit: 02ba4cf2c85221af731f310b9f2d1624f4e2f446 Parents: 0a44bad Author: Junegunn Choi <[email protected]> Authored: Tue Apr 3 16:36:54 2018 +0900 Committer: Pierre Villard <[email protected]> Committed: Wed May 9 19:39:29 2018 +0200 ---------------------------------------------------------------------- .../nifi-kudu-processors/pom.xml | 7 +- .../nifi/processors/kudu/AbstractKudu.java | 188 +++++++------ .../apache/nifi/processors/kudu/PutKudu.java | 38 ++- .../nifi/processors/kudu/MockPutKudu.java | 25 +- .../nifi/processors/kudu/TestPutKudu.java | 265 ++++++++++++++++++- 5 files changed, 426 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml index 0eba8d9..250a2d2 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml @@ -47,7 +47,7 @@ <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> - <version>1.3.0</version> + <version>1.7.0</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> @@ -91,5 +91,10 @@ <version>2.5.4</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + 
<version>18.0</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java index 68e7004..8e78eb4 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java @@ -17,16 +17,15 @@ package org.apache.nifi.processors.kudu; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.commons.io.IOUtils; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; import org.apache.kudu.client.KuduSession; import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.Insert; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.OperationResponse; +import org.apache.kudu.client.RowError; +import org.apache.kudu.client.SessionConfiguration.FlushMode; import org.apache.kudu.client.Upsert; import org.apache.kudu.client.SessionConfiguration; @@ -40,20 +39,20 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.serialization.record.Record; +import java.io.InputStream; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; public abstract class AbstractKudu extends AbstractProcessor { @@ -83,9 +82,10 @@ public abstract class AbstractKudu extends AbstractProcessor { protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder() .name("Skip head line") - .description("Set it to true if your first line is the header line e.g. column names") + .description("Deprecated. Used to ignore header lines, but this should be handled by a RecordReader " + + "(e.g. \"Treat First Line as Header\" property of CSVReader)") .allowableValues("true", "false") - .defaultValue("true") + .defaultValue("false") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -112,12 +112,12 @@ public abstract class AbstractKudu extends AbstractProcessor { protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") - .description("Set the number of operations that can be buffered, between 2 - 100000. " + + .description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " + "Depending on your memory size, and data size per row set an appropriate batch size. 
" + "Gradually increase this number to find out the best one for best performances.") .defaultValue("100") .required(true) - .addValidator(StandardValidators.createLongValidator(2, 100000, true)) + .addValidator(StandardValidators.createLongValidator(1, 100000, true)) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); @@ -147,17 +147,17 @@ public abstract class AbstractKudu extends AbstractProcessor { try { tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue(); kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue(); - if(kuduClient == null) { + operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue()); + batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); + flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue()); + skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean(); + + if (kuduClient == null) { getLogger().debug("Setting up Kudu connection..."); kuduClient = getKuduConnection(kuduMasters); kuduTable = this.getKuduTable(kuduClient, tableName); getLogger().debug("Kudu connection successfully initialized"); } - - operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue()); - batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); - flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue()); - skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean(); } catch(KuduException ex){ getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex); } @@ -172,75 +172,107 @@ public abstract class AbstractKudu extends AbstractProcessor { } } + private Stream<RowError> flushKuduSession(final KuduSession kuduSession, boolean close) throws Exception { + List<OperationResponse> responses = close ? 
kuduSession.close() : kuduSession.flush(); + Stream<RowError> rowErrors; + if (kuduSession.getFlushMode() == FlushMode.AUTO_FLUSH_BACKGROUND) { + rowErrors = Stream.of(kuduSession.getPendingErrors().getRowErrors()); + } else { + rowErrors = responses.stream() + .filter(OperationResponse::hasRowError) + .map(OperationResponse::getRowError); + } + return rowErrors; + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final FlowFile flowFile = session.get(); - try { - if (flowFile == null) return; - final Map<String,String> attributes = new HashMap<String, String>(); - final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null); - final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); - final KuduSession kuduSession = this.getKuduSession(kuduClient); - - session.read(flowFile, (final InputStream rawIn) -> { - RecordReader recordReader = null; - try (final BufferedInputStream in = new BufferedInputStream(rawIn)) { - try { - recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger()); - } catch (Exception ex) { - final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", ex); - exceptionHolder.set(rrfe); - return; + final List<FlowFile> flowFiles = session.get(batchSize); + if (flowFiles.isEmpty()) return; + + final KuduSession kuduSession = getKuduSession(kuduClient); + final RecordReaderFactory recordReaderFactory = + context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + + final Map<FlowFile, Integer> numRecords = new HashMap<>(); + final Map<FlowFile, Object> flowFileFailures = new HashMap<>(); + final Map<Operation, FlowFile> operationFlowFileMap = new HashMap<>(); + + int numBuffered = 0; + Stream<RowError> pendingRowErrors = Stream.empty(); + for (FlowFile flowFile : flowFiles) { + try (final 
InputStream in = session.read(flowFile); + final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) { + final List<String> fieldNames = recordReader.getSchema().getFieldNames(); + final RecordSet recordSet = recordReader.createRecordSet(); + + // Deprecated + if (skipHeadLine) recordSet.next(); + + Record record = recordSet.next(); + while (record != null) { + Operation operation = operationType == OperationType.UPSERT + ? upsertRecordToKudu(kuduTable, record, fieldNames) + : insertRecordToKudu(kuduTable, record, fieldNames); + // We keep track of mappings between Operations and their origins, + // so that we know which FlowFiles should be marked failure after buffered flush. + operationFlowFileMap.put(operation, flowFile); + + // Flush mutation buffer of KuduSession to avoid "MANUAL_FLUSH is enabled + // but the buffer is too big" error. This can happen when flush mode is + // MANUAL_FLUSH and a FlowFile has more than one record. + if (numBuffered == batchSize && flushMode == FlushMode.MANUAL_FLUSH) { + numBuffered = 0; + pendingRowErrors = Stream.concat(pendingRowErrors, flushKuduSession(kuduSession, false)); } - List<String> fieldNames = recordReader.getSchema().getFieldNames(); - final RecordSet recordSet = recordReader.createRecordSet(); - - if (skipHeadLine) recordSet.next(); - - int numOfAddedRecord = 0; - Record record = recordSet.next(); - while (record != null) { - org.apache.kudu.client.Operation oper = null; - if(operationType == OperationType.UPSERT) { - oper = upsertRecordToKudu(kuduTable, record, fieldNames); - } else { - oper = insertRecordToKudu(kuduTable, record, fieldNames); - } - kuduSession.apply(oper); - numOfAddedRecord++; - record = recordSet.next(); + // OperationResponse is returned only when flush mode is set to AUTO_FLUSH_SYNC + OperationResponse response = kuduSession.apply(operation); + if (response != null && response.hasRowError()) { + // Stop processing the records on the first error. 
+ // Note that Kudu does not support rolling back of previous operations. + flowFileFailures.put(flowFile, response.getRowError()); + break; } - getLogger().info("KUDU: number of inserted records: " + numOfAddedRecord); - attributes.put(RECORD_COUNT_ATTR, String.valueOf(numOfAddedRecord)); - - } catch (KuduException ex) { - getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex); - exceptionHolder.set(ex); - } catch (Exception e) { - exceptionHolder.set(e); - } finally { - IOUtils.closeQuietly(recordReader); + numBuffered++; + numRecords.merge(flowFile, 1, Integer::sum); + record = recordSet.next(); } - }); - kuduSession.close(); - if (exceptionHolder.get() != null) { - throw exceptionHolder.get(); + } catch (Exception ex) { + flowFileFailures.put(flowFile, ex); } + } - // Update flow file's attributes after the ingestion - session.putAllAttributes(flowFile, attributes); - - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().send(flowFile, "Successfully added flowfile to kudu"); - - } catch (IOException | FlowFileAccessException e) { - getLogger().error("Failed to write due to {}", new Object[]{e}); - session.transfer(flowFile, REL_FAILURE); - } catch (Throwable t) { - getLogger().error("Failed to write due to {}", new Object[]{t}); - session.transfer(flowFile, REL_FAILURE); + try { + // Find RowErrors for each FlowFile + Map<FlowFile, List<RowError>> flowFileRowErrors = + Stream.concat( + pendingRowErrors, + numBuffered > 0 ? 
flushKuduSession(kuduSession, true) : Stream.empty() + ).collect(Collectors.groupingBy(e -> operationFlowFileMap.get(e.getOperation()))); + + flowFiles.forEach(ff -> { + int count = numRecords.getOrDefault(ff, 0); + List<RowError> rowErrors = flowFileRowErrors.get(ff); + if (rowErrors != null) { + rowErrors.forEach(rowError -> + getLogger().error("Failed to write due to {}", new Object[]{rowError})); + session.putAttribute(ff, RECORD_COUNT_ATTR, String.valueOf(count - rowErrors.size())); + session.transfer(ff, REL_FAILURE); + } else { + session.putAttribute(ff, RECORD_COUNT_ATTR, String.valueOf(count)); + if (flowFileFailures.containsKey(ff)) { + getLogger().error("Failed to write due to {}", new Object[]{flowFileFailures.get(ff)}); + session.transfer(ff, REL_FAILURE); + } else { + session.transfer(ff, REL_SUCCESS); + session.getProvenanceReporter().send(ff, "Successfully added FlowFile to Kudu"); + } + } + }); + } catch (Exception ex) { + throw new ProcessException(ex); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index 313e49b..eb35e6d 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -17,13 +17,13 @@ package org.apache.nifi.processors.kudu; +import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.Insert; import org.apache.kudu.client.Upsert; import 
org.apache.kudu.client.PartialRow; import org.apache.kudu.client.KuduTable; -import org.apache.kudu.client.Operation; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -35,6 +35,9 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.Relationship; import org.apache.nifi.serialization.record.Record; +import com.google.common.annotations.VisibleForTesting; + +import java.math.BigDecimal; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -75,27 +78,31 @@ public class PutKudu extends AbstractKudu { @Override protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception { Upsert upsert = kuduTable.newUpsert(); - this.insert(kuduTable, upsert, record, fieldNames); + this.buildPartialRow(kuduTable.getSchema(), upsert.getRow(), record, fieldNames); return upsert; } @Override protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception { Insert insert = kuduTable.newInsert(); - this.insert(kuduTable, insert, record, fieldNames); + this.buildPartialRow(kuduTable.getSchema(), insert.getRow(), record, fieldNames); return insert; } - private void insert(KuduTable kuduTable, Operation operation, Record record, List<String> fieldNames){ - PartialRow row = operation.getRow(); - Schema colSchema = kuduTable.getSchema(); - + @VisibleForTesting + void buildPartialRow(Schema schema, PartialRow row, Record record, List<String> fieldNames) { for (String colName : fieldNames) { - int colIdx = this.getColumnIndex(colSchema, colName); + int colIdx = this.getColumnIndex(schema, colName); if (colIdx != -1) { - Type colType = colSchema.getColumnByIndex(colIdx).getType(); + ColumnSchema colSchema = schema.getColumnByIndex(colIdx); + Type colType = colSchema.getType(); + + if (record.getValue(colName) == null) { 
+ row.setNull(colName); + continue; + } - switch (colType.getDataType()) { + switch (colType.getDataType(colSchema.getTypeAttributes())) { case BOOL: row.addBoolean(colIdx, record.getAsBoolean(colName)); break; @@ -109,9 +116,11 @@ public class PutKudu extends AbstractKudu { row.addBinary(colIdx, record.getAsString(colName).getBytes()); break; case INT8: + row.addByte(colIdx, record.getAsInt(colName).byteValue()); + break; case INT16: - short temp = (short)record.getAsInt(colName).intValue(); - row.addShort(colIdx, temp); + row.addShort(colIdx, record.getAsInt(colName).shortValue()); + break; case INT32: row.addInt(colIdx, record.getAsInt(colName)); break; @@ -121,6 +130,11 @@ public class PutKudu extends AbstractKudu { case STRING: row.addString(colIdx, record.getAsString(colName)); break; + case DECIMAL32: + case DECIMAL64: + case DECIMAL128: + row.addDecimal(colIdx, new BigDecimal(record.getAsString(colName))); + break; default: throw new IllegalStateException(String.format("unknown column type %s", colType)); } http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java index feef584..f5a657b 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java @@ -26,14 +26,33 @@ import org.apache.kudu.client.Upsert; import org.apache.nifi.serialization.record.Record; +import java.util.Arrays; +import java.util.LinkedList; import java.util.List; 
import static org.mockito.Mockito.mock; -public class MockPutKudu extends PutKudu{ +public class MockPutKudu extends PutKudu { + private KuduSession session; + private LinkedList<Insert> insertQueue; + + public MockPutKudu() { + this(mock(KuduSession.class)); + } + + public MockPutKudu(KuduSession session) { + this.session = session; + this.insertQueue = new LinkedList<>(); + } + + public void queue(Insert... operations) { + insertQueue.addAll(Arrays.asList(operations)); + } + @Override protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) { - return mock(Insert.class); + Insert insert = insertQueue.poll(); + return insert != null ? insert : mock(Insert.class); } @Override @@ -48,7 +67,7 @@ public class MockPutKudu extends PutKudu{ @Override protected KuduSession getKuduSession(KuduClient client){ - return mock(KuduSession.class); + return session; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/02ba4cf2/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java index fa3aa77..315a0ee 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -17,6 +17,18 @@ package org.apache.nifi.processors.kudu; +import java.util.stream.Collectors; +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; +import org.apache.kudu.ColumnTypeAttributes; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.Insert; 
+import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.OperationResponse; +import org.apache.kudu.client.RowError; +import org.apache.kudu.client.RowErrorsAndOverflowStatus; +import org.apache.kudu.client.SessionConfiguration.FlushMode; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; @@ -27,27 +39,43 @@ import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.Tuple; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.stubbing.OngoingStubbing; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.IntStream; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.OK; +import static org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.FAIL; +import static 
org.apache.nifi.processors.kudu.TestPutKudu.ResultCode.EXCEPTION; + public class TestPutKudu { public static final String DEFAULT_TABLE_NAME = "Nifi-Kudu-Table"; @@ -63,6 +91,10 @@ public class TestPutKudu { public void setUp() { processor = new MockPutKudu(); testRunner = TestRunners.newTestRunner(processor); + setUpTestRunner(testRunner); + } + + private void setUpTestRunner(TestRunner testRunner) { testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME); testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS); testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE); @@ -125,7 +157,7 @@ public class TestPutKudu { // simulate throwing an IOException when the factory creates a reader which is what would happen when // invalid Avro is passed to the Avro reader factory - final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class); + final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class); when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory"); when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenThrow(new IOException("NOT AVRO")); @@ -162,10 +194,10 @@ public class TestPutKudu { public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException { createRecordReader(10); - final RecordReader recordReader = Mockito.mock(RecordReader.class); + final RecordReader recordReader = mock(RecordReader.class); when(recordReader.nextRecord()).thenThrow(new MalformedRecordException("ERROR")); - final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class); + final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class); when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory"); when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), 
any(ComponentLog.class))).thenReturn(recordReader); @@ -272,4 +304,231 @@ public class TestPutKudu { flowFile.assertContentEquals("string".getBytes()); flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); } + + @Test + public void testBuildRow() { + buildPartialRow((long) 1, "foo", (short) 10); + } + + @Test + public void testBuildPartialRowNullable() { + buildPartialRow((long) 1, null, (short) 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuildPartialRowNullPrimaryKey() { + buildPartialRow(null, "foo", (short) 10); + } + + @Test(expected = IllegalArgumentException.class) + public void testBuildPartialRowNotNullable() { + buildPartialRow((long) 1, "foo", null); + } + + private void buildPartialRow(Long id, String name, Short age) { + final Schema kuduSchema = new Schema(Arrays.asList( + new ColumnSchemaBuilder("id", Type.INT64).key(true).build(), + new ColumnSchemaBuilder("name", Type.STRING).nullable(true).build(), + new ColumnSchemaBuilder("age", Type.INT16).nullable(false).build(), + new ColumnSchemaBuilder("score", Type.DECIMAL).nullable(true).typeAttributes( + new ColumnTypeAttributes.ColumnTypeAttributesBuilder().precision(9).scale(0).build() + ).build())); + + final RecordSchema schema = new SimpleRecordSchema(Arrays.asList( + new RecordField("id", RecordFieldType.BIGINT.getDataType()), + new RecordField("name", RecordFieldType.STRING.getDataType()), + new RecordField("age", RecordFieldType.SHORT.getDataType()), + new RecordField("score", RecordFieldType.LONG.getDataType()))); + + Map<String, Object> values = new HashMap<>(); + values.put("id", id); + values.put("name", name); + values.put("age", age); + values.put("score", 10000L); + new PutKudu().buildPartialRow( + kuduSchema, + kuduSchema.newPartialRow(), + new MapRecord(schema, values), + schema.getFieldNames() + ); + } + + private Tuple<Insert, OperationResponse> insert(boolean success) { + Insert insert = mock(Insert.class); + OperationResponse response = 
mock(OperationResponse.class, Mockito.RETURNS_DEEP_STUBS); + when(response.hasRowError()).thenReturn(!success); + if (!success) { + when(response.getRowError().getOperation()).thenReturn(insert); + } + return new Tuple<>(insert, response); + } + + enum ResultCode { + OK, + FAIL, + EXCEPTION + } + + private LinkedList<OperationResponse> queueInsert(MockPutKudu kudu, KuduSession session, boolean sync, ResultCode... results) throws Exception { + LinkedList<OperationResponse> responses = new LinkedList<>(); + for (ResultCode result : results) { + boolean ok = result == OK; + Tuple<Insert, OperationResponse> tuple = insert(ok); + kudu.queue(tuple.getKey()); + + if (result == EXCEPTION) { + when(session.apply(tuple.getKey())).thenThrow(mock(KuduException.class)); + // Stop processing the rest of the records on the first exception + break; + } else { + responses.add(tuple.getValue()); + if (sync) { + when(session.apply(tuple.getKey())).thenReturn(ok ? null : tuple.getValue()); + + // In AUTO_FLUSH_SYNC mode, PutKudu immediately knows when an operation has failed. + // In that case, it does not process the rest of the records in the FlowFile. 
+ if (result == FAIL) break; + } + } + } + return responses; + } + + private static <T> void stubSlices(OngoingStubbing<T> stubbing, List<T> slices) { + for (T slice : slices) { + stubbing = stubbing.thenReturn(slice); + } + } + + private void testKuduPartialFailure(FlushMode flushMode, int batchSize) throws Exception { + final int numFlowFiles = 4; + final int numRecordsPerFlowFile = 3; + final ResultCode[][] flowFileResults = new ResultCode[][] { + new ResultCode[]{OK, OK, FAIL}, + + // The last operation will not be submitted to Kudu if flush mode is AUTO_FLUSH_SYNC + new ResultCode[]{OK, FAIL, OK}, + + // Everything's okay + new ResultCode[]{OK, OK, OK}, + + // The last operation will not be submitted due to an exception from apply() call + new ResultCode[]{OK, EXCEPTION, OK}, + }; + + KuduSession session = mock(KuduSession.class); + when(session.getFlushMode()).thenReturn(flushMode); + MockPutKudu putKudu = new MockPutKudu(session); + + List<List<OperationResponse>> flowFileResponses = new ArrayList<>(); + boolean sync = flushMode == FlushMode.AUTO_FLUSH_SYNC; + for (ResultCode[] results : flowFileResults) { + flowFileResponses.add(queueInsert(putKudu, session, sync, results)); + } + + switch (flushMode) { + case AUTO_FLUSH_SYNC: + // flush() or close() returns an empty list + when(session.close()).thenReturn(Collections.emptyList()); + break; + case AUTO_FLUSH_BACKGROUND: + // close() will be called for each batch of FlowFiles, however we do not check + // the return value of it. Instead, we should check the pending errors of the session + // as buffered operations may have already been flushed. 
+ when(session.close()).thenReturn(Collections.emptyList()); + + List<RowErrorsAndOverflowStatus> pendingErrorResponses = new ArrayList<>(); + while (!flowFileResponses.isEmpty()) { + int sliceSize = Math.min(batchSize, flowFileResponses.size()); + List<List<OperationResponse>> slice = flowFileResponses.subList(0, sliceSize); + + RowErrorsAndOverflowStatus pendingErrorResponse = mock(RowErrorsAndOverflowStatus.class); + RowError[] rowErrors = slice.stream() + .flatMap(List::stream) + .filter(OperationResponse::hasRowError) + .map(OperationResponse::getRowError) + .toArray(RowError[]::new); + when(pendingErrorResponse.getRowErrors()).thenReturn(rowErrors); + pendingErrorResponses.add(pendingErrorResponse); + + flowFileResponses = flowFileResponses.subList(sliceSize, flowFileResponses.size()); + } + + stubSlices(when(session.getPendingErrors()), pendingErrorResponses); + break; + case MANUAL_FLUSH: + // close() will be called at the end of a batch, but flush() will also be called + // whenever the mutation buffer of KuduSession becomes full. In PutKudu, we set + // the size of the mutation buffer to match batchSize, so flush() is called only + // when a FlowFile has more than one record. 
+ List<List<OperationResponse>> flushes = new ArrayList<>(); + List<List<OperationResponse>> closes = new ArrayList<>(); + + while (!flowFileResponses.isEmpty()) { + int sliceSize = Math.min(batchSize, flowFileResponses.size()); + List<List<OperationResponse>> slice = flowFileResponses.subList(0, sliceSize); + flowFileResponses = flowFileResponses.subList(sliceSize, flowFileResponses.size()); + + List<OperationResponse> batch = new ArrayList<>(); + for (OperationResponse response : slice.stream().flatMap(List::stream).collect(Collectors.toList())) { + if (batch.size() == batchSize) { + flushes.add(batch); + batch = new ArrayList<>(); + } + batch.add(response); + } + if (flowFileResponses.isEmpty() && batch.size() == batchSize) { + // To handle the case where PutKudu ends the batch with flush() + // instead of close() due to the exception from the subsequent apply call. + flushes.add(batch); + } else if (batch.size() > 0) { + closes.add(batch); + } + + if (!flushes.isEmpty()) stubSlices(when(session.flush()), flushes); + if (!closes.isEmpty()) stubSlices(when(session.close()), closes); + } + break; + } + + testRunner = TestRunners.newTestRunner(putKudu); + createRecordReader(numRecordsPerFlowFile); + setUpTestRunner(testRunner); + testRunner.setProperty(PutKudu.FLUSH_MODE, flushMode.name()); + testRunner.setProperty(PutKudu.BATCH_SIZE, String.valueOf(batchSize)); + + IntStream.range(0, numFlowFiles).forEach(i -> testRunner.enqueue("")); + testRunner.run(numFlowFiles); + + testRunner.assertTransferCount(PutKudu.REL_FAILURE, 3); + List<MockFlowFile> failedFlowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_FAILURE); + failedFlowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "2"); + failedFlowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, sync ? 
"1" : "2"); + failedFlowFiles.get(2).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "1"); + + testRunner.assertTransferCount(PutKudu.REL_SUCCESS, 1); + testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "3"); + } + + private void testKuduPartialFailure(FlushMode flushMode) throws Exception { + // Test against different batch sizes (up until the point where every record can be buffered at once) + for (int i = 1; i <= 11; i++) { + testKuduPartialFailure(flushMode, i); + } + } + + @Test + public void testKuduPartialFailuresOnAutoFlushSync() throws Exception { + testKuduPartialFailure(FlushMode.AUTO_FLUSH_SYNC); + } + + @Test + public void testKuduPartialFailuresOnAutoFlushBackground() throws Exception { + testKuduPartialFailure(FlushMode.AUTO_FLUSH_BACKGROUND); + } + + @Test + public void testKuduPartialFailuresOnManualFlush() throws Exception { + testKuduPartialFailure(FlushMode.MANUAL_FLUSH); + } }
