Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2239#discussion_r148626386
  
    --- Diff: 
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestHiveParser.java
 ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hive;
    +
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSessionFactory;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.junit.Test;
    +
    +import java.util.Set;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestHiveParser extends AbstractHiveQLProcessor {
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
    +
    +    }
    +
    +    @Test
    +    public void parseSelect() throws Exception {
    +        String query = "select a.empid, to_something(b.saraly) from " +
    +                "company.emp a inner join default.salary b where a.empid = 
b.empid";
    +        final Set<TableName> tableNames = findTableNames(query);
    +        System.out.printf("tableNames=%s\n", tableNames);
    +        assertEquals(2, tableNames.size());
    +        assertTrue(tableNames.contains(new TableName("company", "emp")));
    +        assertTrue(tableNames.contains(new TableName("default", 
"salary")));
    +        for (TableName tableName : tableNames) {
    +            assertTrue(tableName.isInput());
    +        }
    +    }
    +
    +    @Test
    +    public void parseSelectPrepared() throws Exception {
    +        String query = "select empid from company.emp a where a.firstName 
= ?";
    +        final Set<TableName> tableNames = findTableNames(query);
    +        System.out.printf("tableNames=%s\n", tableNames);
    +        assertEquals(1, tableNames.size());
    +        assertTrue(tableNames.contains(new TableName("company", "emp")));
    +        for (TableName tableName : tableNames) {
    +            assertTrue(tableName.isInput());
    +        }
    +    }
    +
    +
    +    @Test
    +    public void parseLongSelect() throws Exception {
    +        String query = "select\n" +
    +                "\n" +
    +                "    i_item_id,\n" +
    +                "\n" +
    +                "    i_item_desc,\n" +
    +                "\n" +
    +                "    s_state,\n" +
    +                "\n" +
    +                "    count(ss_quantity) as store_sales_quantitycount,\n" +
    +                "\n" +
    +                "    avg(ss_quantity) as store_sales_quantityave,\n" +
    +                "\n" +
    +                "    stddev_samp(ss_quantity) as 
store_sales_quantitystdev,\n" +
    +                "\n" +
    +                "    stddev_samp(ss_quantity) / avg(ss_quantity) as 
store_sales_quantitycov,\n" +
    +                "\n" +
    +                "    count(sr_return_quantity) as 
store_returns_quantitycount,\n" +
    +                "\n" +
    +                "    avg(sr_return_quantity) as 
store_returns_quantityave,\n" +
    +                "\n" +
    +                "    stddev_samp(sr_return_quantity) as 
store_returns_quantitystdev,\n" +
    +                "\n" +
    +                "    stddev_samp(sr_return_quantity) / 
avg(sr_return_quantity) as store_returns_quantitycov,\n" +
    +                "\n" +
    +                "    count(cs_quantity) as catalog_sales_quantitycount,\n" 
+
    +                "\n" +
    +                "    avg(cs_quantity) as catalog_sales_quantityave,\n" +
    +                "\n" +
    +                "    stddev_samp(cs_quantity) / avg(cs_quantity) as 
catalog_sales_quantitystdev,\n" +
    +                "\n" +
    +                "    stddev_samp(cs_quantity) / avg(cs_quantity) as 
catalog_sales_quantitycov\n" +
    +                "\n" +
    +                "from\n" +
    +                "\n" +
    +                "    store_sales,\n" +
    +                "\n" +
    +                "    store_returns,\n" +
    +                "\n" +
    +                "    catalog_sales,\n" +
    +                "\n" +
    +                "    date_dim d1,\n" +
    +                "\n" +
    +                "    date_dim d2,\n" +
    +                "\n" +
    +                "    date_dim d3,\n" +
    +                "\n" +
    +                "    store,\n" +
    +                "\n" +
    +                "    item\n" +
    +                "\n" +
    +                "where\n" +
    +                "\n" +
    +                "    d1.d_quarter_name = '2000Q1'\n" +
    +                "\n" +
    +                "        and d1.d_date_sk = ss_sold_date_sk\n" +
    +                "\n" +
    +                "        and i_item_sk = ss_item_sk\n" +
    +                "\n" +
    +                "        and s_store_sk = ss_store_sk\n" +
    +                "\n" +
    +                "        and ss_customer_sk = sr_customer_sk\n" +
    +                "\n" +
    +                "        and ss_item_sk = sr_item_sk\n" +
    +                "\n" +
    +                "        and ss_ticket_number = sr_ticket_number\n" +
    +                "\n" +
    +                "        and sr_returned_date_sk = d2.d_date_sk\n" +
    +                "\n" +
    +                "        and d2.d_quarter_name in ('2000Q1' , '2000Q2', 
'2000Q3')\n" +
    +                "\n" +
    +                "        and sr_customer_sk = cs_bill_customer_sk\n" +
    +                "\n" +
    +                "        and sr_item_sk = cs_item_sk\n" +
    +                "\n" +
    +                "        and cs_sold_date_sk = d3.d_date_sk\n" +
    +                "\n" +
    +                "        and d3.d_quarter_name in ('2000Q1' , '2000Q2', 
'2000Q3')\n" +
    +                "\n" +
    +                "group by i_item_id , i_item_desc , s_state\n" +
    +                "\n" +
    +                "order by i_item_id , i_item_desc , s_state\n" +
    +                "\n" +
    +                "limit 100";
    +
    +        final Set<TableName> tableNames = findTableNames(query);
    +        System.out.printf("tableNames=%s\n", tableNames);
    +        assertEquals(6, tableNames.size());
    +        AtomicInteger cnt = new AtomicInteger(0);
    +        for (TableName tableName : tableNames) {
    +            if (tableName.equals(new TableName(null, "store_sales"))) {
    +                assertTrue(tableName.isInput());
    +                cnt.incrementAndGet();
    +            } else if (tableName.equals(new TableName(null, 
"store_returns"))) {
    +                assertTrue(tableName.isInput());
    +                cnt.incrementAndGet();
    +            } else if (tableName.equals(new TableName(null, 
"catalog_sales"))) {
    +                assertTrue(tableName.isInput());
    +                cnt.incrementAndGet();
    +            } else if (tableName.equals(new TableName(null, "date_dim"))) {
    +                assertTrue(tableName.isInput());
    +                cnt.incrementAndGet();
    +            } else if (tableName.equals(new TableName(null, "store"))) {
    +                assertTrue(tableName.isInput());
    +                cnt.incrementAndGet();
    +            } else if (tableName.equals(new TableName(null, "item"))) {
    +                assertTrue(tableName.isInput());
    +                cnt.incrementAndGet();
    +            }
    +        }
    +        assertEquals(6, cnt.get());
    +    }
    +
    +    @Test
    +    public void parseInsert() throws Exception {
    --- End diff --
    
    These tests pass for me, but when I tried on a real NiFi against a Hive 
with two tables (prov_events and t), running a single flow file into PutHiveQL 
containing the following:
    `insert into t values (11,"eleven");`
    `insert into prov_events select * from prov_events where durationmillis = 
10;`
    `insert into t values (0,"zero");`
    
    I correctly get query.input_tables = "prov_events", but query.output_tables 
= "t", when it should be "t,prov_events", right?


---

Reply via email to