Github user bdesert commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2478#discussion_r169680760
--- Diff:
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
---
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestScanHBase {
+
+ private ScanHBase proc;
+ private MockHBaseClientService hBaseClientService;
+ private TestRunner runner;
+
+ @Before
+ public void setup() throws InitializationException {
+ proc = new ScanHBase();
+ runner = TestRunners.newTestRunner(proc);
+
+ hBaseClientService = new MockHBaseClientService();
+ runner.addControllerService("hbaseClient", hBaseClientService);
+ runner.enableControllerService(hBaseClientService);
+ runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
+ }
+
+ @Test
+ public void testColumnsValidation() {
+ runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+ runner.setProperty(ScanHBase.START_ROW, "row1");
+ runner.setProperty(ScanHBase.END_ROW, "row1");
+ runner.assertValid();
+
+ runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1");
+ runner.assertValid();
+
+ runner.setProperty(ScanHBase.COLUMNS, "cf1");
+ runner.assertValid();
+
+ runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
+ runner.assertValid();
+
+ runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3");
+ runner.assertValid();
+
+ runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3");
+ runner.assertNotValid();
+
+ runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3");
+ runner.assertNotValid();
+
+ runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testNoIncomingFlowFile() {
+ runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+ runner.setProperty(ScanHBase.START_ROW, "row1");
+ runner.setProperty(ScanHBase.END_ROW, "row1");
+
+ runner.run();
+ runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+ runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+ runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+ Assert.assertEquals(0, hBaseClientService.getNumScans());
+ }
+
+ @Test
+ public void testInvalidTableName() {
+ runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
+ runner.setProperty(ScanHBase.START_ROW, "row1");
+ runner.setProperty(ScanHBase.END_ROW, "row1");
+
+ runner.enqueue("trigger flow file");
+ runner.run();
+
+ runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
+ runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+ runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+ Assert.assertEquals(0, hBaseClientService.getNumScans());
+ }
+
+ @Test
+ public void testResultsNotFound() {
+ runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+ runner.setProperty(ScanHBase.START_ROW, "row1");
+ runner.setProperty(ScanHBase.END_ROW, "row1");
+
+ runner.enqueue("trigger flow file");
+ runner.run();
+
+ runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+ runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+ runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ScanHBase.REL_ORIGINAL).get(0);
+ flowFile.assertAttributeEquals("scanhbase.results.found",
Boolean.FALSE.toString());
+
+ Assert.assertEquals(1, hBaseClientService.getNumScans());
+ }
+
+ @Test
+ public void testScanToContentWithStringValues() {
+ final Map<String, String> cells = new HashMap<>();
+ cells.put("cq1", "val1");
+ cells.put("cq2", "val2");
+
+ final long ts1 = 123456789;
+ hBaseClientService.addResult("row1", cells, ts1);
+ hBaseClientService.addResult("row2", cells, ts1);
+
+ runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+ runner.setProperty(ScanHBase.START_ROW, "row1");
+ runner.setProperty(ScanHBase.END_ROW, "row2");
+ runner.setProperty(ScanHBase.TIME_RANGE_MIN, "0");
+ runner.setProperty(ScanHBase.TIME_RANGE_MAX, "1111111110");
+ runner.setProperty(ScanHBase.LIMIT_ROWS, "10");
+ runner.setProperty(ScanHBase.REVERSED_SCAN, "false");
+ runner.setProperty(ScanHBase.BULK_SIZE, "10");
+
+ runner.enqueue("trigger flow file");
+ runner.run();
+
+ runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+ runner.assertTransferCount(ScanHBase.REL_SUCCESS, 1);
+ runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 1);
+
+ MockFlowFile flowFile =
runner.getFlowFilesForRelationship(ScanHBase.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals("{\"row\":\"row1\", \"cells\": [" +
--- End diff --
The formatting and structure of the JSON representation come from the NiFi
implementation, not from HBase. They are defined in:
org.apache.nifi.hbase.io.JsonFullRowSerializer;
org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
Also, I tried to keep it consistent with TestFetchHBaseRow.
---