http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java new file mode 100644 index 0000000..833a5d6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java @@ -0,0 +1,541 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.standard;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.queryflowfile.FlowFileTable;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.RowRecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;

/**
 * Processor that runs one SQL SELECT statement per user-defined (dynamic) property against the
 * records of an incoming FlowFile, exposed to Calcite as the table {@code FLOWFILE}.
 *
 * <p>
 * For every dynamic property a child FlowFile is forked from the original, the query result is
 * written into it with the configured Record Writer, and the child is routed to the relationship
 * named after the property. The original FlowFile goes to {@link #REL_ORIGINAL} on success and to
 * {@link #REL_FAILURE} when a {@link ProcessException} or {@link SQLException} is raised, in which
 * case every child created so far is removed from the session.
 * </p>
 */
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "csv", "json", "logs", "text", "avro", "aggregate"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the "
    + "SQL query then becomes the content of the output FlowFile. This can be used, for example, "
    + "for field-specific filtering, transformation, and row-level filtering. "
    + "Columns can be renamed, simple calculations and aggregations performed, etc. "
    + "The Processor is configured with a Record Reader Controller Service and a Record Writer service so as to allow flexibility in incoming and outgoing data formats. "
    + "The Processor must be configured with at least one user-defined property. The name of the Property "
    + "is the Relationship to route data to, and the value of the Property is a SQL SELECT statement that is used to specify how input data should be transformed/filtered. "
    + "The SQL statement must be valid ANSI SQL and is powered by Apache Calcite. "
    + "If the transformation fails, the original FlowFile is routed to the 'failure' relationship. Otherwise, the data selected will be routed to the associated "
    + "relationship. See the Processor Usage documentation for more information.")
@DynamicRelationship(name="<Property Name>", description="Each user-defined property defines a new Relationship for this Processor.")
@DynamicProperty(name = "The name of the relationship to route data to", value="A SQL SELECT statement that is used to determine what data should be routed to this "
    + "relationship.", supportsExpressionLanguage=true, description="Each user-defined property specifies a SQL SELECT statement to run over the data, with the data "
    + "that is selected being routed to the relationship whose name is the property name")
public class QueryFlowFile extends AbstractProcessor {
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
        .name("Record Reader")
        .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema")
        .identifiesControllerService(RowRecordReaderFactory.class)
        .required(true)
        .build();
    static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
        .name("Record Writer")
        .description("Specifies the Controller Service to use for writing results to a FlowFile")
        .identifiesControllerService(RecordSetWriterFactory.class)
        .required(true)
        .build();
    static final PropertyDescriptor INCLUDE_ZERO_RECORD_FLOWFILES = new PropertyDescriptor.Builder()
        .name("Include Zero Record FlowFiles")
        .description("When running the SQL statement against an incoming FlowFile, if the result has no data, "
            + "this property specifies whether or not a FlowFile will be sent to the corresponding relationship")
        .expressionLanguageSupported(false)
        .allowableValues("true", "false")
        .defaultValue("true")
        .required(true)
        .build();
    static final PropertyDescriptor CACHE_SCHEMA = new PropertyDescriptor.Builder()
        .name("Cache Schema")
        .description("Parsing the SQL query and deriving the FlowFile's schema is relatively expensive. If this value is set to true, "
            + "the Processor will cache these values so that the Processor is much more efficient and much faster. However, if this is done, "
            + "then the schema that is derived for the first FlowFile processed must apply to all FlowFiles. If all FlowFiles will not have the exact "
            + "same schema, or if the SQL SELECT statement uses the Expression Language, this value should be set to false.")
        .expressionLanguageSupported(false)
        .allowableValues("true", "false")
        .defaultValue("true")
        .required(true)
        .build();

    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
        .name("original")
        .description("The original FlowFile is routed to this relationship")
        .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
        .name("failure")
        .description("If a FlowFile fails processing for any reason (for example, the SQL "
            + "statement contains columns not present in input data), the original FlowFile will "
            + "be routed to this relationship")
        .build();

    private List<PropertyDescriptor> properties;

    // Mutated by onPropertyModified whenever a dynamic property is added or removed.
    private final Set<Relationship> relationships = Collections.synchronizedSet(new HashSet<>());

    // Pool of prepared statements keyed by SQL text; populated in setupQueues and drained in cleanup.
    private final Map<String, BlockingQueue<CachedStatement>> statementQueues = new HashMap<>();

    /**
     * Registers the Calcite JDBC driver and builds the static property list and the two fixed relationships.
     *
     * @throws ProcessException if the Calcite driver cannot be registered
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        try {
            DriverManager.registerDriver(new org.apache.calcite.jdbc.Driver());
        } catch (final SQLException e) {
            throw new ProcessException("Failed to load Calcite JDBC Driver", e);
        }

        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(RECORD_READER_FACTORY);
        properties.add(RECORD_WRITER_FACTORY);
        properties.add(INCLUDE_ZERO_RECORD_FLOWFILES);
        properties.add(CACHE_SCHEMA);
        this.properties = Collections.unmodifiableList(properties);

        relationships.add(REL_FAILURE);
        relationships.add(REL_ORIGINAL);
    }

    /**
     * Returns a read-only snapshot of the current relationships.
     *
     * <p>
     * A copy is handed out because iterating a {@code Collections.synchronizedSet} without holding
     * its lock is not safe while {@link #onPropertyModified} may be adding or removing entries.
     * </p>
     */
    @Override
    public Set<Relationship> getRelationships() {
        synchronized (relationships) {
            return Collections.unmodifiableSet(new HashSet<>(relationships));
        }
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Keeps the relationship set in step with the dynamic properties: setting a dynamic property
     * adds a relationship of the same name, removing it (new value {@code null}) drops it.
     */
    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        if (!descriptor.isDynamic()) {
            return;
        }

        final Relationship relationship = new Relationship.Builder()
            .name(descriptor.getName())
            .description("User-defined relationship that specifies where data that matches the specified SQL query should be routed")
            .build();

        if (newValue == null) {
            relationships.remove(relationship);
        } else {
            relationships.add(relationship);
        }
    }

