[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15961298#comment-15961298 ]
ASF GitHub Bot commented on NIFI-1280:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1652#discussion_r110460794

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestQueryFlowFile {
+
+    static {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug");
+    }
+
+    private static final String REL_NAME = "success";
+
+    @Test
+    public void testSimple() throws InitializationException, IOException, SQLException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+        parser.addRecord("Tom", 49);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
+        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+        final int numIterations = 1;
+        for (int i = 0; i < numIterations; i++) {
+            runner.enqueue(new byte[0]);
+        }
+
+        runner.setThreadCount(4);
+        runner.run(2 * numIterations);
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        System.out.println(new String(out.toByteArray()));
+        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
+    }
+
+    @Test
+    public void testParseFailure() throws InitializationException, IOException, SQLException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("age", RecordFieldType.INT);
+        parser.addRecord("Tom", 49);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''");
+        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+        final int numIterations = 1;
+        for (int i = 0; i < numIterations; i++) {
+            runner.enqueue(new byte[0]);
+        }
+
+        runner.setThreadCount(4);
+        runner.run(2 * numIterations);
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        System.out.println(new String(out.toByteArray()));
+        out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
+    }
+
+
+    @Test
+    public void testTransformCalc() throws InitializationException, IOException, SQLException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("ID", RecordFieldType.INT);
+        parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT);
+        parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT);
+        parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT);
+
+        parser.addRecord("008", 10.05F, 15.45F, 89.99F);
+        parser.addRecord("100", 20.25F, 25.25F, 45.25F);
+        parser.addRecord("105", 20.05F, 25.05F, 45.05F);
+        parser.addRecord("200", 34.05F, 25.05F, 75.05F);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from FLOWFILE where ID=100");
+        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+
+        out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n");
+    }
+
+
+    @Test
+    public void testAggregateFunction() throws InitializationException, IOException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("points", RecordFieldType.INT);
+        parser.addRecord("Tom", 1);
+        parser.addRecord("Jerry", 2);
+        parser.addRecord("Tom", 99);
+
+        final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\"");
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select name, sum(points) as points from FLOWFILE GROUP BY name");
+        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+        final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
+    }
+
+    @Test
+    public void testColumnNames() throws InitializationException, IOException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("points", RecordFieldType.INT);
+        parser.addSchemaField("greeting", RecordFieldType.STRING);
+        parser.addRecord("Tom", 1, "Hello");
+        parser.addRecord("Jerry", 2, "Hi");
+        parser.addRecord("Tom", 99, "Howdy");
+
+        final List<String> colNames = new ArrayList<>();
+        colNames.add("name");
+        colNames.add("points");
+        colNames.add("greeting");
+        colNames.add("FAV_GREETING");
+        final ResultSetValidatingRecordWriter writer = new ResultSetValidatingRecordWriter(colNames);
+
+        final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select *, greeting AS FAV_GREETING from FLOWFILE");
+        runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+    }
+
+
+    private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
+        private final List<String> columnNames;
+
+        public ResultSetValidatingRecordWriter(final List<String> colNames) {
+            this.columnNames = new ArrayList<>(colNames);
+        }
+
+        @Override
+        public RecordSetWriter createWriter(ComponentLog logger) {
+            return new RecordSetWriter() {
+                @Override
+                public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
+                    final int colCount = rs.getSchema().getFieldCount();
+                    Assert.assertEquals(columnNames.size(), colCount);
+
+                    final List<String> colNames = new ArrayList<>(colCount);
+                    for (int i = 0; i < colCount; i++) {
+                        colNames.add(rs.getSchema().getField(i).getFieldName());
+                    }
+
+                    Assert.assertEquals(columnNames, colNames);
+
+                    // Iterate over the rest of the records to ensure that we read the entire stream. If we don't
+                    // do this, we won't consume all of the data and as a result we will not close the stream properly.
+                    Record record;
+                    while ((record = rs.next()) != null) {
+                        System.out.println(record);
+                    }
+
+                    return WriteResult.of(0, Collections.emptyMap());
+                }
+
+                @Override
+                public String getMimeType() {
+                    return "text/plain";
+                }
+
+                @Override
+                public WriteResult write(Record record, OutputStream out) throws IOException {
+                    return null;
+                }
+            };
+        }
+
+    }
+
+    private static class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
--- End diff --

Can this be its own class for reuse across the processors that will use a RowRecordReaderFactory?

> Create QueryFlowFile Processor
> ------------------------------
>
>                 Key: NIFI-1280
>                 URL: https://issues.apache.org/jira/browse/NIFI-1280
>             Project: Apache NiFi
>          Issue Type: Task
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.2.0
>
>         Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific
> columns from CSV data. For instance, a user would configure two properties:
> "Columns of Interest" (a comma-separated list of column indexes) and
> "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it
> would be with this Processor, as the user has to resort to regular
> expressions.
> Eventually, a custom UI could even be built that allows a user to upload a
> sample CSV and choose the desired columns from there, similar to the way
> Excel works when importing CSV by dragging and selecting the desired
> columns. That would certainly be a larger undertaking and would not need to
> be done for an initial implementation.
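
As a sketch of the reuse suggested above: the MockRecordWriter body is cut off
in this diff, but its expected behavior can be inferred from the test
assertions (a fixed header line, then each record's values quoted and
comma-separated). A standalone, shareable version might look roughly like the
following; the class body is an inference from those assertions rather than
the code under review, and Record.getValue(String) is assumed to be the field
accessor on the record API:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.Collections;

    import org.apache.nifi.controller.AbstractControllerService;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.RecordSetWriterFactory;
    import org.apache.nifi.serialization.WriteResult;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSet;

    /**
     * A reusable test writer that renders a record set as quoted CSV beneath a
     * fixed header line, so each processor test no longer needs a private copy.
     */
    public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory {
        private final String header;

        public MockRecordWriter(final String header) {
            this.header = header;
        }

        @Override
        public RecordSetWriter createWriter(final ComponentLog logger) {
            return new RecordSetWriter() {
                @Override
                public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException {
                    // Header first, then one quoted, comma-separated line per record.
                    final StringBuilder sb = new StringBuilder(header).append("\n");

                    int count = 0;
                    Record record;
                    while ((record = rs.next()) != null) {
                        final int numCols = record.getSchema().getFieldCount();
                        for (int i = 0; i < numCols; i++) {
                            final String fieldName = record.getSchema().getField(i).getFieldName();
                            sb.append('"').append(record.getValue(fieldName)).append('"'); // getValue(String) assumed
                            if (i < numCols - 1) {
                                sb.append(',');
                            }
                        }
                        sb.append('\n');
                        count++;
                    }

                    out.write(sb.toString().getBytes());
                    return WriteResult.of(count, Collections.emptyMap());
                }

                @Override
                public WriteResult write(final Record record, final OutputStream out) throws IOException {
                    return null; // single-record writes are unused by these tests
                }

                @Override
                public String getMimeType() {
                    return "text/plain";
                }
            };
        }
    }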
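
For the column-filtering use case described in the issue, the SQL-driven
configuration exercised by the tests above already covers the "Keep Only
These Columns" strategy: the projection list is exactly the set of columns to
keep. A minimal illustration, using the property names from the diff (the
"csv-reader"/"csv-writer" controller-service identifiers are placeholders):

    // Keep only the "name" and "points" columns of each incoming record.
    // The dynamic property is named after the output relationship ("success")
    // and holds the SQL to run against the FlowFile's records.
    final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class);
    runner.setProperty("success", "select name, points from FLOWFILE");
    runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "csv-reader");
    runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "csv-writer");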
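
On the ReplaceText comparison: dropping even a single column by regular
expression looks roughly like the following (plain Java regex for
illustration, not an actual ReplaceText configuration), which is the kind of
fragility the description is pointing at:

    import java.util.regex.Pattern;

    public class DropCsvColumnExample {
        public static void main(final String[] args) {
            // Drop the second of three columns from a naive (unquoted) CSV line.
            // A quoted field containing a comma would break this pattern.
            final Pattern dropSecond = Pattern.compile("^([^,]*),[^,]*,(.*)$");
            final String filtered = dropSecond.matcher("Tom,49,Hello").replaceAll("$1,$2");
            System.out.println(filtered); // prints: Tom,Hello
        }
    }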