NIFI-1174 Refactoring the HBase client API and adding a PutHBaseJSON which can write a whole row from a single json document - Adding Complex Field Strategy to PutHBaseJSON to allow more control of complex fields - Improving error messages to indicate what the problem was with an invalid row
Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40dd8a0a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40dd8a0a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40dd8a0a Branch: refs/heads/NIFI-655 Commit: 40dd8a0a845ef5f4d4fde451f02376ab2fab9758 Parents: 8c2323d Author: Bryan Bende <[email protected]> Authored: Wed Nov 18 17:24:49 2015 -0500 Committer: Bryan Bende <[email protected]> Committed: Thu Nov 19 13:49:02 2015 -0500 ---------------------------------------------------------------------- .../nifi-hbase-processors/pom.xml | 4 + .../nifi/hbase/AbstractHBaseProcessor.java | 23 - .../org/apache/nifi/hbase/AbstractPutHBase.java | 183 ++++++++ .../java/org/apache/nifi/hbase/GetHBase.java | 3 +- .../org/apache/nifi/hbase/PutHBaseCell.java | 153 +------ .../org/apache/nifi/hbase/PutHBaseJSON.java | 230 ++++++++++ .../org.apache.nifi.processor.Processor | 3 +- .../org/apache/nifi/hbase/HBaseTestUtil.java | 87 ++++ .../nifi/hbase/MockHBaseClientService.java | 14 +- .../org/apache/nifi/hbase/TestPutHBaseCell.java | 60 ++- .../org/apache/nifi/hbase/TestPutHBaseJSON.java | 423 +++++++++++++++++++ .../apache/nifi/hbase/HBaseClientService.java | 11 + .../org/apache/nifi/hbase/put/PutColumn.java | 47 +++ .../org/apache/nifi/hbase/put/PutFlowFile.java | 38 +- .../nifi/hbase/HBase_1_1_2_ClientService.java | 25 +- .../hbase/TestHBase_1_1_2_ClientService.java | 27 +- 16 files changed, 1119 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml index b474c6a..abbe4c9 
100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml @@ -50,6 +50,10 @@ <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java deleted file mode 100644 index 9cce35e..0000000 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractHBaseProcessor.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.hbase; - -import org.apache.nifi.processor.AbstractProcessor; - -public abstract class AbstractHBaseProcessor extends AbstractProcessor { - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java new file mode 100644 index 0000000..87424f9 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.hbase.put.PutFlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Base class for processors that put data to HBase. + */ +public abstract class AbstractPutHBase extends AbstractProcessor { + + protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() + .name("HBase Client Service") + .description("Specifies the Controller Service to use for accessing HBase.") + .required(true) + .identifiesControllerService(HBaseClientService.class) + .build(); + protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() + .name("Table Name") + .description("The name of the HBase Table to put data into") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder() + .name("Row Identifier") + .description("Specifies the Row ID to use when inserting data into HBase") + .required(false) // not all sub-classes will require this + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor COLUMN_FAMILY = new PropertyDescriptor.Builder() + .name("Column Family") + .description("The Column Family to use when inserting data into HBase") + .required(true) + 
.expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder() + .name("Column Qualifier") + .description("The Column Qualifier to use when inserting data into HBase") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " + + "grouped by table, and a single Put per table will be performed.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("25") + .build(); + + protected static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase") + .build(); + protected static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if it cannot be sent to HBase") + .build(); + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + List<FlowFile> flowFiles = session.get(batchSize); + if (flowFiles == null || flowFiles.size() == 0) { + return; + } + + final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>(); + + // Group FlowFiles by HBase Table + for (final FlowFile flowFile : flowFiles) { + final PutFlowFile putFlowFile = createPut(session, context, flowFile); + + if (putFlowFile == null) { + // sub-classes should log appropriate error messages before returning null + session.transfer(flowFile, REL_FAILURE); + } else if (!putFlowFile.isValid()) { + if 
(StringUtils.isBlank(putFlowFile.getTableName())) { + getLogger().error("Missing table name for FlowFile {}; routing to failure", new Object[]{flowFile}); + } else if (StringUtils.isBlank(putFlowFile.getRow())) { + getLogger().error("Missing row id for FlowFile {}; routing to failure", new Object[]{flowFile}); + } else if (putFlowFile.getColumns() == null || putFlowFile.getColumns().isEmpty()) { + getLogger().error("No columns provided for FlowFile {}; routing to failure", new Object[]{flowFile}); + } else { + // really shouldn't get here, but just in case + getLogger().error("Failed to produce a put for FlowFile {}; routing to failure", new Object[]{flowFile}); + } + session.transfer(flowFile, REL_FAILURE); + } else { + List<PutFlowFile> putFlowFiles = tablePuts.get(putFlowFile.getTableName()); + if (putFlowFiles == null) { + putFlowFiles = new ArrayList<>(); + tablePuts.put(putFlowFile.getTableName(), putFlowFiles); + } + putFlowFiles.add(putFlowFile); + } + } + + getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[]{flowFiles.size(), tablePuts.size()}); + + final long start = System.nanoTime(); + final List<PutFlowFile> successes = new ArrayList<>(); + final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); + + for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) { + try { + hBaseClientService.put(entry.getKey(), entry.getValue()); + successes.addAll(entry.getValue()); + } catch (Exception e) { + getLogger().error(e.getMessage(), e); + + for (PutFlowFile putFlowFile : entry.getValue()) { + getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e}); + final FlowFile failure = session.penalize(putFlowFile.getFlowFile()); + session.transfer(failure, REL_FAILURE); + } + } + } + + final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + 
getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[]{successes.size(), sendMillis}); + + for (PutFlowFile putFlowFile : successes) { + session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS); + final String details = "Put " + putFlowFile.getColumns().size() + " cells to HBase"; + session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), details, sendMillis); + } + + } + + protected String getTransitUri(PutFlowFile putFlowFile) { + return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow(); + } + + /** + * Sub-classes provide the implementation to create a put from a FlowFile. + * + * @param session + * the current session + * @param context + * the current context + * @param flowFile + * the FlowFile to create a Put from + * + * @return a PutFlowFile instance for the given FlowFile + */ + protected abstract PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index 5f08265..98a612c 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -41,6 +41,7 @@ import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; import org.apache.nifi.hbase.util.ObjectSerDe; import org.apache.nifi.hbase.util.StringSerDe; +import 
org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -83,7 +84,7 @@ import java.util.regex.Pattern; @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the data was pulled from"), @WritesAttribute(attribute = "mime.type", description = "Set to application/json to indicate that output is JSON") }) -public class GetHBase extends AbstractHBaseProcessor { +public class GetHBase extends AbstractProcessor { static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*"); http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java index 0a2b763..759d91e 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.hbase; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -24,91 +23,36 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.hbase.put.PutColumn; import 
org.apache.nifi.hbase.put.PutFlowFile; -import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.HashMap; +import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; @EventDriven @SupportsBatching @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"hadoop", "hbase"}) @CapabilityDescription("Adds the Contents of a FlowFile to HBase as the value of a single cell") -public class PutHBaseCell extends AbstractProcessor { - - protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder() - .name("HBase Client Service") - .description("Specifies the Controller Service to use for accessing HBase.") - .required(true) - .identifiesControllerService(HBaseClientService.class) - .build(); - protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder() - .name("Table Name") - .description("The name of the HBase Table to put data into") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor ROW = new PropertyDescriptor.Builder() - .name("Row Identifier") - .description("Specifies the Row ID to use when inserting data into HBase") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor COLUMN_FAMILY = new 
PropertyDescriptor.Builder() - .name("Column Family") - .description("The Column Family to use when inserting data into HBase") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor COLUMN_QUALIFIER = new PropertyDescriptor.Builder() - .name("Column Qualifier") - .description("The Column Qualifier to use when inserting data into HBase") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " + - "grouped by table, and a single Put per table will be performed.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("25") - .build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship after it has been successfully stored in HBase") - .build(); - static final Relationship FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if it cannot be sent to HBase") - .build(); +public class PutHBaseCell extends AbstractPutHBase { @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(HBASE_CLIENT_SERVICE); properties.add(TABLE_NAME); - properties.add(ROW); + properties.add(ROW_ID); properties.add(COLUMN_FAMILY); properties.add(COLUMN_QUALIFIER); properties.add(BATCH_SIZE); @@ -119,84 +63,27 @@ public class PutHBaseCell extends AbstractProcessor { public Set<Relationship> getRelationships() { final Set<Relationship> rels = new HashSet<>(); rels.add(REL_SUCCESS); - rels.add(FAILURE); + rels.add(REL_FAILURE); 
return rels; } @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); - List<FlowFile> flowFiles = session.get(batchSize); - if (flowFiles == null || flowFiles.size() == 0) { - return; - } - - final Map<String,List<PutFlowFile>> tablePuts = new HashMap<>(); - - // Group FlowFiles by HBase Table - for (final FlowFile flowFile : flowFiles) { - final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); - final String row = context.getProperty(ROW).evaluateAttributeExpressions(flowFile).getValue(); - final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); - final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue(); - - if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || StringUtils.isBlank(columnFamily) || StringUtils.isBlank(columnQualifier)) { - getLogger().error("Invalid FlowFile {} missing table, row, column familiy, or column qualifier; routing to failure", new Object[]{flowFile}); - session.transfer(flowFile, FAILURE); - } else { - final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); - - final PutFlowFile putFlowFile = new PutFlowFile(tableName, row, columnFamily, columnQualifier, buffer, flowFile); - - List<PutFlowFile> putFlowFiles = tablePuts.get(tableName); - if (putFlowFiles == null) { - putFlowFiles = new ArrayList<>(); - tablePuts.put(tableName, putFlowFiles); - } - putFlowFiles.add(putFlowFile); - } - } - - getLogger().debug("Sending {} FlowFiles to HBase in {} put operations", new Object[] {flowFiles.size(), tablePuts.size()}); - - final long start = 
System.nanoTime(); - final List<PutFlowFile> successes = new ArrayList<>(); - final HBaseClientService hBaseClientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class); - - for (Map.Entry<String, List<PutFlowFile>> entry : tablePuts.entrySet()) { - try { - hBaseClientService.put(entry.getKey(), entry.getValue()); - successes.addAll(entry.getValue()); - } catch (Exception e) { - getLogger().error(e.getMessage(), e); - - for (PutFlowFile putFlowFile : entry.getValue()) { - getLogger().error("Failed to send {} to HBase due to {}; routing to failure", new Object[]{putFlowFile.getFlowFile(), e}); - final FlowFile failure = session.penalize(putFlowFile.getFlowFile()); - session.transfer(failure, FAILURE); - } + protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) { + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue(); + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); + final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue(); + + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); } - } - - final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - getLogger().debug("Sent {} FlowFiles to HBase successfully in {} milliseconds", new Object[] {successes.size(), sendMillis}); - - for (PutFlowFile putFlowFile : successes) { - session.transfer(putFlowFile.getFlowFile(), REL_SUCCESS); - session.getProvenanceReporter().send(putFlowFile.getFlowFile(), getTransitUri(putFlowFile), sendMillis); - } - - 
} + }); - protected String getTransitUri(PutFlowFile putFlowFile) { - return "hbase://" + putFlowFile.getTableName() + "/" + putFlowFile.getRow() + "/" + putFlowFile.getColumnFamily() - + ":" + putFlowFile.getColumnQualifier(); + final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily, columnQualifier, buffer)); + return new PutFlowFile(tableName, row, columns, flowFile); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java new file mode 100644 index 0000000..0dba7ee --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.ObjectHolder; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +@EventDriven +@SupportsBatching +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"hadoop", "hbase", "put", "json"}) +@CapabilityDescription("Adds rows to HBase based on the contents of incoming JSON documents. Each FlowFile must contain a single " + + "UTF-8 encoded JSON document, and any FlowFiles where the root element is not a single document will be routed to failure. " + + "Each JSON field name and value will become a column qualifier and value of the HBase row. 
Any fields with a null value " + + "will be skipped, and fields with a complex value will be handled according to the Complex Field Strategy. " + + "The row id can be specified either directly on the processor through the Row Identifier property, or can be extracted from the JSON " + + "document by specifying the Row Identifier Field Name property. This processor will hold the contents of all FlowFiles for the given batch " + + "in memory at one time.") +public class PutHBaseJSON extends AbstractPutHBase { + + protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder() + .name("Row Identifier Field Name") + .description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final String FAIL_VALUE = "Fail"; + protected static final String WARN_VALUE = "Warn"; + protected static final String IGNORE_VALUE = "Ignore"; + protected static final String TEXT_VALUE = "Text"; + + protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue(FAIL_VALUE, FAIL_VALUE, "Route entire FlowFile to failure if any elements contain complex values."); + protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue(WARN_VALUE, WARN_VALUE, "Provide a warning and do not include field in row sent to HBase."); + protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue(IGNORE_VALUE, IGNORE_VALUE, "Silently ignore and do not include in row sent to HBase."); + protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue(TEXT_VALUE, TEXT_VALUE, "Use the string representation of the complex field as the value of the given column."); + + protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder() + .name("Complex Field Strategy") + .description("Indicates how to handle complex 
fields, i.e. fields that do not have a single text value.") + .expressionLanguageSupported(false) + .required(true) + .allowableValues(COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT) + .defaultValue(COMPLEX_FIELD_TEXT.getValue()) + .build(); + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(HBASE_CLIENT_SERVICE); + properties.add(TABLE_NAME); + properties.add(ROW_ID); + properties.add(ROW_FIELD_NAME); + properties.add(COLUMN_FAMILY); + properties.add(BATCH_SIZE); + properties.add(COMPLEX_FIELD_STRATEGY); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + return rels; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final Collection<ValidationResult> results = new ArrayList<>(); + + final String rowId = validationContext.getProperty(ROW_ID).getValue(); + final String rowFieldName = validationContext.getProperty(ROW_FIELD_NAME).getValue(); + + if (StringUtils.isBlank(rowId) && StringUtils.isBlank(rowFieldName)) { + results.add(new ValidationResult.Builder() + .subject(this.getClass().getSimpleName()) + .explanation("Row Identifier or Row Identifier Field Name is required") + .valid(false) + .build()); + } else if (!StringUtils.isBlank(rowId) && !StringUtils.isBlank(rowFieldName)) { + results.add(new ValidationResult.Builder() + .subject(this.getClass().getSimpleName()) + .explanation("Row Identifier and Row Identifier Field Name can not be used together") + .valid(false) + .build()); + } + + return results; + } + + @Override + protected PutFlowFile createPut(final ProcessSession session, final ProcessContext context, final FlowFile flowFile) { + final String tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue(); + final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue(); + final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue(); + final boolean extractRowId = !StringUtils.isBlank(rowFieldName); + final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue(); + + // Parse the JSON document + final ObjectMapper mapper = new ObjectMapper(); + final ObjectHolder<JsonNode> rootNodeRef = new ObjectHolder<>(null); + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + try (final InputStream bufferedIn = new BufferedInputStream(in)) { + rootNodeRef.set(mapper.readTree(bufferedIn)); + } + } + }); + } catch (final ProcessException pe) { + getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[]{flowFile, pe.toString()}, pe); + return null; + } + + final JsonNode rootNode = rootNodeRef.get(); + + if (rootNode.isArray()) { + getLogger().error("Root node of JSON must be a single document, found array for {}; routing to failure", new Object[]{flowFile}); + return null; + } + + final Collection<PutColumn> columns = new ArrayList<>(); + final ObjectHolder<String> rowIdHolder = new ObjectHolder<>(null); + + // convert each field/value to a column for the put, skip over nulls and arrays + final Iterator<String> fieldNames = rootNode.getFieldNames(); + while (fieldNames.hasNext()) { + final String fieldName = fieldNames.next(); + final ObjectHolder<String> fieldValueHolder = new ObjectHolder<>(null); + + final JsonNode fieldNode = rootNode.get(fieldName); + if (fieldNode.isNull()) { + getLogger().debug("Skipping {} because value was null", new 
Object[]{fieldName}); + } else if (fieldNode.isValueNode()) { + fieldValueHolder.set(fieldNode.asText()); + } else { + // for non-null, non-value nodes, determine what to do based on the handling strategy + switch (complexFieldStrategy) { + case FAIL_VALUE: + getLogger().error("Complex value found for {}; routing to failure", new Object[]{fieldName}); + return null; + case WARN_VALUE: + getLogger().warn("Complex value found for {}; skipping", new Object[]{fieldName}); + break; + case TEXT_VALUE: + // use toString() here because asText() is only guaranteed to be supported on value nodes + // some other types of nodes, like ArrayNode, provide toString implementations + fieldValueHolder.set(fieldNode.toString()); + break; + case IGNORE_VALUE: + // silently skip + break; + default: + break; + } + } + + // if we have a field value, then see if this is the row id field, if so store the value for later + // otherwise add a new column where the fieldName and fieldValue are the column qualifier and value + if (fieldValueHolder.get() != null) { + if (extractRowId && fieldName.equals(rowFieldName)) { + rowIdHolder.set(fieldValueHolder.get()); + } else { + columns.add(new PutColumn(columnFamily, fieldName, fieldValueHolder.get().getBytes(StandardCharsets.UTF_8))); + } + } + } + + // if we are expecting a field name to use for the row id and the incoming document doesn't have it + // log an error message so the user can see what the field names were and return null so it gets routed to failure + if (extractRowId && rowIdHolder.get() == null) { + final String fieldNameStr = StringUtils.join(rootNode.getFieldNames(), ","); + getLogger().error("Row ID field named '{}' not found in field names '{}'; routing to failure", new Object[] {rowFieldName, fieldNameStr}); + return null; + } + + final String putRowId = (extractRowId ? 
rowIdHolder.get() : rowId); + return new PutFlowFile(tableName, putRowId, columns, flowFile); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 613515d..6e2af81 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,4 +14,5 @@ # limitations under the License. org.apache.nifi.hbase.GetHBase -org.apache.nifi.hbase.PutHBaseCell \ No newline at end of file +org.apache.nifi.hbase.PutHBaseCell +org.apache.nifi.hbase.PutHBaseJSON \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java new file mode 100644 index 0000000..fc30f73 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hbase; + +import org.apache.nifi.hbase.put.PutColumn; +import org.apache.nifi.hbase.put.PutFlowFile; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertTrue; + +public class HBaseTestUtil { + + public static void verifyPut(final String row, final String columnFamily, final Map<String,String> columns, final List<PutFlowFile> puts) { + boolean foundPut = false; + + for (final PutFlowFile put : puts) { + if (!row.equals(put.getRow())) { + continue; + } + + if (put.getColumns() == null || put.getColumns().size() != columns.size()) { + continue; + } + + // start off assuming we have all the columns + boolean foundAllColumns = true; + + for (Map.Entry<String, String> entry : columns.entrySet()) { + // determine if we have the current expected column + boolean foundColumn = false; + for (PutColumn putColumn : put.getColumns()) { + final String colVal = new String(putColumn.getBuffer(), StandardCharsets.UTF_8); + if (columnFamily.equals(putColumn.getColumnFamily()) && entry.getKey().equals(putColumn.getColumnQualifier()) + && entry.getValue().equals(colVal)) { + 
foundColumn = true; + break; + } + } + + // if we didn't have the current expected column we know we don't have all expected columns + if (!foundColumn) { + foundAllColumns = false; + break; + } + } + + // if we found all the expected columns this was a match so we can break + if (foundAllColumns) { + foundPut = true; + break; + } + } + + assertTrue(foundPut); + } + + public static void verifyEvent(final List<ProvenanceEventRecord> events, final String uri, final ProvenanceEventType eventType) { + boolean foundEvent = false; + for (final ProvenanceEventRecord event : events) { + if (event.getTransitUri().equals(uri) && event.getEventType().equals(eventType)) { + foundEvent = true; + break; + } + } + assertTrue(foundEvent); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java index a2abf7e..bca8b4f 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java @@ -17,6 +17,7 @@ package org.apache.nifi.hbase; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; @@ -33,7 +34,7 @@ import java.util.Map; public class MockHBaseClientService extends AbstractControllerService implements HBaseClientService { private Map<String,ResultCell[]> 
results = new HashMap<>(); - private Map<String, List<PutFlowFile>> puts = new HashMap<>(); + private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>(); private boolean throwException = false; @Override @@ -42,7 +43,12 @@ public class MockHBaseClientService extends AbstractControllerService implements throw new IOException("exception"); } - this.puts.put(tableName, new ArrayList<>(puts)); + this.flowFilePuts.put(tableName, new ArrayList<>(puts)); + } + + @Override + public void put(String tableName, String rowId, Collection<PutColumn> columns) throws IOException { + throw new UnsupportedOperationException(); } @Override @@ -92,8 +98,8 @@ public class MockHBaseClientService extends AbstractControllerService implements results.put(rowKey, cellArray); } - public Map<String, List<PutFlowFile>> getPuts() { - return puts; + public Map<String, List<PutFlowFile>> getFlowFilePuts() { + return flowFilePuts; } public void setThrowException(boolean throwException) { http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java index 62fa9a6..0cd8ff7 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.hbase; +import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; @@ -43,7 +44,7 
@@ public class TestPutHBaseCell { final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class); runner.setProperty(PutHBaseCell.TABLE_NAME, tableName); - runner.setProperty(PutHBaseCell.ROW, row); + runner.setProperty(PutHBaseCell.ROW_ID, row); runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily); runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier); runner.setProperty(PutHBaseCell.BATCH_SIZE, "1"); @@ -58,12 +59,14 @@ public class TestPutHBaseCell { final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); outFile.assertContentEquals(content); - assertNotNull(hBaseClient.getPuts()); - assertEquals(1, hBaseClient.getPuts().size()); + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); - List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName); + List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName); assertEquals(1, puts.size()); verifyPut(row, columnFamily, columnQualifier, content, puts.get(0)); + + assertEquals(1, runner.getProvenanceEvents().size()); } @Test @@ -89,12 +92,14 @@ public class TestPutHBaseCell { final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); outFile.assertContentEquals(content); - assertNotNull(hBaseClient.getPuts()); - assertEquals(1, hBaseClient.getPuts().size()); + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); - List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName); + List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName); assertEquals(1, puts.size()); verifyPut(row, columnFamily, columnQualifier, content, puts.get(0)); + + assertEquals(1, runner.getProvenanceEvents().size()); } @Test @@ -115,7 +120,9 @@ public class TestPutHBaseCell { runner.run(); runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 0); - runner.assertTransferCount(PutHBaseCell.FAILURE, 1); + 
runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1); + + assertEquals(0, runner.getProvenanceEvents().size()); } @Test @@ -142,7 +149,9 @@ public class TestPutHBaseCell { runner.run(); runner.assertTransferCount(PutHBaseCell.REL_SUCCESS, 1); - runner.assertTransferCount(PutHBaseCell.FAILURE, 1); + runner.assertTransferCount(PutHBaseCell.REL_FAILURE, 1); + + assertEquals(1, runner.getProvenanceEvents().size()); } @Test @@ -171,13 +180,15 @@ public class TestPutHBaseCell { final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); outFile.assertContentEquals(content1); - assertNotNull(hBaseClient.getPuts()); - assertEquals(1, hBaseClient.getPuts().size()); + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); - List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName); + List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName); assertEquals(2, puts.size()); verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0)); verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1)); + + assertEquals(2, runner.getProvenanceEvents().size()); } @Test @@ -202,7 +213,9 @@ public class TestPutHBaseCell { runner.enqueue(content2.getBytes("UTF-8"), attributes2); runner.run(); - runner.assertAllFlowFilesTransferred(PutHBaseCell.FAILURE, 2); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 2); + + assertEquals(0, runner.getProvenanceEvents().size()); } @Test @@ -229,13 +242,15 @@ public class TestPutHBaseCell { final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); outFile.assertContentEquals(content1); - assertNotNull(hBaseClient.getPuts()); - assertEquals(1, hBaseClient.getPuts().size()); + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); - List<PutFlowFile> puts = hBaseClient.getPuts().get(tableName); + List<PutFlowFile> puts = 
hBaseClient.getFlowFilePuts().get(tableName); assertEquals(2, puts.size()); verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0)); verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1)); + + assertEquals(2, runner.getProvenanceEvents().size()); } private Map<String, String> getAtrributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) { @@ -250,7 +265,7 @@ public class TestPutHBaseCell { private TestRunner getTestRunnerWithEL(PutHBaseCell proc) { final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutHBaseCell.TABLE_NAME, "${hbase.tableName}"); - runner.setProperty(PutHBaseCell.ROW, "${hbase.row}"); + runner.setProperty(PutHBaseCell.ROW_ID, "${hbase.row}"); runner.setProperty(PutHBaseCell.COLUMN_FAMILY, "${hbase.columnFamily}"); runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, "${hbase.columnQualifier}"); return runner; @@ -266,9 +281,14 @@ public class TestPutHBaseCell { private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) { assertEquals(row, put.getRow()); - assertEquals(columnFamily, put.getColumnFamily()); - assertEquals(columnQualifier, put.getColumnQualifier()); - assertEquals(content, new String(put.getBuffer(), StandardCharsets.UTF_8)); + + assertNotNull(put.getColumns()); + assertEquals(1, put.getColumns().size()); + + final PutColumn column = put.getColumns().iterator().next(); + assertEquals(columnFamily, column.getColumnFamily()); + assertEquals(columnQualifier, column.getColumnQualifier()); + assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8)); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java new file mode 100644 index 0000000..7b59919 --- /dev/null +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hbase; + +import org.apache.nifi.hbase.put.PutFlowFile; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestPutHBaseJSON { + + public static final String DEFAULT_TABLE_NAME = "nifi"; + public static final String DEFAULT_ROW = "row1"; + public static final String DEFAULT_COLUMN_FAMILY = "family1"; + + @Test + public void testCustomValidate() throws InitializationException { + // missing row id and row id field name should be invalid + TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + getHBaseClientService(runner); + runner.assertNotValid(); + + // setting both properties should still be invalid + runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, "rowId"); + runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName"); + runner.assertNotValid(); + + // only a row id field name should make it valid + runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowFieldName"); + runner.assertValid(); + + // only a row id should make it valid + runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, "rowId"); + runner.assertValid(); + } + + @Test + public void testSingleJsonDocAndProvidedRowId() throws 
IOException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + + final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + runner.enqueue(content.getBytes("UTF-8")); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + final Map<String,String> expectedColumns = new HashMap<>(); + expectedColumns.put("field1", "value1"); + expectedColumns.put("field2", "value2"); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + assertEquals("hbase://" + DEFAULT_TABLE_NAME + "/" + DEFAULT_ROW, event.getTransitUri()); + } + + @Test + public void testSingJsonDocAndExtractedRowId() throws IOException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField"); + + final String content = "{ \"rowField\" : \"myRowId\", \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = 
runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + // should be a put with row id of myRowId, and rowField shouldn't end up in the columns + final Map<String,String> expectedColumns1 = new HashMap<>(); + expectedColumns1.put("field1", "value1"); + expectedColumns1.put("field2", "value2"); + HBaseTestUtil.verifyPut("myRowId", DEFAULT_COLUMN_FAMILY, expectedColumns1, puts); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://" + DEFAULT_TABLE_NAME + "/myRowId", ProvenanceEventType.SEND); + } + + @Test + public void testSingJsonDocAndExtractedRowIdMissingField() throws IOException, InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "rowField"); + + final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0); + outFile.assertContentEquals(content); + + // should be no provenance events + assertEquals(0, runner.getProvenanceEvents().size()); + + // no puts should have made it to the client + assertEquals(0, hBaseClient.getFlowFilePuts().size()); + } + + @Test + public void testMultipleJsonDocsRouteToFailure() throws IOException, InitializationException { + final TestRunner runner = 
getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + + final String content1 = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + final String content2 = "{ \"field3\" : \"value3\", \"field4\" : \"value4\" }"; + final String content = "[ " + content1 + " , " + content2 + " ]"; + + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0); + outFile.assertContentEquals(content); + + // should be no provenance events + assertEquals(0, runner.getProvenanceEvents().size()); + + // no puts should have made it to the client + assertEquals(0, hBaseClient.getFlowFilePuts().size()); + } + + @Test + public void testELWithProvidedRowId() throws IOException, InitializationException { + final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, "${hbase.rowId}"); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put("hbase.table", "myTable"); + attributes.put("hbase.colFamily", "myColFamily"); + attributes.put("hbase.rowId", "myRowId"); + + final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + runner.enqueue(content.getBytes("UTF-8"), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable"); + 
assertEquals(1, puts.size()); + + final Map<String,String> expectedColumns = new HashMap<>(); + expectedColumns.put("field1", "value1"); + expectedColumns.put("field2", "value2"); + HBaseTestUtil.verifyPut("myRowId", "myColFamily", expectedColumns, puts); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/myRowId", ProvenanceEventType.SEND); + } + + @Test + public void testELWithExtractedRowId() throws IOException, InitializationException { + final TestRunner runner = getTestRunner("${hbase.table}", "${hbase.colFamily}", "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_FIELD_NAME, "${hbase.rowField}"); + + final Map<String,String> attributes = new HashMap<>(); + attributes.put("hbase.table", "myTable"); + attributes.put("hbase.colFamily", "myColFamily"); + attributes.put("hbase.rowField", "field1"); + + final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }"; + runner.enqueue(content.getBytes("UTF-8"), attributes); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get("myTable"); + assertEquals(1, puts.size()); + + final Map<String,String> expectedColumns = new HashMap<>(); + expectedColumns.put("field2", "value2"); + HBaseTestUtil.verifyPut("value1", "myColFamily", expectedColumns, puts); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + assertEquals(1, events.size()); + HBaseTestUtil.verifyEvent(runner.getProvenanceEvents(), "hbase://myTable/value1", 
ProvenanceEventType.SEND); + } + + @Test + public void testNullAndArrayElementsWithWarnStrategy() throws InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue()); + + // should route to success because there is at least one valid field + final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + // should have skipped field1 and field3 + final Map<String,String> expectedColumns = new HashMap<>(); + expectedColumns.put("field2", "value2"); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + } + + @Test + public void testNullAndArrayElementsWithIgnoreStrategy() throws InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_IGNORE.getValue()); + + // should route to success because there is at least one valid field + final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + 
runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + // should have skipped field1 and field3 + final Map<String,String> expectedColumns = new HashMap<>(); + expectedColumns.put("field2", "value2"); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + } + + @Test + public void testNullAndArrayElementsWithFailureStrategy() throws InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_FAIL.getValue()); + + // should route to failure because a complex field is present and the strategy is Fail + final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0); + outFile.assertContentEquals(content); + + // should be no provenance events + assertEquals(0, runner.getProvenanceEvents().size()); + + // no puts should have made it to the client + assertEquals(0, hBaseClient.getFlowFilePuts().size()); + } + + @Test + public void testNullAndArrayElementsWithTextStrategy() throws InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + 
runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue()); + + // should route to success because there is at least one valid field + final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : \"value2\", \"field3\" : null }"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + // should have skipped field3 and stored field1 as its serialized JSON text + final Map<String,String> expectedColumns = new HashMap<>(); + expectedColumns.put("field1", "[{\"child_field1\":\"child_value1\"}]"); + expectedColumns.put("field2", "value2"); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + } + + @Test + public void testNestedDocWithTextStrategy() throws InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_TEXT.getValue()); + + // should route to success because there is at least one valid field + final String content = "{ \"field1\" : { \"child_field1\" : \"child_value1\" }, \"field2\" : \"value2\", \"field3\" : null }"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS); + + assertNotNull(hBaseClient.getFlowFilePuts()); + assertEquals(1, hBaseClient.getFlowFilePuts().size()); + + final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME); + assertEquals(1, puts.size()); + + // should have 
skipped field1 and field3 + final Map<String,String> expectedColumns = new HashMap<>(); + expectedColumns.put("field1", "{\"child_field1\":\"child_value1\"}"); + expectedColumns.put("field2", "value2"); + HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, expectedColumns, puts); + } + + @Test + public void testAllElementsAreNullOrArrays() throws InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + final MockHBaseClientService hBaseClient = getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + runner.setProperty(PutHBaseJSON.COMPLEX_FIELD_STRATEGY, PutHBaseJSON.COMPLEX_FIELD_WARN.getValue()); + + // should route to failure since it would produce a put with no columns + final String content = "{ \"field1\" : [{ \"child_field1\" : \"child_value1\" }], \"field2\" : null }"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1); + + final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_FAILURE).get(0); + outFile.assertContentEquals(content); + + // should be no provenance events + assertEquals(0, runner.getProvenanceEvents().size()); + + // no puts should have made it to the client + assertEquals(0, hBaseClient.getFlowFilePuts().size()); + } + + @Test + public void testInvalidJson() throws InitializationException { + final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1"); + getHBaseClientService(runner); + runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW); + + final String content = "NOT JSON"; + runner.enqueue(content.getBytes(StandardCharsets.UTF_8)); + runner.run(); + runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1); + } + + private TestRunner getTestRunner(String table, String columnFamily, String batchSize) { + final TestRunner runner = TestRunners.newTestRunner(PutHBaseJSON.class); + 
runner.setProperty(PutHBaseJSON.TABLE_NAME, table); + runner.setProperty(PutHBaseJSON.COLUMN_FAMILY, columnFamily); + runner.setProperty(PutHBaseJSON.BATCH_SIZE, batchSize); + return runner; + } + + private MockHBaseClientService getHBaseClientService(final TestRunner runner) throws InitializationException { + final MockHBaseClientService hBaseClient = new MockHBaseClientService(); + runner.addControllerService("hbaseClient", hBaseClient); + runner.enableControllerService(hBaseClient); + runner.setProperty(PutHBaseCell.HBASE_CLIENT_SERVICE, "hbaseClient"); + return hBaseClient; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index 9ff2c46..79eef92 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -20,6 +20,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultHandler; @@ -74,6 +75,16 @@ public interface HBaseClientService extends ControllerService { void put(String tableName, Collection<PutFlowFile> 
/**
 * Immutable value object describing a single column of an HBase put:
 * the column family, the column qualifier, and the cell value bytes.
 */
public class PutColumn {

    private final String columnFamily;
    private final String columnQualifier;
    private final byte[] buffer;

    /**
     * @param columnFamily    the column family the cell belongs to
     * @param columnQualifier the qualifier of the cell within the family
     * @param buffer          the raw bytes of the cell value
     */
    public PutColumn(final String columnFamily, final String columnQualifier, final byte[] buffer) {
        this.columnFamily = columnFamily;
        this.columnQualifier = columnQualifier;
        this.buffer = buffer;
    }

    public String getColumnFamily() {
        return columnFamily;
    }

    public String getColumnQualifier() {
        return columnQualifier;
    }

    // NOTE(review): the byte[] is stored and returned by reference (no defensive copy),
    // presumably to avoid copies on the hot put path — callers must not mutate it.
    public byte[] getBuffer() {
        return buffer;
    }

}
org.apache.commons.lang3.StringUtils; import org.apache.nifi.flowfile.FlowFile; +import java.util.Collection; + /** * Wrapper to encapsulate all of the information for the Put along with the FlowFile. */ @@ -25,18 +28,13 @@ public class PutFlowFile { private final String tableName; private final String row; - private final String columnFamily; - private final String columnQualifier; - private final byte[] buffer; + private final Collection<PutColumn> columns; private final FlowFile flowFile; - public PutFlowFile(String tableName, String row, String columnFamily, String columnQualifier, - byte[] buffer, FlowFile flowFile) { + public PutFlowFile(String tableName, String row, Collection<PutColumn> columns, FlowFile flowFile) { this.tableName = tableName; this.row = row; - this.columnFamily = columnFamily; - this.columnQualifier = columnQualifier; - this.buffer = buffer; + this.columns = columns; this.flowFile = flowFile; } @@ -48,20 +46,26 @@ public class PutFlowFile { return row; } - public String getColumnFamily() { - return columnFamily; + public Collection<PutColumn> getColumns() { + return columns; } - public String getColumnQualifier() { - return columnQualifier; + public FlowFile getFlowFile() { + return flowFile; } - public byte[] getBuffer() { - return buffer; - } + public boolean isValid() { + if (StringUtils.isBlank(tableName) || StringUtils.isBlank(row) || flowFile == null || columns == null || columns.isEmpty()) { + return false; + } - public FlowFile getFlowFile() { - return flowFile; + for (PutColumn column : columns) { + if (StringUtils.isBlank(column.getColumnQualifier()) || StringUtils.isBlank(column.getColumnFamily()) || column.getBuffer() == null) { + return false; + } + } + + return true; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/40dd8a0a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 42590c2..b207191 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -43,6 +43,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.hbase.put.PutColumn; import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; @@ -195,9 +196,13 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme put = new Put(putFlowFile.getRow().getBytes(StandardCharsets.UTF_8)); rowPuts.put(putFlowFile.getRow(), put); } - put.addColumn(putFlowFile.getColumnFamily().getBytes(StandardCharsets.UTF_8), - putFlowFile.getColumnQualifier().getBytes(StandardCharsets.UTF_8), - putFlowFile.getBuffer()); + + for (final PutColumn column : putFlowFile.getColumns()) { + put.addColumn( + column.getColumnFamily().getBytes(StandardCharsets.UTF_8), + column.getColumnQualifier().getBytes(StandardCharsets.UTF_8), + column.getBuffer()); + } } table.put(new ArrayList<>(rowPuts.values())); @@ -205,6 +210,20 @@ public class 
HBase_1_1_2_ClientService extends AbstractControllerService impleme } @Override + public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException { + try (final Table table = connection.getTable(TableName.valueOf(tableName))) { + Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8)); + for (final PutColumn column : columns) { + put.addColumn( + column.getColumnFamily().getBytes(StandardCharsets.UTF_8), + column.getColumnQualifier().getBytes(StandardCharsets.UTF_8), + column.getBuffer()); + } + table.put(put); + } + } + + @Override public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler) throws IOException {