    /**
     * Rejects the configuration when 'Cache Schema' is true and any dynamic (SQL) property uses the
     * Expression Language, because cached statements are looked up by their SQL text.
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final boolean cache = validationContext.getProperty(CACHE_SCHEMA).asBoolean();
        if (cache) {
            for (final PropertyDescriptor descriptor : validationContext.getProperties().keySet()) {
                if (descriptor.isDynamic() && validationContext.isExpressionLanguagePresent(validationContext.getProperty(descriptor).getValue())) {
                    return Collections.singleton(new ValidationResult.Builder()
                        .subject("Cache Schema")
                        .input("true")
                        .valid(false)
                        .explanation("Cannot have 'Cache Schema' property set to true if any SQL statement makes use of the Expression Language")
                        .build());
                }
            }
        }

        return Collections.emptyList();
    }

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
            .name(propertyDescriptorName)
            .description("SQL select statement specifies how data should be filtered/transformed. "
                + "SQL SELECT should select from the FLOWFILE table")
            .required(false)
            .dynamic(true)
            .expressionLanguageSupported(true)
            .addValidator(new SqlValidator())
            .build();
    }

    /**
     * Runs every configured SQL statement against the next FlowFile and routes the results.
     *
     * <p>
     * Child FlowFiles are only transferred once all queries have succeeded. On a
     * {@link ProcessException} or {@link SQLException} all children still tracked are removed and
     * the original is routed to {@link #REL_FAILURE}.
     * </p>
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }

        final StopWatch stopWatch = new StopWatch(true);

        final RecordSetWriterFactory resultSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY)
            .asControllerService(RecordSetWriterFactory.class);
        final RowRecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
            .asControllerService(RowRecordReaderFactory.class);

        final RecordSetWriter resultSetWriter = resultSetWriterFactory.createWriter(getLogger());

        // Neither property supports Expression Language, so they are loop-invariant.
        final boolean cacheSchema = context.getProperty(CACHE_SCHEMA).asBoolean();
        final boolean includeZeroRecordFlowFiles = context.getProperty(INCLUDE_ZERO_RECORD_FLOWFILES).asBoolean();

        final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
        final Set<FlowFile> createdFlowFiles = new HashSet<>();

        try {
            for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
                if (!descriptor.isDynamic()) {
                    continue;
                }

                final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build();

                // We have to fork a child because we may need to read the input FlowFile more than once,
                // and we cannot call session.read() on the original FlowFile while we are within a write
                // callback for the original FlowFile.
                final FlowFile child = session.create(original);

                // Track the child immediately so that it is cleaned up if anything below throws.
                createdFlowFiles.add(child);

                final String sql = context.getProperty(descriptor).evaluateAttributeExpressions(original).getValue();
                final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
                final QueryResult queryResult = cacheSchema
                    ? queryWithCache(session, original, sql, context, recordParserFactory)
                    : query(session, original, sql, context, recordParserFactory);

                final FlowFile written;
                try {
                    final ResultSet rs = queryResult.getResultSet();
                    written = session.write(child, new OutputStreamCallback() {
                        @Override
                        public void process(final OutputStream out) throws IOException {
                            try {
                                final ResultSetRecordSet recordSet = new ResultSetRecordSet(rs);
                                writeResultRef.set(resultSetWriter.write(recordSet, out));
                            } catch (final Exception e) {
                                throw new IOException(e);
                            }
                        }
                    });
                } finally {
                    closeQuietly(queryResult);
                }
                replaceTracked(createdFlowFiles, child, written);

                final WriteResult result = writeResultRef.get();
                if (result.getRecordCount() == 0 && !includeZeroRecordFlowFiles) {
                    session.remove(written);
                    // Already removed from the session; it must not be removed a second time on a later failure.
                    createdFlowFiles.remove(written);
                    getLogger().info("Transformed {} but the result contained no data so will not pass on a FlowFile", new Object[] {original});
                } else {
                    final Map<String, String> attributesToAdd = new HashMap<>();
                    if (result.getAttributes() != null) {
                        attributesToAdd.putAll(result.getAttributes());
                    }

                    attributesToAdd.put(CoreAttributes.MIME_TYPE.key(), resultSetWriter.getMimeType());
                    attributesToAdd.put("record.count", String.valueOf(result.getRecordCount()));

                    final FlowFile updated = session.putAllAttributes(written, attributesToAdd);
                    replaceTracked(createdFlowFiles, written, updated);
                    transformedFlowFiles.put(updated, relationship);
                }
            }

            final long elapsedMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
            if (transformedFlowFiles.size() > 0) {
                session.getProvenanceReporter().fork(original, transformedFlowFiles.keySet(), elapsedMillis);

                for (final Map.Entry<FlowFile, Relationship> entry : transformedFlowFiles.entrySet()) {
                    final FlowFile transformed = entry.getKey();
                    final Relationship relationship = entry.getValue();

                    session.getProvenanceReporter().route(transformed, relationship);
                    session.transfer(transformed, relationship);
                }
            }

            getLogger().info("Successfully transformed {} in {} millis", new Object[] {original, elapsedMillis});
            session.transfer(original, REL_ORIGINAL);
        } catch (final ProcessException e) {
            routeToFailure(session, original, createdFlowFiles, e);
        } catch (final SQLException e) {
            routeToFailure(session, original, createdFlowFiles, e.getCause() == null ? e : e.getCause());
        }
    }

    /**
     * Swaps a tracked FlowFile reference for the newer version returned by the session, so that a
     * later cleanup operates on the most recent object.
     */
    private static void replaceTracked(final Set<FlowFile> tracked, final FlowFile previous, final FlowFile current) {
        tracked.remove(previous);
        tracked.add(current);
    }

