This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 73571ae  NIFI-6185: ListDatabaseTables processor doesn't close ResultSets
73571ae is described below

commit 73571ae30933063b121f9a757a64cf5f7f9c500e
Author: Lars Francke <[email protected]>
AuthorDate: Wed Apr 3 22:53:39 2019 +0200

    NIFI-6185: ListDatabaseTables processor doesn't close ResultSets
    
    This closes #3405.
    
    Signed-off-by: Koji Kawamura <[email protected]>
---
 .../processors/standard/ListDatabaseTables.java    | 136 +++++++++++----------
 1 file changed, 71 insertions(+), 65 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
index ec2d3c1..fa7dc5c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
@@ -230,79 +230,85 @@ public class ListDatabaseTables extends AbstractProcessor {
         try (final Connection con = 
dbcpService.getConnection(Collections.emptyMap())) {
 
             DatabaseMetaData dbMetaData = con.getMetaData();
-            ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, 
tableNamePattern, tableTypes);
-            while (rs.next()) {
-                final String tableCatalog = rs.getString(1);
-                final String tableSchema = rs.getString(2);
-                final String tableName = rs.getString(3);
-                final String tableType = rs.getString(4);
-                final String tableRemarks = rs.getString(5);
+            try (ResultSet rs = dbMetaData.getTables(catalog, schemaPattern, 
tableNamePattern, tableTypes)) {
+                while (rs.next()) {
+                    final String tableCatalog = rs.getString(1);
+                    final String tableSchema = rs.getString(2);
+                    final String tableName = rs.getString(3);
+                    final String tableType = rs.getString(4);
+                    final String tableRemarks = rs.getString(5);
 
-                // Build fully-qualified name
-                String fqn = Stream.of(tableCatalog, tableSchema, tableName)
-                        .filter(segment -> !StringUtils.isEmpty(segment))
-                        .collect(Collectors.joining("."));
+                    // Build fully-qualified name
+                    String fqn = Stream.of(tableCatalog, tableSchema, 
tableName)
+                      .filter(segment -> !StringUtils.isEmpty(segment))
+                      .collect(Collectors.joining("."));
 
-                String lastTimestampForTable = stateMapProperties.get(fqn);
-                boolean refreshTable = true;
-                try {
-                    // Refresh state if the interval has elapsed
-                    long lastRefreshed = -1;
-                    final long currentTime = System.currentTimeMillis();
-                    if (!StringUtils.isEmpty(lastTimestampForTable)) {
-                        lastRefreshed = Long.parseLong(lastTimestampForTable);
-                    }
-                    if (lastRefreshed == -1 || (refreshInterval > 0 && 
currentTime >= (lastRefreshed + refreshInterval))) {
-                        stateMapProperties.remove(lastTimestampForTable);
-                    } else {
-                        refreshTable = false;
+                    String lastTimestampForTable = stateMapProperties.get(fqn);
+                    boolean refreshTable = true;
+                    try {
+                        // Refresh state if the interval has elapsed
+                        long lastRefreshed = -1;
+                        final long currentTime = System.currentTimeMillis();
+                        if (!StringUtils.isEmpty(lastTimestampForTable)) {
+                            lastRefreshed = 
Long.parseLong(lastTimestampForTable);
+                        }
+                        if (lastRefreshed == -1 || (refreshInterval > 0 && 
currentTime >= (lastRefreshed
+                                                                               
            + refreshInterval))) {
+                            stateMapProperties.remove(lastTimestampForTable);
+                        } else {
+                            refreshTable = false;
+                        }
+                    } catch (final NumberFormatException nfe) {
+                        getLogger().error(
+                          "Failed to retrieve observed last table fetches from 
the State Manager. Will not perform "
+                          + "query until this is accomplished.", nfe);
+                        context.yield();
+                        return;
                     }
-                } catch (final NumberFormatException nfe) {
-                    getLogger().error("Failed to retrieve observed last table 
fetches from the State Manager. Will not perform "
-                            + "query until this is accomplished.", nfe);
-                    context.yield();
-                    return;
-                }
-                if (refreshTable) {
-                    FlowFile flowFile = session.create();
-                    logger.info("Found {}: {}", new Object[]{tableType, fqn});
-                    if (includeCount) {
-                        try (Statement st = con.createStatement()) {
-                            final String countQuery = "SELECT COUNT(1) FROM " 
+ fqn;
+                    if (refreshTable) {
+                        FlowFile flowFile = session.create();
+                        logger.info("Found {}: {}", new Object[] {tableType, 
fqn});
+                        if (includeCount) {
+                            try (Statement st = con.createStatement()) {
+                                final String countQuery = "SELECT COUNT(1) 
FROM " + fqn;
 
-                            logger.debug("Executing query: {}", new 
Object[]{countQuery});
-                            ResultSet countResult = 
st.executeQuery(countQuery);
-                            if (countResult.next()) {
-                                flowFile = session.putAttribute(flowFile, 
DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
+                                logger.debug("Executing query: {}", new 
Object[] {countQuery});
+                                try (ResultSet countResult = 
st.executeQuery(countQuery)) {
+                                    if (countResult.next()) {
+                                        flowFile = session
+                                          .putAttribute(flowFile, 
DB_TABLE_COUNT,
+                                            
Long.toString(countResult.getLong(1)));
+                                    }
+                                }
+                            } catch (SQLException se) {
+                                logger.error("Couldn't get row count for {}", 
new Object[] {fqn});
+                                session.remove(flowFile);
+                                continue;
                             }
-                        } catch (SQLException se) {
-                            logger.error("Couldn't get row count for {}", new 
Object[]{fqn});
-                            session.remove(flowFile);
-                            continue;
                         }
-                    }
-                    if (tableCatalog != null) {
-                        flowFile = session.putAttribute(flowFile, 
DB_TABLE_CATALOG, tableCatalog);
-                    }
-                    if (tableSchema != null) {
-                        flowFile = session.putAttribute(flowFile, 
DB_TABLE_SCHEMA, tableSchema);
-                    }
-                    flowFile = session.putAttribute(flowFile, DB_TABLE_NAME, 
tableName);
-                    flowFile = session.putAttribute(flowFile, 
DB_TABLE_FULLNAME, fqn);
-                    flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE, 
tableType);
-                    if (tableRemarks != null) {
-                        flowFile = session.putAttribute(flowFile, 
DB_TABLE_REMARKS, tableRemarks);
-                    }
+                        if (tableCatalog != null) {
+                            flowFile = session.putAttribute(flowFile, 
DB_TABLE_CATALOG, tableCatalog);
+                        }
+                        if (tableSchema != null) {
+                            flowFile = session.putAttribute(flowFile, 
DB_TABLE_SCHEMA, tableSchema);
+                        }
+                        flowFile = session.putAttribute(flowFile, 
DB_TABLE_NAME, tableName);
+                        flowFile = session.putAttribute(flowFile, 
DB_TABLE_FULLNAME, fqn);
+                        flowFile = session.putAttribute(flowFile, 
DB_TABLE_TYPE, tableType);
+                        if (tableRemarks != null) {
+                            flowFile = session.putAttribute(flowFile, 
DB_TABLE_REMARKS, tableRemarks);
+                        }
 
-                    String transitUri;
-                    try {
-                        transitUri = dbMetaData.getURL();
-                    } catch (SQLException sqle) {
-                        transitUri = "<unknown>";
+                        String transitUri;
+                        try {
+                            transitUri = dbMetaData.getURL();
+                        } catch (SQLException sqle) {
+                            transitUri = "<unknown>";
+                        }
+                        session.getProvenanceReporter().receive(flowFile, 
transitUri);
+                        session.transfer(flowFile, REL_SUCCESS);
+                        stateMapProperties.put(fqn, 
Long.toString(System.currentTimeMillis()));
                     }
-                    session.getProvenanceReporter().receive(flowFile, 
transitUri);
-                    session.transfer(flowFile, REL_SUCCESS);
-                    stateMapProperties.put(fqn, 
Long.toString(System.currentTimeMillis()));
                 }
             }
             // Update the timestamps for listed tables

Reply via email to