Repository: nifi
Updated Branches:
  refs/heads/master 28067a29f -> 143d7e682


NIFI-3538 Added DeleteHBaseRow

This closes #2294.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/143d7e68
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/143d7e68
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/143d7e68

Branch: refs/heads/master
Commit: 143d7e6829c7ac5967dceb0df4a9f7f4456720eb
Parents: 28067a2
Author: Mike Thomsen <mikerthom...@gmail.com>
Authored: Tue Feb 13 13:12:00 2018 -0500
Committer: Koji Kawamura <ijokaruma...@apache.org>
Committed: Wed Feb 14 10:04:25 2018 +0900

----------------------------------------------------------------------
 .../apache/nifi/hbase/AbstractDeleteHBase.java  | 103 +++++++++
 .../org/apache/nifi/hbase/DeleteHBaseRow.java   | 215 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/hbase/MockHBaseClientService.java      |  44 +++-
 .../apache/nifi/hbase/TestDeleteHBaseRow.java   | 197 +++++++++++++++++
 .../apache/nifi/hbase/HBaseClientService.java   |  10 +
 .../nifi/hbase/HBase_1_1_2_ClientService.java   |  11 +
 7 files changed, 580 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
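For orientation, the heart of this change is a new batch-delete method on
HBaseClientService plus a DeleteHBaseRow processor that drives it. Below is a
minimal sketch of calling the new API directly; the class name, table name,
and row keys are illustrative assumptions, not part of the commit:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.nifi.hbase.HBaseClientService;

    public class BatchDeleteSketch {
        // Assumes an HBaseClientService has already been obtained, e.g. from a
        // processor property, as AbstractDeleteHBase does in onScheduled().
        public static void deleteRows(HBaseClientService clientService) throws IOException {
            List<byte[]> rowIds = Arrays.asList(
                    "row-1".getBytes(StandardCharsets.UTF_8),
                    "row-2".getBytes(StandardCharsets.UTF_8));
            // New method introduced by this commit: deletes all cells of each row.
            clientService.delete("nifi", rowIds);
        }
    }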


http://git-wip-us.apache.org/repos/asf/nifi/blob/143d7e68/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractDeleteHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractDeleteHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractDeleteHBase.java
new file mode 100644
index 0000000..a097fbe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractDeleteHBase.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class AbstractDeleteHBase extends AbstractProcessor {
+    protected static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
+            .name("HBase Client Service")
+            .description("Specifies the Controller Service to use for 
accessing HBase.")
+            .required(true)
+            .identifiesControllerService(HBaseClientService.class)
+            .build();
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the HBase Table.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    protected static final PropertyDescriptor ROW_ID = new PropertyDescriptor.Builder()
+            .name("Row Identifier")
+            .description("Specifies the Row ID to use when deleting data into 
HBase")
+            .required(false) // not all sub-classes will require this
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it 
has been successfully stored in HBase")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is routed to this relationship if it 
cannot be sent to HBase")
+            .build();
+
+    protected HBaseClientService clientService;
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        clientService = context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        Set<Relationship> set = new HashSet<>();
+        set.add(REL_SUCCESS);
+        set.add(REL_FAILURE);
+
+        return set;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HBASE_CLIENT_SERVICE);
+        properties.add(TABLE_NAME);
+        properties.add(ROW_ID);
+
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        try {
+            doDelete(context, session);
+        } catch (Exception e) {
+            getLogger().error("Error", e);
+        } finally {
+            session.commit();
+        }
+    }
+
+    protected abstract void doDelete(ProcessContext context, ProcessSession session) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/143d7e68/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
new file mode 100644
index 0000000..8aec55a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/DeleteHBaseRow.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@WritesAttributes(
+    value = {
+        @WritesAttribute( attribute = "restart.index", description = "If a 
delete batch fails, 'restart.index' attribute is added to the FlowFile and sent 
to 'failure' " +
+                "relationship, so that this processor can retry from there 
when the same FlowFile is routed again." ),
+        @WritesAttribute( attribute = "rowkey.start", description = "The first 
rowkey in the flowfile. Only written when using the flowfile's content for the 
row IDs."),
+        @WritesAttribute( attribute = "rowkey.end", description = "The last 
rowkey in the flowfile. Only written when using the flowfile's content for the 
row IDs.")
+    }
+)
+@Tags({ "delete", "hbase" })
+@CapabilityDescription(
+        "Delete HBase records individually or in batches. The input can be a 
single row ID in the flowfile content, one ID per line, " +
+        "row IDs separated by a configurable separator character (default is a 
comma). ")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class DeleteHBaseRow extends AbstractDeleteHBase {
+    static final AllowableValue ROW_ID_CONTENT = new AllowableValue("content", "FlowFile content", "Get the row key(s) from the flowfile content.");
+    static final AllowableValue ROW_ID_ATTR = new AllowableValue("attr", "FlowFile attributes", "Get the row key from an expression language statement.");
+
+    static final String RESTART_INDEX = "restart.index";
+    static final String ROWKEY_START = "rowkey.start";
+    static final String ROWKEY_END   = "rowkey.end";
+
+    static final PropertyDescriptor ROW_ID_LOCATION = new PropertyDescriptor.Builder()
+            .name("delete-hb-row-id-location")
+            .displayName("Row ID Location")
+            .description("The location of the row ID to use for building the 
delete. Can be from the content or an expression language statement.")
+            .required(true)
+            .defaultValue(ROW_ID_CONTENT.getValue())
+            .allowableValues(ROW_ID_CONTENT, ROW_ID_ATTR)
+            .addValidator(Validator.VALID)
+            .build();
+
+    static final PropertyDescriptor FLOWFILE_FETCH_COUNT = new PropertyDescriptor.Builder()
+            .name("delete-hb-flowfile-fetch-count")
+            .displayName("Flowfile Fetch Count")
+            .description("The number of flowfiles to fetch per run.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("5")
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("delete-hb-batch-size")
+            .displayName("Batch Size")
+            .description("The number of deletes to send per batch.")
+            .required(true)
+            .defaultValue("50")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+
+    static final PropertyDescriptor KEY_SEPARATOR = new PropertyDescriptor.Builder()
+            .name("delete-hb-separator")
+            .displayName("Delete Row Key Separator")
+            .description("The separator character(s) that separate multiple 
row keys " +
+                    "when multiple row keys are provided in the flowfile 
content")
+            .required(true)
+            .defaultValue(",")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("delete-char-set")
+            .displayName("Character Set")
+            .description("The character set used to encode the row key for 
HBase.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = 
super.getSupportedPropertyDescriptors();
+        properties.add(ROW_ID_LOCATION);
+        properties.add(FLOWFILE_FETCH_COUNT);
+        properties.add(BATCH_SIZE);
+        properties.add(KEY_SEPARATOR);
+        properties.add(CHARSET);
+
+        return properties;
+    }
+
+    @Override
+    protected void doDelete(ProcessContext context, ProcessSession session) throws Exception {
+        final int batchSize      = context.getProperty(BATCH_SIZE).asInteger();
+        final String location    = context.getProperty(ROW_ID_LOCATION).getValue();
+        final int flowFileCount  = context.getProperty(FLOWFILE_FETCH_COUNT).asInteger();
+        final String charset     = context.getProperty(CHARSET).getValue();
+        List<FlowFile> flowFiles = session.get(flowFileCount);
+
+        if (flowFiles != null && flowFiles.size() > 0) {
+            for (int index = 0; index < flowFiles.size(); index++) {
+                FlowFile flowFile = flowFiles.get(index);
+                final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+                try {
+                    if (location.equals(ROW_ID_CONTENT.getValue())) {
+                        flowFile = doDeleteFromContent(flowFile, context, session, tableName, batchSize, charset);
+                        if (flowFile.getAttribute(RESTART_INDEX) != null) {
+                            session.transfer(flowFile, REL_FAILURE);
+                        } else {
+                            final String transitUrl = clientService.toTransitUri(tableName, flowFile.getAttribute(ROWKEY_END));
+                            session.transfer(flowFile, REL_SUCCESS);
+                            session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
+                        }
+                    } else {
+                        String transitUrl = doDeleteFromAttribute(flowFile, context, tableName, charset);
+                        session.transfer(flowFile, REL_SUCCESS);
+                        session.getProvenanceReporter().invokeRemoteProcess(flowFile, transitUrl);
+                    }
+                } catch (Exception ex) {
+                    getLogger().error(ex.getMessage(), ex);
+                    session.transfer(flowFile, REL_FAILURE);
+                }
+            }
+        }
+    }
+
+    private String doDeleteFromAttribute(FlowFile flowFile, ProcessContext context, String tableName, String charset) throws Exception {
+        String rowKey = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
+        clientService.delete(tableName, rowKey.getBytes(charset));
+
+        return clientService.toTransitUri(tableName, rowKey);
+    }
+
+    private FlowFile doDeleteFromContent(FlowFile flowFile, ProcessContext context, ProcessSession session, String tableName, int batchSize, String charset) throws Exception {
+        String keySeparator = context.getProperty(KEY_SEPARATOR).evaluateAttributeExpressions(flowFile).getValue();
+        final String restartIndex = flowFile.getAttribute(RESTART_INDEX);
+
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        session.exportTo(flowFile, out);
+        out.close();
+
+        String data = new String(out.toByteArray(), charset);
+
+        int restartFrom = -1;
+        if (restartIndex != null) {
+            restartFrom = Integer.parseInt(restartIndex);
+        }
+
+        String first = null, last = null;
+
+        List<byte[]> batch = new ArrayList<>();
+        if (data != null && data.length() > 0) {
+            String[] parts = data.split(keySeparator);
+            int index = 0;
+            try {
+                for (index = 0; index < parts.length; index++) {
+                    if (restartFrom > 0 && index < restartFrom) {
+                        continue;
+                    }
+
+                    if (first == null) {
+                        first = parts[index];
+                    }
+
+                    batch.add(parts[index].getBytes(charset));
+                    if (batch.size() == batchSize) {
+                        clientService.delete(tableName, batch);
+                        batch = new ArrayList<>();
+                    }
+                    last = parts[index];
+                }
+                if (batch.size() > 0) {
+                    clientService.delete(tableName, batch);
+                }
+
+                flowFile = session.removeAttribute(flowFile, RESTART_INDEX);
+                flowFile = session.putAttribute(flowFile, ROWKEY_START, first);
+                flowFile = session.putAttribute(flowFile, ROWKEY_END, last);
+            } catch (Exception ex) {
+                getLogger().error("Error sending delete batch", ex);
+                int restartPoint = index - batch.size() > 0 ? index - batch.size() : 0;
+                flowFile = session.putAttribute(flowFile, RESTART_INDEX, String.valueOf(restartPoint));
+            }
+        }
+
+        return flowFile;
+    }
+}
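A note on the retry bookkeeping above: doDeleteFromContent buffers row keys
into batches of Batch Size, and when a batch fails it records a restart point
of index - batch.size(), clamped to 0, in the restart.index attribute. On the
next attempt, keys with an index below that value are skipped, so at worst one
boundary key is re-deleted (harmless in HBase) and no undeleted key is ever
skipped. A standalone sketch of that arithmetic (the method name is
illustrative, not from the commit):

    // Mirrors the restart-point expression in doDeleteFromContent:
    // 'failureIndex' is the loop index when the delete threw, and
    // 'bufferedCount' is how many keys were in the in-flight batch.
    static int restartPoint(int failureIndex, int bufferedCount) {
        // Clamp at 0 so a failure in the first batch replays the whole content.
        int restart = failureIndex - bufferedCount;
        return restart > 0 ? restart : 0;
    }

    // Example: a failure at index 199 with 100 keys buffered yields 99, so the
    // retry skips indices 0..98 and resumes deleting from index 99 onward.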

http://git-wip-us.apache.org/repos/asf/nifi/blob/143d7e68/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 21c827c..b2cccc8 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+org.apache.nifi.hbase.DeleteHBaseRow
 org.apache.nifi.hbase.GetHBase
 org.apache.nifi.hbase.PutHBaseCell
 org.apache.nifi.hbase.PutHBaseJSON

http://git-wip-us.apache.org/repos/asf/nifi/blob/143d7e68/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index f62102a..d720344 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -24,7 +24,6 @@ import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.scan.ResultCell;
 import org.apache.nifi.hbase.scan.ResultHandler;
 
-
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -39,6 +38,7 @@ public class MockHBaseClientService extends AbstractControllerService implements
     private Map<String,ResultCell[]> results = new HashMap<>();
     private Map<String, List<PutFlowFile>> flowFilePuts = new HashMap<>();
     private boolean throwException = false;
+    private boolean throwExceptionDuringBatchDelete = false;
     private int numScans = 0;
     private int numPuts  = 0;
     @Override
@@ -71,6 +71,40 @@ public class MockHBaseClientService extends AbstractControllerService implements
         throw new UnsupportedOperationException();
     }
 
+    private int deletePoint = 0;
+    public void setDeletePoint(int deletePoint) {
+        this.deletePoint = deletePoint;
+    }
+
+    @Override
+    public void delete(String tableName, List<byte[]> rowIds) throws 
IOException {
+        if (throwException) {
+            throw new RuntimeException("Simulated connectivity error");
+        }
+
+        int index = 0;
+        for (byte[] id : rowIds) {
+            String key = new String(id);
+            Object val = results.remove(key);
+            if (index == deletePoint && throwExceptionDuringBatchDelete) {
+                throw new RuntimeException("Forcing write of restart.index");
+            }
+            if (val == null && deletePoint >= 0) {
+                throw new RuntimeException(String.format("%s was never added.", key));
+            }
+
+            index++;
+        }
+    }
+
+    public int size() {
+        return results.size();
+    }
+
+    public boolean isEmpty() {
+        return results.isEmpty();
+    }
+
     @Override
     public void scan(String tableName, byte[] startRow, byte[] endRow, Collection<Column> columns, ResultHandler handler) throws IOException {
         if (throwException) {
@@ -216,4 +250,12 @@ public class MockHBaseClientService extends AbstractControllerService implements
     public void setFailureThreshold(int failureThreshold) {
         this.failureThreshold = failureThreshold;
     }
+
+    public boolean isThrowExceptionDuringBatchDelete() {
+        return throwExceptionDuringBatchDelete;
+    }
+
+    public void setThrowExceptionDuringBatchDelete(boolean throwExceptionDuringBatchDelete) {
+        this.throwExceptionDuringBatchDelete = throwExceptionDuringBatchDelete;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/143d7e68/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
new file mode 100644
index 0000000..6c0d92b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class TestDeleteHBaseRow {
+    private TestRunner runner;
+    private MockHBaseClientService hBaseClient;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(new DeleteHBaseRow());
+
+        hBaseClient = new MockHBaseClientService();
+        runner.addControllerService("hbaseClient", hBaseClient);
+        runner.enableControllerService(hBaseClient);
+
+        runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
+        runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE, "hbaseClient");
+    }
+
+    List<String> populateTable(int max) {
+        List<String> ids = new ArrayList<>();
+        for (int index = 0; index < max; index++) {
+            String uuid = UUID.randomUUID().toString();
+            ids.add(uuid);
+            Map<String, String> cells = new HashMap<>();
+            cells.put("test", UUID.randomUUID().toString());
+            hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
+        }
+
+        return ids;
+    }
+
+    @Test
+    public void testSimpleDelete() {
+        List<String> ids = populateTable(100);
+
+        runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "100");
+        runner.setProperty(DeleteHBaseRow.FLOWFILE_FETCH_COUNT, "100");
+        for (String id : ids) {
+            runner.enqueue(id);
+        }
+
+        runner.run(1, true);
+        Assert.assertTrue("The mock client was not empty.", 
hBaseClient.isEmpty());
+    }
+
+    private String buildSeparatedString(List<String> ids, String separator) {
+        StringBuilder sb = new StringBuilder();
+        for (int index = 1; index <= ids.size(); index++) {
+            sb.append(ids.get(index - 1)).append(separator);
+        }
+
+        return sb.toString();
+    }
+
+    private void testSeparatedDeletes(String separator) {
+        testSeparatedDeletes(separator, separator, new HashMap());
+    }
+
+    private void testSeparatedDeletes(String separator, String separatorProp, Map attrs) {
+        List<String> ids = populateTable(10000);
+        runner.setProperty(DeleteHBaseRow.KEY_SEPARATOR, separator);
+        runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "100");
+        runner.enqueue(buildSeparatedString(ids, separatorProp), attrs);
+        runner.run(1, true);
+
+        Assert.assertTrue("The mock client was not empty.", 
hBaseClient.isEmpty());
+    }
+
+    @Test
+    public void testDeletesSeparatedByNewLines() {
+        testSeparatedDeletes("\n");
+    }
+
+    @Test
+    public void testDeletesSeparatedByCommas() {
+        testSeparatedDeletes(",");
+    }
+
+    @Test
+    public void testDeleteWithELSeparator() {
+        runner.setValidateExpressionUsage(true);
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("test.separator", "____");
+        testSeparatedDeletes("${test.separator}", "____", attrs);
+    }
+
+    @Test
+    public void testDeleteWithExpressionLanguage() {
+        List<String> ids = populateTable(1000);
+        for (String id : ids) {
+            String[] parts = id.split("-");
+            Map<String, String> attrs = new HashMap<>();
+            for (int index = 0; index < parts.length; index++) {
+                attrs.put(String.format("part_%d", index), parts[index]);
+            }
+            runner.enqueue(id, attrs);
+        }
+        runner.setProperty(DeleteHBaseRow.ROW_ID, "${part_0}-${part_1}-${part_2}-${part_3}-${part_4}");
+        runner.setProperty(DeleteHBaseRow.ROW_ID_LOCATION, DeleteHBaseRow.ROW_ID_ATTR);
+        runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "200");
+        runner.setValidateExpressionUsage(true);
+        runner.run(1, true);
+    }
+
+    @Test
+    public void testConnectivityErrorHandling() {
+        List<String> ids = populateTable(100);
+        for (String id : ids) {
+            runner.enqueue(id);
+        }
+        boolean exception = false;
+        try {
+            hBaseClient.setThrowException(true);
+            runner.run(1, true);
+        } catch (Exception ex) {
+            exception = true;
+        } finally {
+            hBaseClient.setThrowException(false);
+        }
+
+        Assert.assertFalse("An unhandled exception was caught.", exception);
+    }
+
+    @Test
+    public void testRestartIndexAttribute() {
+        List<String> ids = populateTable(500);
+        StringBuilder sb = new StringBuilder();
+        for (int index = 0; index < ids.size(); index++) {
+            sb.append(ids.get(index)).append( index < ids.size() - 1 ? "," : "");
+        }
+        runner.enqueue(sb.toString());
+        runner.setProperty(DeleteHBaseRow.ROW_ID_LOCATION, DeleteHBaseRow.ROW_ID_CONTENT);
+
+        Assert.assertTrue("There should have been 500 rows.", 
hBaseClient.size() == 500);
+
+        hBaseClient.setDeletePoint(20);
+        hBaseClient.setThrowExceptionDuringBatchDelete(true);
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(DeleteHBaseRow.REL_FAILURE, 1);
+        runner.assertTransferCount(DeleteHBaseRow.REL_SUCCESS, 0);
+
+        Assert.assertTrue("Partially deleted", hBaseClient.size() < 500);
+
+        List<MockFlowFile> flowFile = 
runner.getFlowFilesForRelationship(DeleteHBaseRow.REL_FAILURE);
+        Assert.assertNotNull("Missing restart.index attribute", 
flowFile.get(0).getAttribute("restart.index"));
+
+        byte[] oldData = runner.getContentAsByteArray(flowFile.get(0));
+        Map<String, String> attrs = new HashMap<>();
+        attrs.put("restart.index", 
flowFile.get(0).getAttribute("restart.index"));
+        runner.enqueue(oldData, attrs);
+        hBaseClient.setDeletePoint(-1);
+        hBaseClient.setThrowExceptionDuringBatchDelete(false);
+        runner.clearTransferState();
+        runner.run(1, true, true);
+
+        runner.assertTransferCount(DeleteHBaseRow.REL_FAILURE, 0);
+        runner.assertTransferCount(DeleteHBaseRow.REL_SUCCESS, 1);
+
+        flowFile = runner.getFlowFilesForRelationship(DeleteHBaseRow.REL_SUCCESS);
+
+        Assert.assertTrue("The client should have been empty", 
hBaseClient.isEmpty());
+        Assert.assertNull("The restart.index attribute should be null", 
flowFile.get(0).getAttribute("restart.index"));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/143d7e68/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
index 0c2a131..10d17ab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java
@@ -29,6 +29,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 
 @Tags({"hbase", "client"})
 @CapabilityDescription("A controller service for accessing an HBase client.")
@@ -117,6 +118,15 @@ public interface HBaseClientService extends ControllerService {
     void delete(String tableName, byte[] rowId) throws IOException;
 
     /**
+     * Deletes a list of rows in HBase. All cells are deleted.
+     *
+     * @param tableName the name of an HBase table
+     * @param rowIds a list of rowIds to send in a batch delete
+     */
+
+    void delete(String tableName, List<byte[]> rowIds) throws IOException;
+
+    /**
      * Scans the given table using the optional filter criteria and passing each result to the provided handler.
      *
      * @param tableName the name of an HBase table to scan

http://git-wip-us.apache.org/repos/asf/nifi/blob/143d7e68/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index cc6927b..07a3cf2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -356,6 +356,17 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
     }
 
     @Override
+    public void delete(String tableName, List<byte[]> rowIds) throws 
IOException {
+        List<Delete> deletes = new ArrayList<>();
+        for (int index = 0; index < rowIds.size(); index++) {
+            deletes.add(new Delete(rowIds.get(index)));
+        }
+        try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
+            table.delete(deletes);
+        }
+    }
+
+    @Override
     public void scan(final String tableName, final Collection<Column> columns, final String filterExpression, final long minTime, final ResultHandler handler)
             throws IOException {
 

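One behavioral note on the HBase 1.x client used above: Table#delete(List<Delete>)
mutates the list it is given, removing the Deletes that were successfully
applied, so only failed operations remain if it throws. The implementation in
this commit builds a method-local list, so that side effect is invisible to its
callers. A sketch of the equivalent direct client usage (the class name, table
name, and row keys are illustrative assumptions):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DirectBatchDeleteSketch {
        static void deleteRows(Connection connection) throws IOException {
            List<Delete> deletes = new ArrayList<>();
            deletes.add(new Delete(Bytes.toBytes("row-1")));
            deletes.add(new Delete(Bytes.toBytes("row-2")));
            // try-with-resources closes the Table; the Connection stays open.
            try (Table table = connection.getTable(TableName.valueOf("nifi"))) {
                // On partial failure the client removes successful Deletes from
                // 'deletes', leaving only the failed ones, before throwing.
                table.delete(deletes);
            }
        }
    }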