Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1407#discussion_r98680344
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateTableFetch.java
 ---
    @@ -202,40 +246,60 @@ public void onTrigger(final ProcessContext context, 
final ProcessSessionFactory
                         ResultSetMetaData rsmd = resultSet.getMetaData();
                         for (int i = 2; i <= rsmd.getColumnCount(); i++) {
                             String resultColumnName = 
rsmd.getColumnName(i).toLowerCase();
    +                        String fullyQualifiedStateKey = 
getStateKey(tableName, resultColumnName);
    +                        String resultColumnCurrentMax = 
statePropertyMap.get(fullyQualifiedStateKey);
    +                        if (StringUtils.isEmpty(resultColumnCurrentMax) && 
!isDynamicTableName) {
    +                            // If we can't find the value at the 
fully-qualified key name and the table name is static, it is possible (under a 
previous scheme)
    +                            // the value has been stored under a key that 
is only the column name. Fall back to check the column name; either way, when a 
new
    +                            // maximum value is observed, it will be 
stored under the fully-qualified key from then on.
    +                            resultColumnCurrentMax = 
statePropertyMap.get(resultColumnName);
    +                        }
    +
                             int type = rsmd.getColumnType(i);
    +                        if (isDynamicTableName) {
    +                            // We haven't pre-populated the column type 
map if the table name is dynamic, so do it here
    +                            columnTypeMap.put(fullyQualifiedStateKey, 
type);
    +                        }
                             try {
    -                            String newMaxValue = 
getMaxValueFromRow(resultSet, i, type, 
statePropertyMap.get(resultColumnName.toLowerCase()), dbAdapter.getName());
    +                            String newMaxValue = 
getMaxValueFromRow(resultSet, i, type, resultColumnCurrentMax, 
dbAdapter.getName());
                                 if (newMaxValue != null) {
    -                                statePropertyMap.put(resultColumnName, 
newMaxValue);
    +                                
statePropertyMap.put(fullyQualifiedStateKey, newMaxValue);
                                 }
                             } catch (ParseException | IOException pie) {
                                 // Fail the whole thing here before we start 
creating flow files and such
                                 throw new ProcessException(pie);
                             }
    +
                         }
                     } else {
                         // Something is very wrong here, one row (even if 
count is zero) should be returned
                         throw new SQLException("No rows returned from metadata 
query: " + selectQuery);
                     }
    -            } catch (SQLException e) {
    -                logger.error("Unable to execute SQL select query {} due to 
{}", new Object[]{selectQuery, e});
    -                throw new ProcessException(e);
    -            }
    -            final int numberOfFetches = (partitionSize == 0) ? rowCount : 
(rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
     
    +                final int numberOfFetches = (partitionSize == 0) ? 
rowCount : (rowCount / partitionSize) + (rowCount % partitionSize == 0 ? 0 : 1);
     
    -            // Generate SQL statements to read "pages" of data
    -            for (int i = 0; i < numberOfFetches; i++) {
    -                FlowFile sqlFlowFile;
    +                // Generate SQL statements to read "pages" of data
    +                for (int i = 0; i < numberOfFetches; i++) {
    +                    Integer limit = partitionSize == 0 ? null : 
partitionSize;
    +                    Integer offset = partitionSize == 0 ? null : i * 
partitionSize;
    +                    final String query = 
dbAdapter.getSelectStatement(tableName, columnNames, whereClause, 
StringUtils.join(maxValueColumnNameList, ", "), limit, offset);
    +                    FlowFile sqlFlowFile = (fileToProcess == null) ? 
session.create() : session.create(fileToProcess);
    +                    sqlFlowFile = session.write(sqlFlowFile, out -> 
out.write(query.getBytes()));
    +                    session.transfer(sqlFlowFile, REL_SUCCESS);
    +                }
    +
    +                if (fileToProcess != null) {
    +                    session.remove(fileToProcess);
    +                }
    +            } catch (SQLException e) {
    +                if (fileToProcess != null) {
    +                    logger.error("Unable to execute SQL select query {} 
due to {}, routing {} to failure", new Object[]{selectQuery, e, fileToProcess});
    +                    session.transfer(fileToProcess, REL_FAILURE);
    --- End diff --
    
    agreed, will add the message (not the stack trace) as an attribute.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to