[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15961298#comment-15961298
 ] 

ASF GitHub Bot commented on NIFI-1280:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1652#discussion_r110460794
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
 ---
    @@ -0,0 +1,392 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.standard;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordSetWriter;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +import org.apache.nifi.serialization.RowRecordReaderFactory;
    +import org.apache.nifi.serialization.SimpleRecordSchema;
    +import org.apache.nifi.serialization.WriteResult;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordField;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.RecordSet;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +public class TestQueryFlowFile {
    +
    +    static {
    +        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", 
"info");
    +        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
    +        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", 
"debug");
    +        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform",
 "debug");
    +    }
    +
    +    private static final String REL_NAME = "success";
    +
    +    @Test
    +    public void testSimple() throws InitializationException, IOException, 
SQLException {
    +        final MockRecordParser parser = new MockRecordParser();
    +        parser.addSchemaField("name", RecordFieldType.STRING);
    +        parser.addSchemaField("age", RecordFieldType.INT);
    +        parser.addRecord("Tom", 49);
    +
    +        final MockRecordWriter writer = new 
MockRecordWriter("\"name\",\"points\"");
    +
    +        final TestRunner runner = 
TestRunners.newTestRunner(QueryFlowFile.class);
    +        runner.addControllerService("parser", parser);
    +        runner.enableControllerService(parser);
    +        runner.addControllerService("writer", writer);
    +        runner.enableControllerService(writer);
    +
    +        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE 
name <> ''");
    +        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
    +        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
    +
    +        final int numIterations = 1;
    +        for (int i = 0; i < numIterations; i++) {
    +            runner.enqueue(new byte[0]);
    +        }
    +
    +        runner.setThreadCount(4);
    +        runner.run(2 * numIterations);
    +
    +        runner.assertTransferCount(REL_NAME, 1);
    +        final MockFlowFile out = 
runner.getFlowFilesForRelationship(REL_NAME).get(0);
    +        System.out.println(new String(out.toByteArray()));
    +        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
    +    }
    +
    +    @Test
    +    public void testParseFailure() throws InitializationException, 
IOException, SQLException {
    +        final MockRecordParser parser = new MockRecordParser();
    +        parser.addSchemaField("name", RecordFieldType.STRING);
    +        parser.addSchemaField("age", RecordFieldType.INT);
    +        parser.addRecord("Tom", 49);
    +
    +        final MockRecordWriter writer = new 
MockRecordWriter("\"name\",\"points\"");
    +
    +        final TestRunner runner = 
TestRunners.newTestRunner(QueryFlowFile.class);
    +        runner.addControllerService("parser", parser);
    +        runner.enableControllerService(parser);
    +        runner.addControllerService("writer", writer);
    +        runner.enableControllerService(writer);
    +
    +        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE 
name <> ''");
    +        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
    +        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
    +
    +        final int numIterations = 1;
    +        for (int i = 0; i < numIterations; i++) {
    +            runner.enqueue(new byte[0]);
    +        }
    +
    +        runner.setThreadCount(4);
    +        runner.run(2 * numIterations);
    +
    +        runner.assertTransferCount(REL_NAME, 1);
    +        final MockFlowFile out = 
runner.getFlowFilesForRelationship(REL_NAME).get(0);
    +        System.out.println(new String(out.toByteArray()));
    +        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
    +    }
    +
    +
    +    @Test
    +    public void testTransformCalc() throws InitializationException, 
IOException, SQLException {
    +        final MockRecordParser parser = new MockRecordParser();
    +        parser.addSchemaField("ID", RecordFieldType.INT);
    +        parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT);
    +        parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
    +        parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
    +
    +        parser.addRecord("008", 10.05F, 15.45F, 89.99F);
    +        parser.addRecord("100", 20.25F, 25.25F, 45.25F);
    +        parser.addRecord("105", 20.05F, 25.05F, 45.05F);
    +        parser.addRecord("200", 34.05F, 25.05F, 75.05F);
    +
    +        final MockRecordWriter writer = new 
MockRecordWriter("\"NAME\",\"POINTS\"");
    +
    +        final TestRunner runner = 
TestRunners.newTestRunner(QueryFlowFile.class);
    +        runner.addControllerService("parser", parser);
    +        runner.enableControllerService(parser);
    +        runner.addControllerService("writer", writer);
    +        runner.enableControllerService(writer);
    +
    +        runner.setProperty(REL_NAME, "select ID, AMOUNT1+AMOUNT2+AMOUNT3 
as TOTAL from FLOWFILE where ID=100");
    +        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
    +        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
    +
    +        runner.enqueue(new byte[0]);
    +        runner.run();
    +
    +        runner.assertTransferCount(REL_NAME, 1);
    +        final MockFlowFile out = 
runner.getFlowFilesForRelationship(REL_NAME).get(0);
    +
    +        
out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
    +    }
    +
    +
    +    @Test
    +    public void testAggregateFunction() throws InitializationException, 