    /**
     * Logs the failure, removes every child FlowFile that is still tracked and routes the original to 'failure'.
     */
    private void routeToFailure(final ProcessSession session, final FlowFile original, final Set<FlowFile> createdFlowFiles, final Throwable cause) {
        getLogger().error("Unable to transform {} due to {}", new Object[] {original, cause});
        session.remove(createdFlowFiles);
        session.transfer(original, REL_FAILURE);
    }

    /**
     * Takes a pooled statement for the given SQL, or builds a new one when no queue exists for the
     * SQL or the queue is currently empty.
     */
    private synchronized CachedStatement getStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
        final FlowFile flowFile, final RowRecordReaderFactory recordReaderFactory) throws SQLException {

        final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
        final CachedStatement pooled = (statementQueue == null) ? null : statementQueue.poll();
        if (pooled != null) {
            return pooled;
        }

        return buildCachedStatement(sql, connectionSupplier, session, flowFile, recordReaderFactory);
    }

    /**
     * Opens a new Calcite connection, registers the FlowFile as table {@code FLOWFILE} and prepares the statement.
     * The connection is closed again if any of these steps fails.
     */
    private CachedStatement buildCachedStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
        final FlowFile flowFile, final RowRecordReaderFactory recordReaderFactory) throws SQLException {

        final CalciteConnection connection = connectionSupplier.get();
        try {
            final SchemaPlus rootSchema = connection.getRootSchema();

            final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordReaderFactory, getLogger());
            rootSchema.add("FLOWFILE", flowFileTable);
            rootSchema.setCacheEnabled(false);

            final PreparedStatement stmt = connection.prepareStatement(sql);
            return new CachedStatement(stmt, flowFileTable, connection);
        } catch (final SQLException | RuntimeException e) {
            // Nothing else holds a reference to the connection yet, so release it here.
            closeQuietly(connection);
            throw e;
        }
    }

    /**
     * Closes every pooled statement and its connection, then discards the queues.
     */
    @OnStopped
    public synchronized void cleanup() {
        for (final BlockingQueue<CachedStatement> statementQueue : statementQueues.values()) {
            CachedStatement stmt;
            while ((stmt = statementQueue.poll()) != null) {
                closeQuietly(stmt.getStatement(), stmt.getConnection());
            }
        }

        statementQueues.clear();
    }

    @OnScheduled
    public synchronized void setupQueues(final ProcessContext context) {
        // Create a Queue of PreparedStatements for each property that is user-defined. This allows us to easily poll the
        // queue and add as necessary, knowing that the queue already exists.
        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
            if (!descriptor.isDynamic()) {
                continue;
            }

            final String sql = context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
            final BlockingQueue<CachedStatement> queue = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
            statementQueues.put(sql, queue);
        }
    }

    /**
     * Executes the SQL using a pooled {@link PreparedStatement}, pointing its table at the given FlowFile first.
     *
     * <p>
     * Closing the returned {@link QueryResult} closes the ResultSet and hands the statement back to
     * the pool. If no queue exists for the SQL or the queue is full, the connection is closed instead.
     * If execution fails, the statement and its connection are closed rather than pooled.
     * </p>
     *
     * @throws SQLException if the statement cannot be prepared or executed
     */
    protected QueryResult queryWithCache(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
        final RowRecordReaderFactory recordParserFactory) throws SQLException {

        final Supplier<CalciteConnection> connectionSupplier = () -> {
            final Properties properties = new Properties();
            properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name());

            try {
                final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
                final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
                return calciteConnection;
            } catch (final Exception e) {
                throw new ProcessException(e);
            }
        };

        final CachedStatement cachedStatement = getStatement(sql, connectionSupplier, session, flowFile, recordParserFactory);

        final ResultSet rs;
        try {
            cachedStatement.getTable().setFlowFile(session, flowFile);
            rs = cachedStatement.getStatement().executeQuery();
        } catch (final SQLException | RuntimeException e) {
            // The statement has been taken out of the pool and will never be handed back; release it.
            closeQuietly(cachedStatement.getStatement(), cachedStatement.getConnection());
            throw e;
        }

        return new QueryResult() {
            @Override
            public void close() throws IOException {
                // Release the cursor before the statement becomes available to other threads.
                closeQuietly(rs);

                final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
                if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
                    try {
                        cachedStatement.getConnection().close();
                    } catch (SQLException e) {
                        throw new IOException("Failed to close statement", e);
                    }
                }
            }

            @Override
            public ResultSet getResultSet() {
                return rs;
            }
        };
    }

    /**
     * Executes the SQL on a fresh connection and statement without any caching.
     *
     * <p>
     * Closing the returned {@link QueryResult} closes the ResultSet, statement and connection.
     * All three are also closed if setting up or executing the query fails.
     * </p>
     *
     * @throws SQLException if the connection cannot be obtained or the query fails
     */
    protected QueryResult query(final ProcessSession session, final FlowFile flowFile, final String sql, final ProcessContext context,
        final RowRecordReaderFactory recordParserFactory) throws SQLException {

        final Properties properties = new Properties();
        properties.put(CalciteConnectionProperty.LEX.camelName(), Lex.JAVA.name());

        Connection connection = null;
        ResultSet resultSet = null;
        Statement statement = null;
        try {
            connection = DriverManager.getConnection("jdbc:calcite:", properties);
            final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
            final SchemaPlus rootSchema = calciteConnection.getRootSchema();

            final FlowFileTable<?, ?> flowFileTable = new FlowFileTable<>(session, flowFile, recordParserFactory, getLogger());
            rootSchema.add("FLOWFILE", flowFileTable);
            rootSchema.setCacheEnabled(false);

            statement = connection.createStatement();
            resultSet = statement.executeQuery(sql);

            final ResultSet rs = resultSet;
            final Statement stmt = statement;
            final Connection conn = connection;
            return new QueryResult() {
                @Override
                public void close() throws IOException {
                    closeQuietly(rs, stmt, conn);
                }

                @Override
                public ResultSet getResultSet() {
                    return rs;
                }
            };
        } catch (final Exception e) {
            closeQuietly(resultSet, statement, connection);
            throw e;
        }
    }

    /**
     * Closes each non-null resource in order, logging (not propagating) any failure.
     */
    private void closeQuietly(final AutoCloseable... closeables) {
        if (closeables == null) {
            return;
        }

        for (final AutoCloseable closeable : closeables) {
            if (closeable == null) {
                continue;
            }

            try {
                closeable.close();
            } catch (final Exception e) {
                getLogger().warn("Failed to close SQL resource", e);
            }
        }
    }

    /**
     * Validates that a dynamic property value parses as SQL.
     *
     * <p>
     * Values containing Expression Language are accepted as-is: the final SQL text is only known
     * once the expression has been evaluated against a FlowFile, so parsing it at configuration
     * time could reject statements that are valid at run time.
     * </p>
     */
    private static class SqlValidator implements Validator {
        @Override
        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
            if (context.isExpressionLanguagePresent(input)) {
                return new ValidationResult.Builder()
                    .subject(subject)
                    .input(input)
                    .valid(true)
                    .explanation("Expression Language Present")
                    .build();
            }

            try {
                // Parser creation is inside the try so that a bad or missing value is reported as an
                // invalid property rather than surfacing as an exception from the validator.
                final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
                final SqlParser parser = SqlParser.create(substituted);
                parser.parseStmt();
                return new ValidationResult.Builder()
                    .subject(subject)
                    .input(input)
                    .valid(true)
                    .build();
            } catch (final Exception e) {
                return new ValidationResult.Builder()
                    .subject(subject)
                    .input(input)
                    .valid(false)
                    .explanation("Not a valid SQL Statement: " + e.getMessage())
                    .build();
            }
        }
    }

    /**
     * A query's ResultSet together with the cleanup needed once the caller is done with it.
     */
    private static interface QueryResult extends Closeable {
        ResultSet getResultSet();
    }

    /**
     * A prepared statement bundled with the table it reads from and the connection that owns it.
     */
    private static class CachedStatement {
        private final FlowFileTable<?, ?> table;
        private final PreparedStatement statement;
        private final Connection connection;

        public CachedStatement(final PreparedStatement statement, final FlowFileTable<?, ?> table, final Connection connection) {
            this.statement = statement;
            this.table = table;
            this.connection = connection;
        }

        public FlowFileTable<?, ?> getTable() {
            return table;
        }

        public PreparedStatement getStatement() {
            return statement;
        }

        public Connection getConnection() {
            return connection;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java new file mode 100644 index 0000000..1a62d14 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileEnumerator.java @@ -0,0 +1,150 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.queryflowfile;

import java.io.IOException;
import java.io.InputStream;

import org.apache.calcite.linq4j.Enumerator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RowRecordReaderFactory;
import org.apache.nifi.serialization.record.Record;

/**
 * Calcite {@link Enumerator} that walks the records of a FlowFile, optionally projecting each
 * record down to a subset of its columns.
 *
 * <p>
 * The FlowFile content is opened through the {@link ProcessSession} and parsed with a
 * {@link RecordReader} obtained from the supplied factory. When the data is exhausted the
 * underlying reader and stream are closed eagerly.
 * </p>
 */
public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
    private final ProcessSession session;
    private final FlowFile flowFile;
    private final ComponentLog logger;
    private final RowRecordReaderFactory recordParserFactory;

    // Indices of the columns to return, in output order; null means "return every column".
    private final int[] fields;

    private InputStream rawIn;
    private Object currentRow;
    private RecordReader recordParser;

    /**
     * Creates the enumerator and immediately opens the FlowFile content for reading.
     *
     * @throws ProcessException if the record reader cannot be created
     */
    public FlowFileEnumerator(final ProcessSession session, final FlowFile flowFile, final ComponentLog logger, final RowRecordReaderFactory parserFactory, final int[] fields) {
        this.session = session;
        this.flowFile = flowFile;
        this.recordParserFactory = parserFactory;
        this.logger = logger;
        this.fields = fields;

        // Call the private opener rather than reset(), which is public and overridable.
        openReader();
    }

    @Override
    public Object current() {
        return currentRow;
    }

    /**
     * Advances to the next record.
     *
     * <p>
     * Malformed records are logged and skipped. An {@link IOException} is logged and treated as
     * the end of the stream. End of data is detected from the reader returning no record (or a
     * record without a values array), never from the projected value, so a {@code null} cell in
     * a single-column projection is returned as a row instead of ending the enumeration early.
     * </p>
     *
     * @return {@code true} if {@link #current()} now holds a row, {@code false} once the data is exhausted
     */
    @Override
    public boolean moveNext() {
        currentRow = null;
        boolean rowAvailable = false;

        while (!rowAvailable) {
            try {
                final Record record = recordParser.nextRecord();
                final Object[] values = (record == null) ? null : record.getValues();
                if (values == null) {
                    break;
                }

                currentRow = projectColumns(values);
                rowAvailable = true;
            } catch (final IOException e) {
                logger.error("Failed to read next record in stream for " + flowFile + ". Assuming end of stream.", e);
                break;
            } catch (final MalformedRecordException mre) {
                logger.error("Failed to parse record in stream for " + flowFile + ". Will skip record and continue reading", mre);
            }
        }

        if (!rowAvailable) {
            // If we are out of data, close the InputStream. We do this because
            // Calcite does not necessarily call our close() method.
            close();
        }

        return rowAvailable;
    }

    /**
     * Applies the configured projection to one row of values.
     *
     * @param row the full, non-null array of values of a record
     * @return the row itself when no projection is configured, the bare value when exactly one
     *         column is requested, otherwise a new array holding the requested columns in the
     *         requested order
     */
    private Object projectColumns(final Object[] row) {
        if (fields == null) {
            return row;
        }

        // If we want only a single field, then Calcite is going to expect us to return
        // the actual value, NOT a 1-element array of values.
        if (fields.length == 1) {
            return valueAt(row, fields[0]);
        }

        // Always build the projected array: the requested indices may reorder or repeat columns,
        // so the source row cannot be handed back just because its length is not larger.
        final Object[] projected = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            projected[i] = valueAt(row, fields[i]);
        }

        return projected;
    }

    /**
     * Returns the value at the given index, or {@code null} when the row has no such column.
     */
    private static Object valueAt(final Object[] row, final int index) {
        return index < row.length ? row[index] : null;
    }

    /**
     * Closes the current reader and stream and re-opens the FlowFile content from the beginning.
     *
     * @throws ProcessException if the record reader cannot be re-created
     */
    @Override
    public void reset() {
        closeResources();
        openReader();
    }

    @Override
    public void close() {
        closeResources();
    }

    /**
     * Opens the FlowFile content and creates a record reader on top of it. The stream is closed
     * again if the reader cannot be created.
     */
    private void openReader() {
        rawIn = session.read(flowFile);

        try {
            recordParser = recordParserFactory.createRecordReader(rawIn, logger);
        } catch (final MalformedRecordException | IOException e) {
            closeStream();
            throw new ProcessException("Failed to reset stream", e);
        }
    }

    /**
     * Closes the record reader (if any) and then the raw stream, logging failures as warnings.
     */
    private void closeResources() {
        if (recordParser != null) {
            try {
                recordParser.close();
            } catch (final Exception e) {
                logger.warn("Failed to close decorated source for " + flowFile, e);
            }
        }

        closeStream();
    }

    private void closeStream() {
        if (rawIn == null) {
            return;
        }

        try {
            rawIn.close();
        } catch (final Exception e) {
            logger.warn("Failed to close InputStream for " + flowFile, e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java new file mode 100644 index 0000000..c5179c9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileProjectTableScanRule.java @@ -0,0 +1,76 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.queryflowfile;

import java.util.List;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;

/**
 * Planner rule matching a {@link LogicalProject} that sits directly on top of a
 * {@link FlowFileTableScan}. When every projected expression is a plain column
 * reference, the pair is replaced by a single scan that reads only those columns.
 */
public class FlowFileProjectTableScanRule extends RelOptRule {
    public static final FlowFileProjectTableScanRule INSTANCE = new FlowFileProjectTableScanRule();

    private FlowFileProjectTableScanRule() {
        super(
            operand(LogicalProject.class,
                operand(FlowFileTableScan.class, none())),
            "FlowFileProjectTableScanRule");
    }

    /**
     * Rewrites the matched project/scan pair into one scan over the projected columns.
     * Does nothing when the projection contains anything other than column references.
     */
    @Override
    public void onMatch(RelOptRuleCall call) {
        final LogicalProject projection = call.rel(0);
        final FlowFileTableScan tableScan = call.rel(1);

        final int[] projectedFields = getProjectFields(projection.getProjects());
        if (projectedFields != null) {
            final FlowFileTableScan narrowedScan = new FlowFileTableScan(
                tableScan.getCluster(),
                tableScan.getTable(),
                tableScan.flowFileTable,
                projectedFields);

            call.transformTo(narrowedScan);
        }
    }

    /**
     * Extracts the input column index of every projected expression.
     *
     * @param exps the expressions of the projection
     * @return the referenced column indices in projection order, or {@code null} if any
     *         expression is not a {@link RexInputRef}
     */
    private int[] getProjectFields(List<RexNode> exps) {
        final int[] indices = new int[exps.size()];

        int position = 0;
        for (final RexNode expression : exps) {
            if (!(expression instanceof RexInputRef)) {
                // Something more complex than a column reference: not a simple projection.
                return null;
            }

            indices[position++] = ((RexInputRef) expression).getIndex();
        }

        return indices;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java new file mode 100644 index 0000000..a23dcfa --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTable.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.queryflowfile; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.QueryProvider; +import org.apache.calcite.linq4j.Queryable; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.QueryableTable; +import org.apache.calcite.schema.Schema.TableType; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Schemas; +import org.apache.calcite.schema.TranslatableTable; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.util.Pair; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import 
org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; + + +public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable, TranslatableTable { + + private final RowRecordReaderFactory recordParserFactory; + private final ComponentLog logger; + + private RecordSchema recordSchema; + private RelDataType relDataType = null; + + private volatile ProcessSession session; + private volatile FlowFile flowFile; + + /** + * Creates a FlowFile table. + */ + public FlowFileTable(final ProcessSession session, final FlowFile flowFile, final RowRecordReaderFactory recordParserFactory, final ComponentLog logger) { + this.session = session; + this.flowFile = flowFile; + this.recordParserFactory = recordParserFactory; + this.logger = logger; + } + + public void setFlowFile(final ProcessSession session, final FlowFile flowFile) { + this.session = session; + this.flowFile = flowFile; + } + + + @Override + public String toString() { + return "FlowFileTable"; + } + + /** + * Returns an enumerable over a given projection of the fields. + * + * <p> + * Called from generated code. 
+ */ + public Enumerable<Object> project(final int[] fields) { + return new AbstractEnumerable<Object>() { + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public Enumerator<Object> enumerator() { + return new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields); + } + }; + } + + @Override + @SuppressWarnings("rawtypes") + public Expression getExpression(final SchemaPlus schema, final String tableName, final Class clazz) { + return Schemas.tableExpression(schema, getElementType(), tableName, clazz); + } + + @Override + public Type getElementType() { + return Object[].class; + } + + @Override + public <T> Queryable<T> asQueryable(final QueryProvider queryProvider, final SchemaPlus schema, final String tableName) { + throw new UnsupportedOperationException(); + } + + @Override + public RelNode toRel(final RelOptTable.ToRelContext context, final RelOptTable relOptTable) { + // Request all fields. + final int fieldCount = relOptTable.getRowType().getFieldCount(); + final int[] fields = new int[fieldCount]; + for (int i = 0; i < fieldCount; i++) { + fields[i] = i; + } + + return new FlowFileTableScan(context.getCluster(), relOptTable, this, fields); + } + + @Override + public RelDataType getRowType(final RelDataTypeFactory typeFactory) { + if (relDataType != null) { + return relDataType; + } + + RecordSchema schema; + try (final InputStream in = session.read(flowFile)) { + final RecordReader recordParser = recordParserFactory.createRecordReader(in, logger); + schema = recordParser.getSchema(); + } catch (final MalformedRecordException | IOException e) { + throw new ProcessException("Failed to determine schema of data records for " + flowFile, e); + } + + final List<String> names = new ArrayList<>(); + final List<RelDataType> types = new ArrayList<>(); + + final JavaTypeFactory javaTypeFactory = (JavaTypeFactory) typeFactory; + for (final RecordField field : schema.getFields()) { + names.add(field.getFieldName()); + 
types.add(getRelDataType(field.getDataType(), javaTypeFactory)); + } + + logger.debug("Found Schema: {}", new Object[] {schema}); + + if (recordSchema == null) { + recordSchema = schema; + } + + relDataType = typeFactory.createStructType(Pair.zip(names, types)); + return relDataType; + } + + private RelDataType getRelDataType(final DataType fieldType, final JavaTypeFactory typeFactory) { + switch (fieldType.getFieldType()) { + case BOOLEAN: + return typeFactory.createJavaType(boolean.class); + case BYTE: + return typeFactory.createJavaType(byte.class); + case CHAR: + return typeFactory.createJavaType(char.class); + case DATE: + return typeFactory.createJavaType(java.sql.Date.class); + case DOUBLE: + return typeFactory.createJavaType(double.class); + case FLOAT: + return typeFactory.createJavaType(float.class); + case INT: + return typeFactory.createJavaType(int.class); + case SHORT: + return typeFactory.createJavaType(short.class); + case TIME: + return typeFactory.createJavaType(java.sql.Time.class); + case TIMESTAMP: + return typeFactory.createJavaType(java.sql.Timestamp.class); + case LONG: + return typeFactory.createJavaType(long.class); + case STRING: + return typeFactory.createJavaType(String.class); + case ARRAY: + return typeFactory.createJavaType(Object[].class); + case OBJECT: + return typeFactory.createJavaType(Object.class); + } + + throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType); + } + + @Override + public TableType getJdbcTableType() { + return TableType.TEMPORARY_TABLE; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java new file mode 100644 index 0000000..ad3a1c3 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryflowfile/FlowFileTableScan.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.queryflowfile; + +import java.util.List; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; + +/** + * Relational expression representing a scan of a FlowFile. + * + * <p> + * Like any table scan, it serves as a leaf node of a query tree. 
+ * </p> + */ +public class FlowFileTableScan extends TableScan implements EnumerableRel { + final FlowFileTable<?, ?> flowFileTable; + final int[] fields; + + protected FlowFileTableScan(final RelOptCluster cluster, final RelOptTable table, final FlowFileTable<?, ?> flowFileTable, final int[] fields) { + super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table); + + this.flowFileTable = flowFileTable; + this.fields = fields; + } + + @Override + public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs) { + return new FlowFileTableScan(getCluster(), table, flowFileTable, fields); + } + + @Override + public RelWriter explainTerms(final RelWriter pw) { + return super.explainTerms(pw).item("fields", Primitive.asList(fields)); + } + + @Override + public RelDataType deriveRowType() { + final List<RelDataTypeField> fieldList = table.getRowType().getFieldList(); + final RelDataTypeFactory.FieldInfoBuilder builder = getCluster().getTypeFactory().builder(); + for (int field : fields) { + builder.add(fieldList.get(field)); + } + return builder.build(); + } + + @Override + public void register(RelOptPlanner planner) { + planner.addRule(FlowFileProjectTableScanRule.INSTANCE); + } + + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + PhysType physType = PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray()); + + return implementor.result(physType, Blocks.toBlock( + Expressions.call(table.getExpression(FlowFileTable.class), "project", Expressions.constant(fields)))); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 9de5ab6..2f2b0cb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -75,6 +75,7 @@ org.apache.nifi.processors.standard.PutSyslog org.apache.nifi.processors.standard.PutTCP org.apache.nifi.processors.standard.PutUDP org.apache.nifi.processors.standard.QueryDatabaseTable +org.apache.nifi.processors.standard.QueryFlowFile org.apache.nifi.processors.standard.ReplaceText org.apache.nifi.processors.standard.RouteText org.apache.nifi.processors.standard.ReplaceTextWithMapping http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html new file mode 100644 index 0000000..1cc7923 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryFlowFile/additionalDetails.html @@ -0,0 +1,47 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>QueryFlowFile</title> + + <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + </head> + + <body> + <p> + QueryFlowFile provides users a tremendous amount of power by leveraging an extremely well-known + syntax (SQL) to route, filter, transform, and query data as it traverses the system. In order to + provide the Processor with the maximum amount of flexibility, it is configured with a Controller + Service that is responsible for reading and parsing the incoming FlowFiles and a Controller Service + that is responsible for writing the results out. By using this paradigm, users are not forced to + convert their data from one format to another just to query it, and then transform the data back + into the form that they want. Rather, the appropriate Controller Service can easily be configured + and put to use for the appropriate data format. + </p> + + <p> + Rather than providing a single "SQL SELECT Statement" type of Property, this Processor makes use + of user-defined properties. Each user-defined property that is added to the Processor has a name + that becomes a new Relationship for the Processor and a corresponding SQL query that will be evaluated + against each FlowFile. This allows multiple SQL queries to be run against each FlowFile. 
+ </p> + + <p> + The SQL syntax that is supported by this Processor is ANSI SQL and is powered by Apache Calcite. + </p> + </body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java deleted file mode 100644 index 421da98..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFilterCSVColumns.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.processors.standard; - -import static org.junit.Assert.assertEquals; - -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.List; - -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TestFilterCSVColumns { - - private static final Logger LOGGER; - - static { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); - System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.FilterCSVColumns", "debug"); - System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.standard.TestFilterCSVColumns", "debug"); - LOGGER = LoggerFactory.getLogger(TestFilterCSVColumns.class); - } - - @Test - public void testTransformSimple() throws InitializationException, IOException, SQLException { - String sql = "select first_name, last_name, company_name, address, city from CSV.A where city='New York'"; - - Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/US500.csv"); - InputStream in = new FileInputStream(inpath.toFile()); - - ResultSet resultSet = FilterCSVColumns.transform(in, sql); - - int nrofColumns = resultSet.getMetaData().getColumnCount(); - - for (int i = 1; i <= nrofColumns; i++) { - System.out.print(resultSet.getMetaData().getColumnLabel(i) + " "); - } - System.out.println(); - - while (resultSet.next()) { - for (int i = 1; i <= nrofColumns; i++) { - System.out.print(resultSet.getString(i)+ " "); - } - System.out.println(); - } - } - - @Test - public void 
testTransformCalc() throws InitializationException, IOException, SQLException { - String sql = "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from CSV.A where ID=100"; - - Path inpath = Paths.get("src/test/resources/TestFilterCSVColumns/Numeric.csv"); - InputStream in = new FileInputStream(inpath.toFile()); - - ResultSet resultSet = FilterCSVColumns.transform(in, sql); - - int nrofColumns = resultSet.getMetaData().getColumnCount(); - - for (int i = 1; i <= nrofColumns; i++) { - System.out.print(resultSet.getMetaData().getColumnLabel(i) + " "); - } - System.out.println(); - - while (resultSet.next()) { - for (int i = 1; i <= nrofColumns; i++) { - System.out.print(resultSet.getString(i)+ " "); - } - double total = resultSet.getDouble("TOTAL"); - System.out.println(); - assertEquals(90.75, total, 0.0001); - } - } - - @Test - public void testSimpleTypeless() throws InitializationException, IOException { - final TestRunner runner = TestRunners.newTestRunner(FilterCSVColumns.class); - String sql = "select first_name, last_name, company_name, address, city from CSV.A where city='New York'"; - runner.setProperty(FilterCSVColumns.SQL_SELECT, sql); - - runner.enqueue(Paths.get("src/test/resources/TestFilterCSVColumns/US500_typeless.csv")); - runner.run(); - - final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS); - for (final MockFlowFile flowFile : flowFiles) { - System.out.println(flowFile); - System.out.println(new String(flowFile.toByteArray())); - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java new file mode 100644 index 0000000..41469ba --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +public class TestQueryFlowFile { + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug"); + } + + private static final String REL_NAME = "success"; + + @Test + public void testSimple() throws InitializationException, IOException, SQLException { + final MockRecordParser parser = new 
MockRecordParser(); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("age", RecordFieldType.INT); + parser.addRecord("Tom", 49); + + final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\""); + + final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''"); + runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser"); + runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer"); + + final int numIterations = 1; + for (int i = 0; i < numIterations; i++) { + runner.enqueue(new byte[0]); + } + + runner.setThreadCount(4); + runner.run(2 * numIterations); + + runner.assertTransferCount(REL_NAME, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0); + System.out.println(new String(out.toByteArray())); + out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n"); + } + + @Test + public void testParseFailure() throws InitializationException, IOException, SQLException { + final MockRecordParser parser = new MockRecordParser(); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("age", RecordFieldType.INT); + parser.addRecord("Tom", 49); + + final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\""); + + final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''"); + runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser"); + 
runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer"); + + final int numIterations = 1; + for (int i = 0; i < numIterations; i++) { + runner.enqueue(new byte[0]); + } + + runner.setThreadCount(4); + runner.run(2 * numIterations); + + runner.assertTransferCount(REL_NAME, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0); + System.out.println(new String(out.toByteArray())); + out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n"); + } + + + @Test + public void testTransformCalc() throws InitializationException, IOException, SQLException { + final MockRecordParser parser = new MockRecordParser(); + parser.addSchemaField("ID", RecordFieldType.INT); + parser.addSchemaField("AMOUNT1", RecordFieldType.FLOAT); + parser.addSchemaField("AMOUNT2", RecordFieldType.FLOAT); + parser.addSchemaField("AMOUNT3", RecordFieldType.FLOAT); + + parser.addRecord("008", 10.05F, 15.45F, 89.99F); + parser.addRecord("100", 20.25F, 25.25F, 45.25F); + parser.addRecord("105", 20.05F, 25.05F, 45.05F); + parser.addRecord("200", 34.05F, 25.05F, 75.05F); + + final MockRecordWriter writer = new MockRecordWriter("\"NAME\",\"POINTS\""); + + final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(REL_NAME, "select ID, AMOUNT1+AMOUNT2+AMOUNT3 as TOTAL from FLOWFILE where ID=100"); + runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser"); + runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0); + + out.assertContentEquals("\"NAME\",\"POINTS\"\n\"100\",\"90.75\"\n"); + } + + + @Test + public void testAggregateFunction() throws 
InitializationException, IOException { + final MockRecordParser parser = new MockRecordParser(); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("points", RecordFieldType.INT); + parser.addRecord("Tom", 1); + parser.addRecord("Jerry", 2); + parser.addRecord("Tom", 99); + + final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\""); + + final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(REL_NAME, "select name, sum(points) as points from FLOWFILE GROUP BY name"); + runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser"); + runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer"); + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + // Look the result up under the same relationship name the query was registered with. + final MockFlowFile flowFileOut = runner.getFlowFilesForRelationship(REL_NAME).get(0); + flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n"); + } + + @Test + public void testColumnNames() throws InitializationException, IOException { + final MockRecordParser parser = new MockRecordParser(); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("points", RecordFieldType.INT); + parser.addSchemaField("greeting", RecordFieldType.STRING); + parser.addRecord("Tom", 1, "Hello"); + parser.addRecord("Jerry", 2, "Hi"); + parser.addRecord("Tom", 99, "Howdy"); + + final List<String> colNames = new ArrayList<>(); + colNames.add("name"); + colNames.add("points"); + colNames.add("greeting"); + colNames.add("FAV_GREETING"); + final ResultSetValidatingRecordWriter writer = new ResultSetValidatingRecordWriter(colNames); + + final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class); + runner.addControllerService("parser", parser); +
runner.enableControllerService(parser); + runner.addControllerService("writer", writer); + runner.enableControllerService(writer); + + runner.setProperty(REL_NAME, "select *, greeting AS FAV_GREETING from FLOWFILE"); + runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser"); + runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer"); + + runner.enqueue(""); + runner.run(); + + runner.assertTransferCount(REL_NAME, 1); + } + + + private static class ResultSetValidatingRecordWriter extends AbstractControllerService implements RecordSetWriterFactory { + private final List<String> columnNames; + + public ResultSetValidatingRecordWriter(final List<String> colNames) { + this.columnNames = new ArrayList<>(colNames); + } + + @Override + public RecordSetWriter createWriter(ComponentLog logger) { + return new RecordSetWriter() { + @Override + public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException { + final int colCount = rs.getSchema().getFieldCount(); + Assert.assertEquals(columnNames.size(), colCount); + + final List<String> colNames = new ArrayList<>(colCount); + for (int i = 0; i < colCount; i++) { + colNames.add(rs.getSchema().getField(i).getFieldName()); + } + + Assert.assertEquals(columnNames, colNames); + + return WriteResult.of(0, Collections.emptyMap()); + } + + @Override + public String getMimeType() { + return "text/plain"; + } + + @Override + public WriteResult write(Record record, OutputStream out) throws IOException { + return null; + } + }; + } + + } + + private static class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory { + private final String header; + + public MockRecordWriter(final String header) { + this.header = header; + } + + @Override + public RecordSetWriter createWriter(final ComponentLog logger) { + return new RecordSetWriter() { + @Override + public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException { + 
out.write(header.getBytes()); + out.write("\n".getBytes()); + + int recordCount = 0; + final int numCols = rs.getSchema().getFieldCount(); + Record record = null; + while ((record = rs.next()) != null) { + recordCount++; + int i = 0; + for (final String fieldName : record.getSchema().getFieldNames()) { + final String val = record.getAsString(fieldName); + out.write("\"".getBytes()); + out.write(val.getBytes()); + out.write("\"".getBytes()); + + if (i++ < numCols - 1) { + out.write(",".getBytes()); + } + } + out.write("\n".getBytes()); + } + + return WriteResult.of(recordCount, Collections.emptyMap()); + } + + @Override + public String getMimeType() { + return "text/plain"; + } + + @Override + public WriteResult write(Record record, OutputStream out) throws IOException { + return null; + } + }; + } + } + + private static class MockRecordParser extends AbstractControllerService implements RowRecordReaderFactory { + private final List<Object[]> records = new ArrayList<>(); + private final List<RecordField> fields = new ArrayList<>(); + private final int failAfterN; + + public MockRecordParser() { + this(-1); + } + + public MockRecordParser(final int failAfterN) { + this.failAfterN = failAfterN; + } + + + public void addSchemaField(final String fieldName, final RecordFieldType type) { + fields.add(new RecordField(fieldName, type.getDataType())); + } + + public void addRecord(Object... 
values) { + records.add(values); + } + + @Override + public RecordReader createRecordReader(InputStream in, ComponentLog logger) throws IOException { + final Iterator<Object[]> itr = records.iterator(); + + return new RecordReader() { + private int recordCount = 0; + + @Override + public void close() throws IOException { + } + + @Override + public Record nextRecord() throws IOException, MalformedRecordException { + if (failAfterN >= recordCount) { + throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read"); + } + recordCount++; + + if (!itr.hasNext()) { + return null; + } + + final Object[] values = itr.next(); + final Map<String, Object> valueMap = new HashMap<>(); + int i = 0; + for (final RecordField field : fields) { + final String fieldName = field.getFieldName(); + valueMap.put(fieldName, values[i++]); + } + + return new MapRecord(new SimpleRecordSchema(fields), valueMap); + } + + @Override + public RecordSchema getSchema() { + return new SimpleRecordSchema(fields); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv deleted file mode 100644 index 2d56bb7..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestFilterCSVColumns/Numeric.csv +++ /dev/null @@ -1,5 +0,0 @@ -ID:int,AMOUNT1: float,AMOUNT2:float,AMOUNT3:float -008, 10.05, 15.45, 89.99 -100, 20.25, 25.25, 45.25 -105, 20.05, 25.05, 45.05 -200, 34.05, 25.05, 75.05 \ No newline at end of file
