[ https://issues.apache.org/jira/browse/NIFI-1706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353876#comment-16353876 ]
ASF GitHub Bot commented on NIFI-1706:
--------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2162#discussion_r166278036
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
---
@@ -249,34 +260,56 @@ public void setup(final ProcessContext context, boolean shouldCleanCache, FlowFi
return;
}
- // Try to fill the columnTypeMap with the types of the desired max-value columns
- final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
- final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ // Try to fill the columnTypeMap with the types of the desired max-value columns
+ final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);
+ final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String sqlQuery = context.getProperty(SQL_QUERY).evaluateAttributeExpressions().getValue();
final DatabaseAdapter dbAdapter = dbAdapters.get(context.getProperty(DB_TYPE).getValue());
try (final Connection con = dbcpService.getConnection();
final Statement st = con.createStatement()) {
- // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
- // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
- // approach as in Apache Drill
- String query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
- ResultSet resultSet = st.executeQuery(query);
- ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
- int numCols = resultSetMetaData.getColumnCount();
- if (numCols > 0) {
- if (shouldCleanCache) {
- columnTypeMap.clear();
- }
- for (int i = 1; i <= numCols; i++) {
- String colName = resultSetMetaData.getColumnName(i).toLowerCase();
- String colKey = getStateKey(tableName, colName);
- int colType = resultSetMetaData.getColumnType(i);
- columnTypeMap.putIfAbsent(colKey, colType);
+ // Try a query that returns no rows, for the purposes of getting metadata about the columns. It is possible
+ // to use DatabaseMetaData.getColumns(), but not all drivers support this, notably the schema-on-read
+ // approach as in Apache Drill
+ String query;
+
+ if(StringUtils.isEmpty(sqlQuery)) {
+ query = dbAdapter.getSelectStatement(tableName, maxValueColumnNames, "1 = 0", null, null, null);
+ } else {
+ StringBuilder sbQuery = getWrappedQuery(sqlQuery, tableName);
+ sbQuery.append(" WHERE 1=0");
+
+ query = sbQuery.toString();
+ }
+
+ ResultSet resultSet = st.executeQuery(query);
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ int numCols = resultSetMetaData.getColumnCount();
+ if (numCols > 0) {
+ if (shouldCleanCache){
+ columnTypeMap.clear();
+ }
+ for (int i = 1; i <= numCols; i++) {
+ String colName = resultSetMetaData.getColumnName(i).toLowerCase();
+ String colKey = getStateKey(tableName, colName);
+ int colType = resultSetMetaData.getColumnType(i);
+ columnTypeMap.putIfAbsent(colKey, colType);
+ }
+
+ List<String> maxValueColumnNameList = org.apache.commons.lang3.StringUtils.isEmpty(maxValueColumnNames)
--- End diff --
I think we can use `org.apache.nifi.util.StringUtils` here instead, which
is already imported. Moreover, we can remove this emptiness check because it's
already checked at the beginning of this method.
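
A minimal sketch of the simplification suggested above, assuming (as the comment states) that the method returns early when `maxValueColumnNames` is empty. The class `MaxValueColumns`, the method `parse`, and the comma-splitting regex are hypothetical and only illustrate the idea; `isEmpty` is a local stand-in for `org.apache.nifi.util.StringUtils.isEmpty`, assumed to treat null and zero-length strings as empty.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class MaxValueColumns {

    // Local stand-in for org.apache.nifi.util.StringUtils.isEmpty
    // (assumption: null and "" both count as empty).
    static boolean isEmpty(final String s) {
        return s == null || s.isEmpty();
    }

    // Hypothetical helper: guard once at the top, then split without re-checking.
    static List<String> parse(final String maxValueColumnNames) {
        if (isEmpty(maxValueColumnNames)) {
            // Early exit, playing the role of the guard at the start of setup().
            return Collections.emptyList();
        }
        // Past the guard the value is known to be non-empty,
        // so a second emptiness check would be redundant.
        return Arrays.asList(maxValueColumnNames.trim().split("\\s*,\\s*"));
    }
}
```

With the guard in place, the ternary around the list construction could likely collapse to the split call alone.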
> Extend QueryDatabaseTable to support arbitrary queries
> ------------------------------------------------------
>
> Key: NIFI-1706
> URL: https://issues.apache.org/jira/browse/NIFI-1706
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Affects Versions: 1.4.0
> Reporter: Paul Bormans
> Assignee: Peter Wicks
> Priority: Major
> Labels: features
>
> The QueryDatabaseTable processor is able to observe a configured database table for new
> rows and yield these into the flowfile. The model of an RDBMS, however, is
> often (if not always) normalized, so you would need to join various tables in
> order to "flatten" the data into useful events for a processing pipeline as
> can be built with NiFi or various tools within the Hadoop ecosystem.
> The request is to extend the processor to accept an arbitrary SQL query
> instead of the table name + columns.
> In addition (this may be another issue?) it is desired to limit the number of
> rows returned per run, not just because of bandwidth issues from the NiFi
> pipeline onwards but mainly because huge databases may not be able to return
> so many records within a reasonable time.
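
As a rough sketch of how an arbitrary query could be probed for column types without fetching rows, the snippet below wraps the custom SQL as a subselect and appends an always-false predicate, mirroring the `getWrappedQuery(sqlQuery, tableName)` plus ` WHERE 1=0` step in the diff. The class `MetadataProbe`, the method `probeQuery`, and the `SELECT * FROM (...) AS alias` shape are assumptions: the body of `getWrappedQuery` is not shown in the diff, and subselect aliasing syntax varies between databases.

```java
final class MetadataProbe {

    // Hypothetical helper: builds a query that returns no rows but would still
    // expose column metadata via ResultSetMetaData when executed.
    static String probeQuery(final String sqlQuery, final String tableName, final String columns) {
        if (sqlQuery == null || sqlQuery.trim().isEmpty()) {
            // Table mode: select the requested columns (or all) with a false predicate.
            final String cols = (columns == null || columns.trim().isEmpty()) ? "*" : columns;
            return "SELECT " + cols + " FROM " + tableName + " WHERE 1 = 0";
        }
        // Custom-query mode: wrap the query as a subselect aliased by the table name
        // (assumed shape), then filter out every row.
        return "SELECT * FROM (" + sqlQuery + ") AS " + tableName + " WHERE 1=0";
    }
}
```

For example, `MetadataProbe.probeQuery("SELECT o.id FROM orders o", "orders", null)` yields a wrapped statement whose result set is empty, so only the metadata round trip is paid.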
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)