Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/645#discussion_r71073757 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java --- @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.standard.db.DatabaseAdapter; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +@EventDriven +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"sql", "select", "jdbc", "query", "database", "fetch", "generate"}) 
+@SeeAlso(QueryDatabaseTable.class) +@CapabilityDescription("Generates SQL select queries that fetch \"pages\" of rows from a table. The partition size property, along with the table's row count, " + + "determine the size and number of pages and generated FlowFiles. In addition, incremental fetching can be achieved by setting Maximum-Value Columns, " + + "which causes the processor to track the columns' maximum values, thus only fetching rows whose columns' values exceed the observed maximums.") +@Stateful(scopes = Scope.CLUSTER, description = "After performing a query on the specified table, the maximum values for " + + "the specified column(s) will be retained for use in future executions of the query. This allows the Processor " + + "to fetch only those records that have max values greater than the retained values. This can be used for " + + "incremental fetching, fetching of newly added rows, etc. To clear the maximum values, clear the state of the processor " + + "per the State Management documentation") +@WritesAttribute(attribute = "querydbtable.row.count") +public class GenerateTableFetch extends AbstractDatabaseFetchProcessor { + + + public static final PropertyDescriptor PARTITION_SIZE = new PropertyDescriptor.Builder() + .name("gen-table-fetch-partition-size") + .displayName("Partition Size") + .description("The number of result rows to be fetched by each generated SQL statement. The total number of rows in " + + "the table divided by the partition size gives the number of SQL statements (i.e. FlowFiles) generated. 
A " + + "value of zero indicates that a single FlowFile is to be generated whose SQL statement will fetch all rows " + + "in the table.") + .defaultValue("10000") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + public GenerateTableFetch() { + final Set<Relationship> r = new HashSet<>(); + r.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(r); + + final List<PropertyDescriptor> pds = new ArrayList<>(); + pds.add(DBCP_SERVICE); + pds.add(DB_TYPE); + pds.add(TABLE_NAME); + pds.add(COLUMN_NAMES); + pds.add(MAX_VALUE_COLUMN_NAMES); + pds.add(QUERY_TIMEOUT); + pds.add(PARTITION_SIZE); + propDescriptors = Collections.unmodifiableList(pds); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propDescriptors; + } + + @OnScheduled + public void setup(final ProcessContext context) { + super.setup(context); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + ProcessSession session = sessionFactory.createSession(); + final ComponentLog logger = getLogger(); + + final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue()); + final String tableName = context.getProperty(TABLE_NAME).getValue(); + final String columnNames = context.getProperty(COLUMN_NAMES).getValue(); + final String maxValueColumnNames = context.getProperty(MAX_VALUE_COLUMN_NAMES).getValue(); + final int partitionSize = context.getProperty(PARTITION_SIZE).asInteger(); + + final StateManager stateManager = context.getStateManager(); + final StateMap stateMap; + + try { + stateMap = stateManager.getState(Scope.CLUSTER); + } catch (final IOException 
ioe) { + getLogger().error("Failed to retrieve observed maximum values from the State Manager. Will not perform " + + "query until this is accomplished.", ioe); + context.yield(); + return; + } + // Make a mutable copy of the current state property map. This will be updated by the result row callback, and eventually + // set as the current state map (after the session has been committed) + final Map<String, String> statePropertyMap = new HashMap<>(stateMap.toMap()); + + // Build a WHERE clause with maximum-value columns (if they exist), and a list of column names that will contain MAX(<column>) aliases. The + // executed SQL query will retrieve the count of all records after the filter(s) have been applied, as well as the new maximum values for the + // specified columns. This allows the processor to generate the correctly partitioned SQL statements as well as to update the state with the + // latest observed maximum values. + String whereClause = null; + List<String> maxValueColumnNameList = StringUtils.isEmpty(maxValueColumnNames) + ? new ArrayList<>(0) + : Arrays.asList(maxValueColumnNames.split("\\s*,\\s*")); + List<String> maxValueClauses = new ArrayList<>(maxValueColumnNameList.size()); + + String columnsClause = null; + List<String> maxValueSelectColumns = new ArrayList<>(maxValueColumnNameList.size() + 1); + maxValueSelectColumns.add("COUNT(*)"); + + // For each maximum-value column, get a WHERE filter and a MAX(column) alias + maxValueColumnNameList.forEach(colName -> { + maxValueSelectColumns.add("MAX(" + colName + ") " + colName); + String maxValue = statePropertyMap.get(colName.toLowerCase()); + if (!StringUtils.isEmpty(maxValue)) { + Integer type = columnTypeMap.get(colName.toLowerCase()); + if (type == null) { + // This shouldn't happen as we are populating columnTypeMap when the processor is scheduled. 
+ throw new IllegalArgumentException("No column type found for: " + colName); + } + // Add a condition for the WHERE clause + maxValueClauses.add(colName + " > " + getLiteralByType(type, maxValue, dbAdapter.getName())); + } + }); + + whereClause = StringUtils.join(maxValueClauses, " AND "); + columnsClause = StringUtils.join(maxValueSelectColumns, ", "); + + // Build a SELECT query with maximum-value columns (if present) + final String selectQuery = dbAdapter.getSelectStatement(tableName, columnsClause, whereClause, null, null, null); + + try (final Connection con = dbcpService.getConnection(); + final Statement st = con.createStatement()) { + + final Integer queryTimeout = context.getProperty(QUERY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue(); + st.setQueryTimeout(queryTimeout); // timeout in seconds + + logger.debug("Executing {}", new Object[]{selectQuery}); + ResultSet resultSet = st.executeQuery(selectQuery); + if (resultSet.next()) { + // Total row count is in the first column + int rowCount = resultSet.getInt(1); + + // Update the state map with the newly-observed maximum values + ResultSetMetaData rsmd = resultSet.getMetaData(); + for (int i = 2; i <= rsmd.getColumnCount(); i++) { + String resultColumnName = rsmd.getColumnName(i).toLowerCase(); + int type = rsmd.getColumnType(i); + try { + String newMaxValue = getMaxValueFromRow(resultSet, i, type, statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName()); + if (newMaxValue != null) { + statePropertyMap.put(resultColumnName, newMaxValue); + } + } catch (ParseException | IOException pie) { + // Fail the whole thing here before we start creating flow files and such + throw new ProcessException(pie); + } + } + + final int numberOfFetches = (partitionSize == 0) ? rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 
0 : 1); + + // Generate SQL statements to read "pages" of data + for (int i = 0; i < numberOfFetches; i++) { + FlowFile sqlFlowFile = null; + try { + Integer limit = partitionSize == 0 ? null : partitionSize; + Integer offset = partitionSize == 0 ? null : i * partitionSize; + final String query = dbAdapter.getSelectStatement(tableName, columnNames, StringUtils.join(maxValueClauses, " AND "), null, limit, offset); + sqlFlowFile = session.create(); + sqlFlowFile = session.write(sqlFlowFile, out -> { + out.write(query.getBytes()); + }); + session.transfer(sqlFlowFile, REL_SUCCESS); + + } catch (Exception e) { + logger.error("Error while generating SQL statement", e); + if (sqlFlowFile != null) { + session.remove(sqlFlowFile); --- End diff -- Definitely possible, but not sure how one page would fail when the others wouldn't, unless there's a bug in the code (which I should fix). If exceptions could happen for arbitrary pages, I will move the try/catch outside
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---