IOException {
    +        final MockRecordParser parser = new MockRecordParser();
    +        parser.addSchemaField("name", RecordFieldType.STRING);
    +        parser.addSchemaField("points", RecordFieldType.INT);
    +        parser.addRecord("Tom", 1);
    +        parser.addRecord("Jerry", 2);
    +        parser.addRecord("Tom", 99);
    +
    +        final MockRecordWriter writer = new 
MockRecordWriter("\"name\",\"points\"");
    +
    +        final TestRunner runner = 
TestRunners.newTestRunner(QueryFlowFile.class);
    +        runner.addControllerService("parser", parser);
    +        runner.enableControllerService(parser);
    +        runner.addControllerService("writer", writer);
    +        runner.enableControllerService(writer);
    +
    +        runner.setProperty(REL_NAME, "select name, sum(points) as points 
from FLOWFILE GROUP BY name");
    +        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
    +        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
    +
    +        runner.enqueue("");
    +        runner.run();
    +
    +        runner.assertTransferCount(REL_NAME, 1);
    +        final MockFlowFile flowFileOut = 
runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS).get(0);
    +        
flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
    +    }
    +
    +    @Test
    +    public void testColumnNames() throws InitializationException, 
IOException {
    +        final MockRecordParser parser = new MockRecordParser();
    +        parser.addSchemaField("name", RecordFieldType.STRING);
    +        parser.addSchemaField("points", RecordFieldType.INT);
    +        parser.addSchemaField("greeting", RecordFieldType.STRING);
    +        parser.addRecord("Tom", 1, "Hello");
    +        parser.addRecord("Jerry", 2, "Hi");
    +        parser.addRecord("Tom", 99, "Howdy");
    +
    +        final List<String> colNames = new ArrayList<>();
    +        colNames.add("name");
    +        colNames.add("points");
    +        colNames.add("greeting");
    +        colNames.add("FAV_GREETING");
    +        final ResultSetValidatingRecordWriter writer = new 
ResultSetValidatingRecordWriter(colNames);
    +
    +        final TestRunner runner = 
TestRunners.newTestRunner(QueryFlowFile.class);
    +        runner.addControllerService("parser", parser);
    +        runner.enableControllerService(parser);
    +        runner.addControllerService("writer", writer);
    +        runner.enableControllerService(writer);
    +
    +        runner.setProperty(REL_NAME, "select *, greeting AS FAV_GREETING 
from FLOWFILE");
    +        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
    +        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
    +
    +        runner.enqueue("");
    +        runner.run();
    +
    +        runner.assertTransferCount(REL_NAME, 1);
    +    }
    +
    +
    +    private static class ResultSetValidatingRecordWriter extends 
AbstractControllerService implements RecordSetWriterFactory {
    +        private final List<String> columnNames;
    +
    +        public ResultSetValidatingRecordWriter(final List<String> 
colNames) {
    +            this.columnNames = new ArrayList<>(colNames);
    +        }
    +
    +        @Override
    +        public RecordSetWriter createWriter(ComponentLog logger) {
    +            return new RecordSetWriter() {
    +                @Override
    +                public WriteResult write(final RecordSet rs, final 
OutputStream out) throws IOException {
    +                    final int colCount = rs.getSchema().getFieldCount();
    +                    Assert.assertEquals(columnNames.size(), colCount);
    +
    +                    final List<String> colNames = new 
ArrayList<>(colCount);
    +                    for (int i = 0; i < colCount; i++) {
    +                        
colNames.add(rs.getSchema().getField(i).getFieldName());
    +                    }
    +
    +                    Assert.assertEquals(columnNames, colNames);
    +
    +                    // Iterate over the rest of the records to ensure that 
we read the entire stream. If we don't
    +                    // do this, we won't consume all of the data and as a 
result we will not close the stream properly
    +                    Record record;
    +                    while ((record = rs.next()) != null) {
    +                        System.out.println(record);
    +                    }
    +
    +                    return WriteResult.of(0, Collections.emptyMap());
    +                }
    +
    +                @Override
    +                public String getMimeType() {
    +                    return "text/plain";
    +                }
    +
    +                @Override
    +                public WriteResult write(Record record, OutputStream out) 
throws IOException {
    +                    return null;
    +                }
    +            };
    +        }
    +
    +    }
    +
    +    private static class MockRecordWriter extends 
AbstractControllerService implements RecordSetWriterFactory {
    --- End diff --
    
    Can this be its own class for reuse across the processors that will use a 
RowRecordReaderFactory?


> Create QueryFlowFile Processor
> ------------------------------
>
>                 Key: NIFI-1280
>                 URL: https://issues.apache.org/jira/browse/NIFI-1280
>             Project: Apache NiFi
>          Issue Type: Task
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.2.0
>
>         Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific 
> columns from CSV data. For instance, a user would configure two different 
> properties: "Columns of Interest" (a comma-separated list of column indexes) 
> and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it 
> would be with this Processor, as the user has to use Regular Expressions, 
> etc. with ReplaceText.
> Eventually a Custom UI could even be built that allows a user to upload a 
> Sample CSV and choose which columns from there, similar to the way that Excel 
> works when importing CSV by dragging and selecting the desired columns? That 
> would certainly be a larger undertaking and would not need to be done for an 
> initial implementation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to