Github user MikeThomsen commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2294#discussion_r167940095
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestDeleteHBaseRow.java
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.hbase;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class TestDeleteHBaseRow {
+ private TestRunner runner;
+ private MockHBaseClientService hBaseClient;
+
+ @Before
+ public void setup() throws InitializationException {
+ runner = TestRunners.newTestRunner(new DeleteHBaseRow());
+
+ hBaseClient = new MockHBaseClientService();
+ runner.addControllerService("hbaseClient", hBaseClient);
+ runner.enableControllerService(hBaseClient);
+
+ runner.setProperty(DeleteHBaseRow.TABLE_NAME, "nifi");
+ runner.setProperty(DeleteHBaseRow.HBASE_CLIENT_SERVICE,
"hbaseClient");
+ }
+
+ List<String> populateTable(int max) {
+ List<String> ids = new ArrayList<>();
+ for (int index = 0; index < max; index++) {
+ String uuid = UUID.randomUUID().toString();
+ ids.add(uuid);
+ Map<String, String> cells = new HashMap<>();
+ cells.put("test", UUID.randomUUID().toString());
+ hBaseClient.addResult(uuid, cells, System.currentTimeMillis());
+ }
+
+ return ids;
+ }
+
+ @Test
+ public void testSimpleDelete() {
+ List<String> ids = populateTable(100);
+
+ runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "100");
+ runner.setProperty(DeleteHBaseRow.FLOWFILE_FETCH_COUNT, "100");
+ for (String id : ids) {
+ runner.enqueue(id);
+ }
+
+ runner.run(1, true);
+ Assert.assertTrue("The mock client was not empty.",
hBaseClient.isEmpty());
+ }
+
+ private String buildSeparatedString(List<String> ids, String
separator) {
+ StringBuilder sb = new StringBuilder();
+ for (int index = 1; index <= ids.size(); index++) {
+ sb.append(ids.get(index - 1)).append(separator);
+ }
+
+ return sb.toString();
+ }
+
+ private void testSeparatedDeletes(String separator) {
+ testSeparatedDeletes(separator, separator, new HashMap());
+ }
+
+ private void testSeparatedDeletes(String separator, String
separatorProp, Map attrs) {
+ List<String> ids = populateTable(10000);
+ runner.setProperty(DeleteHBaseRow.KEY_SEPARATOR, separator);
+ runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "100");
+ runner.enqueue(buildSeparatedString(ids, separatorProp), attrs);
+ runner.run(1, true);
+
+ Assert.assertTrue("The mock client was not empty.",
hBaseClient.isEmpty());
+ }
+
+ @Test
+ public void testDeletesSeparatedByNewLines() {
+ testSeparatedDeletes("\n");
+ }
+
+ @Test
+ public void testDeletesSeparatedByCommas() {
+ testSeparatedDeletes(",");
+ }
+
+ @Test
+ public void testDeleteWithELSeparator() {
+ runner.setValidateExpressionUsage(true);
+ Map<String, String> attrs = new HashMap<>();
+ attrs.put("test.separator", "____");
+ testSeparatedDeletes("${test.separator}", "____", attrs);
+ }
+
+ @Test
+ public void testDeleteWithExpressionLanguage() {
+ List<String> ids = populateTable(1000);
+ for (String id : ids) {
+ String[] parts = id.split("-");
+ Map<String, String> attrs = new HashMap<>();
+ for (int index = 0; index < parts.length; index++) {
+ attrs.put(String.format("part_%d", index), parts[index]);
+ }
+ runner.enqueue(id, attrs);
+ }
+ runner.setProperty(DeleteHBaseRow.ROW_ID,
"${part_0}-${part_1}-${part_2}-${part_3}-${part_4}");
+ runner.setProperty(DeleteHBaseRow.ROW_ID_LOCATION,
DeleteHBaseRow.ROW_ID_ATTR);
+ runner.setProperty(DeleteHBaseRow.BATCH_SIZE, "200");
+ runner.setValidateExpressionUsage(true);
+ runner.run(1, true);
+ }
+
+ @Test
+ public void testConnectivityErrorHandling() {
+ List<String> ids = populateTable(100);
+ for (String id : ids) {
+ runner.enqueue(id);
+ }
+ boolean exception = false;
+ try {
+ hBaseClient.setThrowException(true);
+ runner.run(1, true);
+ } catch (Exception ex) {
+ exception = true;
+ } finally {
+ hBaseClient.setThrowException(false);
+ }
+
+ Assert.assertFalse("An unhandled exception was caught.",
exception);
+ }
+
+ @Test
+ public void testRestartIndexAttribute() {
+ List<String> ids = populateTable(500);
+ StringBuilder sb = new StringBuilder();
+ for (int index = 0; index < ids.size(); index++) {
+ sb.append(ids.get(index)).append( index < ids.size() - 1 ? ","
: "");
+ }
+ runner.enqueue(sb.toString());
+ runner.setProperty(DeleteHBaseRow.ROW_ID_LOCATION,
DeleteHBaseRow.ROW_ID_BODY);
+
+ Assert.assertTrue("There should have been 500 rows.",
hBaseClient.size() == 500);
+
+ hBaseClient.setDeletePoint(20);
+ hBaseClient.setThrowExceptionDuringBatchDelete(true);
+ runner.run(1, true, true);
+
+ runner.assertTransferCount(DeleteHBaseRow.REL_FAILURE, 1);
+ runner.assertTransferCount(DeleteHBaseRow.REL_SUCCESS, 0);
+
+ Assert.assertTrue("Nothing was deleted", hBaseClient.size() < 500);
--- End diff --
Done
---