Repository: nifi Updated Branches: refs/heads/master 0eda71a9a -> e3da44fb6
[NiFi-3973] Add PutKudu processor to ingest data to Kudu Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e3da44fb Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e3da44fb Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e3da44fb Branch: refs/heads/master Commit: e3da44fb626a495f18ead72955a0854d6b1f7151 Parents: 0eda71a Author: cam <[email protected]> Authored: Tue Jun 6 18:25:41 2017 -0700 Committer: ricky <[email protected]> Committed: Fri Aug 25 10:53:31 2017 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 5 + .../nifi-kudu-bundle/nifi-kudu-nar/pom.xml | 42 +++ .../src/main/resources/META-INF/NOTICE | 51 ++++ .../nifi-kudu-processors/pom.xml | 91 ++++++ .../nifi/processors/kudu/AbstractKudu.java | 236 ++++++++++++++++ .../nifi/processors/kudu/OperationType.java | 24 ++ .../apache/nifi/processors/kudu/PutKudu.java | 136 +++++++++ .../org.apache.nifi.processor.Processor | 16 ++ .../nifi/processors/kudu/MockPutKudu.java | 58 ++++ .../nifi/processors/kudu/TestPutKudu.java | 275 +++++++++++++++++++ nifi-nar-bundles/nifi-kudu-bundle/pom.xml | 36 +++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 12 + 13 files changed, 983 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index b875f1c..5b10e0c 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -218,6 +218,11 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kudu-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-flume-nar</artifactId> <type>nar</type> </dependency> 
http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml new file mode 100644 index 0000000..4d2b98f --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kudu-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-kudu-nar</artifactId> + <version>1.4.0-SNAPSHOT</version> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kudu-processors</artifactId> + <version>1.4.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-libraries-nar</artifactId> + <type>nar</type> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..34f4467 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,51 @@ +nifi-kudu-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Apache Avro + The following NOTICE information applies: + Apache Avro + Copyright 2009-2017 The Apache Software Foundation + + (ASLv2) Apache Commons Lang + The following NOTICE information applies: + Apache Commons Lang + Copyright 2001-2015 The Apache Software Foundation + + This product includes software from the Spring Framework, + under the Apache License 2.0 (see: StringUtils.containsWhitespace()) + + (ASLv2) Apache Commons JEXL + The following NOTICE information applies: + Apache Commons JEXL + Copyright 2001-2011 The Apache Software Foundation + + (ASLv2) Kudu SDK + The following NOTICE information applies: + This product includes software developed by Cloudera, Inc. + (http://www.cloudera.com/). + + This product includes software developed at + The Apache Software Foundation (http://www.apache.org/). + + This product includes software developed by + Saxonica (http://www.saxonica.com/). + + (ASLv2) Parquet MR + The following NOTICE information applies: + Parquet MR + Copyright 2012 Twitter, Inc. + + This project includes code from https://github.com/lemire/JavaFastPFOR + parquet-column/src/main/java/parquet/column/values/bitpacking/LemireBitPacking.java + Apache License Version 2.0 http://www.apache.org/licenses/. 
+ (c) Daniel Lemire, http://lemire.me/en/ + http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml new file mode 100644 index 0000000..7e6c2d4 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml @@ -0,0 +1,91 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kudu-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-kudu-processors</artifactId> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + <version>1.4.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.kudu</groupId> + <artifactId>kudu-client</artifactId> + <version>1.3.0</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hadoop-record-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <version>1.10.19</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.5.4</version> + 
<scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java new file mode 100644 index 0000000..5019e03 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.kudu; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.Upsert; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException; + +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.serialization.record.Record; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractKudu extends AbstractProcessor { + + protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder() + .name("Kudu Masters") + .description("List all kudu masters's ip with port (e.g. 
7051), comma separated") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .description("The name of the Kudu Table to put data into") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("The service for reading records from incoming flow files.") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + + protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder() + .name("Skip head line") + .description("Set it to true if your first line is the header line e.g. column names") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder() + .name("Insert Operation") + .description("Specify operationType for this processor. 
Insert-Ignore will ignore duplicated rows") + .allowableValues(OperationType.values()) + .defaultValue(OperationType.INSERT.toString()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu") + .build(); + protected static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if it cannot be sent to Kudu") + .build(); + + public static final String RECORD_COUNT_ATTR = "record.count"; + + protected String kuduMasters; + protected String tableName; + protected boolean skipHeadLine; + protected OperationType operationType; + + protected KuduClient kuduClient; + protected KuduTable kuduTable; + + @OnScheduled + public void OnScheduled(final ProcessContext context) { + try { + tableName = context.getProperty(TABLE_NAME).getValue(); + kuduMasters = context.getProperty(KUDU_MASTERS).getValue(); + if(kuduClient == null) { + getLogger().debug("Setting up Kudu connection..."); + kuduClient = getKuduConnection(kuduMasters); + kuduTable = this.getKuduTable(kuduClient, tableName); + getLogger().debug("Kudu connection successfully initialized"); + } + } catch(KuduException ex){ + getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex); + } + operationType = OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue()); + skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean(); + } + + @OnStopped + public final void closeClient() throws KuduException { + if (kuduClient != null) { + getLogger().info("Closing KuduClient"); + kuduClient.close(); + kuduClient = null; + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final FlowFile flowFile = 
session.get(); + try { + if (flowFile == null) return; + final Map<String,String> attributes = new HashMap<String, String>(); + final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null); + final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final KuduSession kuduSession = this.getKuduSession(kuduClient); + + session.read(flowFile, (final InputStream rawIn) -> { + RecordReader recordReader = null; + try (final BufferedInputStream in = new BufferedInputStream(rawIn)) { + try { + recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger()); + } catch (Exception ex) { + final RecordReaderFactoryException rrfe = new RecordReaderFactoryException("Unable to create RecordReader", ex); + exceptionHolder.set(rrfe); + return; + } + + List<String> fieldNames = recordReader.getSchema().getFieldNames(); + final RecordSet recordSet = recordReader.createRecordSet(); + + if (skipHeadLine) recordSet.next(); + + int numOfAddedRecord = 0; + Record record = recordSet.next(); + while (record != null) { + org.apache.kudu.client.Operation oper = null; + if(operationType == OperationType.UPSERT) { + oper = upsertRecordToKudu(kuduTable, record, fieldNames); + } else { + oper = insertRecordToKudu(kuduTable, record, fieldNames); + } + kuduSession.apply(oper); + numOfAddedRecord++; + record = recordSet.next(); + } + + getLogger().info("KUDU: number of inserted records: " + numOfAddedRecord); + attributes.put(RECORD_COUNT_ATTR, String.valueOf(numOfAddedRecord)); + + } catch (KuduException ex) { + getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex); + exceptionHolder.set(ex); + } catch (Exception e) { + exceptionHolder.set(e); + } finally { + IOUtils.closeQuietly(recordReader); + } + }); + kuduSession.close(); + if (exceptionHolder.get() != null) { + throw exceptionHolder.get(); + } + + // Update flow file's attributes after the 
ingestion + session.putAllAttributes(flowFile, attributes); + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, "Successfully added flowfile to kudu"); + + } catch (IOException | FlowFileAccessException e) { + getLogger().error("Failed to write due to {}", new Object[]{e}); + session.transfer(flowFile, REL_FAILURE); + } catch (Throwable t) { + getLogger().error("Failed to write due to {}", new Object[]{t}); + session.transfer(flowFile, REL_FAILURE); + } + } + + protected KuduClient getKuduConnection(String masters) { + return new KuduClient.KuduClientBuilder(kuduMasters).build(); + } + + protected KuduTable getKuduTable(KuduClient client, String tableName) throws KuduException { + return client.openTable(tableName); + } + + protected KuduSession getKuduSession(KuduClient client){ + + KuduSession kuduSession = client.newSession(); + if(operationType == OperationType.INSERT_IGNORE){ + kuduSession.setIgnoreAllDuplicateRows(true); + } + + return kuduSession; + } + + protected abstract Insert insertRecordToKudu(final KuduTable table, final Record record, final List<String> fields) throws Exception; + protected abstract Upsert upsertRecordToKudu(final KuduTable table, final Record record, final List<String> fields) throws Exception; +} + http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java new file mode 100644 index 0000000..4ab466e --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/OperationType.java @@ -0,0 +1,24 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kudu; + +public enum OperationType { + INSERT, + INSERT_IGNORE, + UPSERT; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java new file mode 100644 index 0000000..53fc678 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kudu; + +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.Upsert; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.serialization.record.Record; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@EventDriven +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"put", "database", "NoSQL", "kudu", "HDFS"}) +@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " + + "to the specified Kudu's table. The schema for the table must be provided in the processor properties or from your source." 
+ + " If any error occurs while reading records from the input, or writing records to Kudu, the FlowFile will be routed to failure") +@WritesAttribute(attribute = "record.count", description = "Number of records written to Kudu") +public class PutKudu extends AbstractKudu { + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(KUDU_MASTERS); + properties.add(TABLE_NAME); + properties.add(SKIP_HEAD_LINE); + properties.add(RECORD_READER); + properties.add(INSERT_OPERATION); + + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + return rels; + } + + @Override + protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception { + Upsert upsert = kuduTable.newUpsert(); + this.insert(kuduTable, upsert, record, fieldNames); + return upsert; + } + + @Override + protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception { + Insert insert = kuduTable.newInsert(); + this.insert(kuduTable, insert, record, fieldNames); + return insert; + } + + private void insert(KuduTable kuduTable, Operation operation, Record record, List<String> fieldNames){ + PartialRow row = operation.getRow(); + Schema colSchema = kuduTable.getSchema(); + + for (String colName : fieldNames) { + int colIdx = this.getColumnIndex(colSchema, colName); + if (colIdx != -1) { + Type colType = colSchema.getColumnByIndex(colIdx).getType(); + + switch (colType.getDataType()) { + case BOOL: + row.addBoolean(colIdx, record.getAsBoolean(colName)); + break; + case FLOAT: + row.addFloat(colIdx, record.getAsFloat(colName)); + break; + case DOUBLE: + row.addDouble(colIdx, record.getAsDouble(colName)); + break; + case BINARY: + 
row.addBinary(colIdx, record.getAsString(colName).getBytes()); + break; + case INT8: + case INT16: + short temp = (short)record.getAsInt(colName).intValue(); + row.addShort(colIdx, temp); + case INT32: + row.addInt(colIdx, record.getAsInt(colName)); + break; + case INT64: + row.addLong(colIdx, record.getAsLong(colName)); + break; + case STRING: + row.addString(colIdx, record.getAsString(colName)); + break; + default: + throw new IllegalStateException(String.format("unknown column type %s", colType)); + } + } + } + } + + private int getColumnIndex(Schema columns, String colName) { + try { + return columns.getColumnIndex(colName); + } catch (Exception ex) { + return -1; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..908723c --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.kudu.PutKudu + http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java new file mode 100644 index 0000000..feef584 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/MockPutKudu.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.kudu; + +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.Upsert; + +import org.apache.nifi.serialization.record.Record; + +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class MockPutKudu extends PutKudu{ + @Override + protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) { + return mock(Insert.class); + } + + @Override + protected Upsert upsertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) { + return mock(Upsert.class); + } + + @Override + protected KuduClient getKuduConnection(String masters) { + return mock(KuduClient.class); + } + + @Override + protected KuduSession getKuduSession(KuduClient client){ + return mock(KuduSession.class); + } + + @Override + protected KuduTable getKuduTable(KuduClient client, String tableName) throws KuduException { + return mock(KuduTable.class); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java new file mode 100644 index 0000000..fa3aa77 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/TestPutKudu.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.kudu; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +public class TestPutKudu { + + public static final String DEFAULT_TABLE_NAME = 
"Nifi-Kudu-Table"; + public static final String DEFAULT_MASTERS = "testLocalHost:7051"; + public static final String SKIP_HEAD_LINE = "false"; + public static final String TABLE_SCHEMA = "id,stringVal,num32Val,doubleVal"; + + private TestRunner testRunner; + private MockPutKudu processor; + private MockRecordParser readerFactory; + + @Before + public void setUp() { + processor = new MockPutKudu(); + testRunner = TestRunners.newTestRunner(processor); + testRunner.setProperty(PutKudu.TABLE_NAME, DEFAULT_TABLE_NAME); + testRunner.setProperty(PutKudu.KUDU_MASTERS, DEFAULT_MASTERS); + testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, SKIP_HEAD_LINE); + testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); + testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.INSERT.toString()); + } + + @After + public void close() { + testRunner = null; + } + + private void createRecordReader(int numOfRecord) throws InitializationException { + + readerFactory = new MockRecordParser(); + readerFactory.addSchemaField("id", RecordFieldType.INT); + readerFactory.addSchemaField("stringVal", RecordFieldType.STRING); + readerFactory.addSchemaField("num32Val", RecordFieldType.INT); + readerFactory.addSchemaField("doubleVal", RecordFieldType.DOUBLE); + + for (int i=0; i < numOfRecord; i++) { + readerFactory.addRecord(i, "val_" + i, 1000 + i, 100.88 + i); + } + + testRunner.addControllerService("mock-reader-factory", readerFactory); + testRunner.enableControllerService(readerFactory); + } + + @Test + public void testWriteKuduWithDefaults() throws IOException, InitializationException { + createRecordReader(100); + + final String filename = "testWriteKudu-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); + + // verify 
the successful flow file has the expected content & attributes + final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0); + mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename); + mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100"); + mockFlowFile.assertContentEquals("trigger"); + + // verify we generated a provenance event + final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents(); + Assert.assertEquals(1, provEvents.size()); + + // verify it was a SEND event with the correct URI + final ProvenanceEventRecord provEvent = provEvents.get(0); + Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType()); + } + + @Test + public void testInvalidReaderShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException { + createRecordReader(0); + + // simulate throwing an IOException when the factory creates a reader which is what would happen when + // invalid Avro is passed to the Avro reader factory + final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class); + when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory"); + when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenThrow(new IOException("NOT AVRO")); + + testRunner.addControllerService("mock-reader-factory", readerFactory); + testRunner.enableControllerService(readerFactory); + testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); + + final String filename = "testInvalidAvroShouldRouteToFailure-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1); + } + + @Test + public void 
testValidSchemaShouldBeSuccessful() throws InitializationException, IOException { + createRecordReader(10); + final String filename = "testValidSchemaShouldBeSuccessful-" + System.currentTimeMillis(); + + // provide my.schema as an attribute + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + flowFileAttributes.put("my.schema", TABLE_SCHEMA); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); + } + + @Test + public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException { + createRecordReader(10); + + final RecordReader recordReader = Mockito.mock(RecordReader.class); + when(recordReader.nextRecord()).thenThrow(new MalformedRecordException("ERROR")); + + final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class); + when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory"); + when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader); + + testRunner.addControllerService("mock-reader-factory", readerFactory); + testRunner.enableControllerService(readerFactory); + testRunner.setProperty(PutKudu.RECORD_READER, "mock-reader-factory"); + + final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1); + } + + @Test + public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException { + createRecordReader(0); + // add a record whose numeric fields are supplied as strings
+ readerFactory.addRecord(1, "name0", "0", "89.89"); + + final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); + } + + @Test + public void testMissingColumInReader() throws InitializationException, IOException { + createRecordReader(0); + readerFactory.addRecord( "name0", "0", "89.89"); //missing id + + final String filename = "testMissingColumInReader-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_FAILURE, 1); + } + + @Test + public void testSkipHeadLineTrue() throws InitializationException, IOException { + createRecordReader(100); + testRunner.setProperty(PutKudu.SKIP_HEAD_LINE, "true"); + + final String filename = "testSkipHeadLineTrue-" + System.currentTimeMillis(); + + final Map<String,String> flowFileAttributes = new HashMap<>(); + flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); + + testRunner.enqueue("trigger", flowFileAttributes); + testRunner.run(); + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); + + MockFlowFile flowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0); + flowFiles.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "99"); + } + + @Test + public void testInsertManyFlowFiles() throws Exception { + createRecordReader(50); + final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"valu11\" }"; + final String content2 = "{ \"field1\" : \"value1\", \"field2\" : \"value11\" }"; + final String content3 = "{ \"field1\" : \"value3\", 
\"field2\" : \"value33\" }"; + + testRunner.enqueue(content1.getBytes()); + testRunner.enqueue(content2.getBytes()); + testRunner.enqueue(content3.getBytes()); + + testRunner.run(3); + + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 3); + List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS); + + flowFiles.get(0).assertContentEquals(content1.getBytes()); + flowFiles.get(0).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); + + flowFiles.get(1).assertContentEquals(content2.getBytes()); + flowFiles.get(1).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); + + flowFiles.get(2).assertContentEquals(content3.getBytes()); + flowFiles.get(2).assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); + } + + @Test + public void testUpsertFlowFiles() throws Exception { + createRecordReader(50); + testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.UPSERT.toString()); + testRunner.enqueue("string".getBytes()); + + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS, 1); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0); + + flowFile.assertContentEquals("string".getBytes()); + flowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "50"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/nifi-kudu-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml new file mode 100644 index 0000000..5c97172 --- /dev/null +++ b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kudu-bundle</artifactId> + <version>1.4.0-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-kudu-processors</module> + <module>nifi-kudu-nar</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 6aef817..456291f 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -34,6 +34,7 @@ <module>nifi-update-attribute-bundle</module> <module>nifi-kafka-bundle</module> <module>nifi-kite-bundle</module> + <module>nifi-kudu-bundle</module> <module>nifi-solr-bundle</module> <module>nifi-confluent-platform-bundle</module> <module>nifi-aws-bundle</module> http://git-wip-us.apache.org/repos/asf/nifi/blob/e3da44fb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4eab25c..b719f6f 
100644 --- a/pom.xml +++ b/pom.xml @@ -1103,6 +1103,12 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-kudu-nar</artifactId> + <version>1.4.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-mongodb-nar</artifactId> <version>1.4.0-SNAPSHOT</version> <type>nar</type>
