Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2239#discussion_r148626386
--- Diff:
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestHiveParser.java
---
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hive;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestHiveParser extends AbstractHiveQLProcessor {
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException {
+
+ }
+
+ @Test
+ public void parseSelect() throws Exception {
+ String query = "select a.empid, to_something(b.saraly) from " +
+ "company.emp a inner join default.salary b where a.empid =
b.empid";
+ final Set<TableName> tableNames = findTableNames(query);
+ System.out.printf("tableNames=%s\n", tableNames);
+ assertEquals(2, tableNames.size());
+ assertTrue(tableNames.contains(new TableName("company", "emp")));
+ assertTrue(tableNames.contains(new TableName("default",
"salary")));
+ for (TableName tableName : tableNames) {
+ assertTrue(tableName.isInput());
+ }
+ }
+
+ @Test
+ public void parseSelectPrepared() throws Exception {
+ String query = "select empid from company.emp a where a.firstName
= ?";
+ final Set<TableName> tableNames = findTableNames(query);
+ System.out.printf("tableNames=%s\n", tableNames);
+ assertEquals(1, tableNames.size());
+ assertTrue(tableNames.contains(new TableName("company", "emp")));
+ for (TableName tableName : tableNames) {
+ assertTrue(tableName.isInput());
+ }
+ }
+
+
+ @Test
+ public void parseLongSelect() throws Exception {
+ String query = "select\n" +
+ "\n" +
+ " i_item_id,\n" +
+ "\n" +
+ " i_item_desc,\n" +
+ "\n" +
+ " s_state,\n" +
+ "\n" +
+ " count(ss_quantity) as store_sales_quantitycount,\n" +
+ "\n" +
+ " avg(ss_quantity) as store_sales_quantityave,\n" +
+ "\n" +
+ " stddev_samp(ss_quantity) as
store_sales_quantitystdev,\n" +
+ "\n" +
+ " stddev_samp(ss_quantity) / avg(ss_quantity) as
store_sales_quantitycov,\n" +
+ "\n" +
+ " count(sr_return_quantity) as
store_returns_quantitycount,\n" +
+ "\n" +
+ " avg(sr_return_quantity) as
store_returns_quantityave,\n" +
+ "\n" +
+ " stddev_samp(sr_return_quantity) as
store_returns_quantitystdev,\n" +
+ "\n" +
+ " stddev_samp(sr_return_quantity) /
avg(sr_return_quantity) as store_returns_quantitycov,\n" +
+ "\n" +
+ " count(cs_quantity) as catalog_sales_quantitycount,\n"
+
+ "\n" +
+ " avg(cs_quantity) as catalog_sales_quantityave,\n" +
+ "\n" +
+ " stddev_samp(cs_quantity) / avg(cs_quantity) as
catalog_sales_quantitystdev,\n" +
+ "\n" +
+ " stddev_samp(cs_quantity) / avg(cs_quantity) as
catalog_sales_quantitycov\n" +
+ "\n" +
+ "from\n" +
+ "\n" +
+ " store_sales,\n" +
+ "\n" +
+ " store_returns,\n" +
+ "\n" +
+ " catalog_sales,\n" +
+ "\n" +
+ " date_dim d1,\n" +
+ "\n" +
+ " date_dim d2,\n" +
+ "\n" +
+ " date_dim d3,\n" +
+ "\n" +
+ " store,\n" +
+ "\n" +
+ " item\n" +
+ "\n" +
+ "where\n" +
+ "\n" +
+ " d1.d_quarter_name = '2000Q1'\n" +
+ "\n" +
+ " and d1.d_date_sk = ss_sold_date_sk\n" +
+ "\n" +
+ " and i_item_sk = ss_item_sk\n" +
+ "\n" +
+ " and s_store_sk = ss_store_sk\n" +
+ "\n" +
+ " and ss_customer_sk = sr_customer_sk\n" +
+ "\n" +
+ " and ss_item_sk = sr_item_sk\n" +
+ "\n" +
+ " and ss_ticket_number = sr_ticket_number\n" +
+ "\n" +
+ " and sr_returned_date_sk = d2.d_date_sk\n" +
+ "\n" +
+ " and d2.d_quarter_name in ('2000Q1' , '2000Q2',
'2000Q3')\n" +
+ "\n" +
+ " and sr_customer_sk = cs_bill_customer_sk\n" +
+ "\n" +
+ " and sr_item_sk = cs_item_sk\n" +
+ "\n" +
+ " and cs_sold_date_sk = d3.d_date_sk\n" +
+ "\n" +
+ " and d3.d_quarter_name in ('2000Q1' , '2000Q2',
'2000Q3')\n" +
+ "\n" +
+ "group by i_item_id , i_item_desc , s_state\n" +
+ "\n" +
+ "order by i_item_id , i_item_desc , s_state\n" +
+ "\n" +
+ "limit 100";
+
+ final Set<TableName> tableNames = findTableNames(query);
+ System.out.printf("tableNames=%s\n", tableNames);
+ assertEquals(6, tableNames.size());
+ AtomicInteger cnt = new AtomicInteger(0);
+ for (TableName tableName : tableNames) {
+ if (tableName.equals(new TableName(null, "store_sales"))) {
+ assertTrue(tableName.isInput());
+ cnt.incrementAndGet();
+ } else if (tableName.equals(new TableName(null,
"store_returns"))) {
+ assertTrue(tableName.isInput());
+ cnt.incrementAndGet();
+ } else if (tableName.equals(new TableName(null,
"catalog_sales"))) {
+ assertTrue(tableName.isInput());
+ cnt.incrementAndGet();
+ } else if (tableName.equals(new TableName(null, "date_dim"))) {
+ assertTrue(tableName.isInput());
+ cnt.incrementAndGet();
+ } else if (tableName.equals(new TableName(null, "store"))) {
+ assertTrue(tableName.isInput());
+ cnt.incrementAndGet();
+ } else if (tableName.equals(new TableName(null, "item"))) {
+ assertTrue(tableName.isInput());
+ cnt.incrementAndGet();
+ }
+ }
+ assertEquals(6, cnt.get());
+ }
+
+ @Test
+ public void parseInsert() throws Exception {
--- End diff --
These tests pass for me, but when I tried on a real NiFi against a Hive
with two tables (prov_events and t), running a single flow file into PutHiveQL
containing the following:
`insert into t values (11,"eleven");`
`insert into prov_events select * from prov_events where durationmillis =
10;`
`insert into t values (0,"zero");`
I correctly get query.input_tables = "prov_events", but query.output_tables
= "t", when it should be "t,prov_events" — right?
---