[
https://issues.apache.org/jira/browse/NIFI-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354859#comment-16354859
]
ASF GitHub Bot commented on NIFI-1706:
--------------------------------------
Github user patricker commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2162#discussion_r166500502
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
---
@@ -249,34 +260,56 @@ public void setup(final ProcessContext context,
boolean shouldCleanCache, FlowFi
return;
}
- // Try to fill the columnTypeMap with the types of the desired
max-value columns
- final DBCPService dbcpService =
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ // Try to fill the columnTypeMap with the types of the desired
max-value columns
+ final DBCPService dbcpService =
context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String sqlQuery =
context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final DatabaseAdapter dbAdapter =
dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
- // Try a query that returns no rows, for the purposes of
getting metadata about the columns. It is possible
- // to use DatabaseMetaData.getColumns(), but not all
drivers support this, notably the schema-on-read
- // approach as in Apache Drill
- String query = dbAdapter.getSelectStatement(tableName,
maxValueColumnNames, "1 = 0", null, null, null);
- ResultSet resultSet = st.executeQuery(query);
- ResultSetMetaData resultSetMetaData =
resultSet.getMetaData();
- int numCols = resultSetMetaData.getColumnCount();
- if (numCols > 0) {
- if (shouldCleanCache) {
- columnTypeMap.clear();
- }
- for (int i = 1; i <= numCols; i++) {
- String colName =
resultSetMetaData.getColumnName(i).toLowerCase();
- String colKey = getStateKey(tableName, colName);
- int colType = resultSetMetaData.getColumnType(i);
- columnTypeMap.putIfAbsent(colKey, colType);
+ // Try a query that returns no rows, for the purposes of
getting metadata about the columns. It is possible
+ // to use DatabaseMetaData.getColumns(), but not all drivers
support this, notably the schema-on-read
+ // approach as in Apache Drill
+ String query;
+
+ if(StringUtils.isEmpty(sqlQuery)) {
+ query = dbAdapter.getSelectStatement(tableName,
maxValueColumnNames, "1 = 0", null, null, null);
+ } else {
+ StringBuilder sbQuery = getWrappedQuery(sqlQuery,
tableName);
--- End diff --
I've checked in fixes for everything except this change. I don't want to
put in any more SQL building logic than I already have hard coded into QDB.
What if I added a new method to `DatabaseAdapter` for wrapping a `SELECT`
statement as a sub query. Input parameters would be similar to the existing
method for building a SELECT statement; column list, where clause, order by
clause.
> Extend QueryDatabaseTable to support arbitrary queries
> ------------------------------------------------------
>
> Key: NIFI-1706
> URL: https://issues.apache.org/jira/browse/NIFI-1706
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Affects Versions: 1.4.0
> Reporter: Paul Bormans
> Assignee: Peter Wicks
> Priority: Major
> Labels: features
>
> The QueryDatabaseTable is able to observe a configured database table for new
> rows and yield these into the flowfile. The model of an rdbms however is
> often (if not always) normalized so you would need to join various tables in
> order to "flatten" the data into useful events for a processing pipeline as
> can be build with nifi or various tools within the hadoop ecosystem.
> The request is to extend the processor to specify an arbitrary sql query
> instead of specifying the table name + columns.
> In addition (this may be another issue?) it is desired to limit the number of
> rows returned per run. Not just because of bandwidth issue's from the nifi
> pipeline onwards but mainly because huge databases may not be able to return
> so many records within a reasonable time.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)