This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 73571ae NIFI-6185: ListDatabaseTables processor doesn't close ResultSets
73571ae is described below
commit 73571ae30933063b121f9a757a64cf5f7f9c500e
Author: Lars Francke <[email protected]>
AuthorDate: Wed Apr 3 22:53:39 2019 +0200
NIFI-6185: ListDatabaseTables processor doesn't close ResultSets
This closes #3405.
Signed-off-by: Koji Kawamura <[email protected]>
---
.../processors/standard/ListDatabaseTables.java | 136 +++++++++++----------
1 file changed, 71 insertions(+), 65 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
index ec2d3c1..fa7dc5c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
@@ -230,79 +230,85 @@ public class ListDatabaseTables extends AbstractProcessor {
try (final Connection con =
dbcpService.getConnection(Collections.emptyMap())) {
DatabaseMetaData dbMetaData = con.getMetaData();
- ResultSet rs = dbMetaData.getTables(catalog, schemaPattern,
tableNamePattern, tableTypes);
- while (rs.next()) {
- final String tableCatalog = rs.getString(1);
- final String tableSchema = rs.getString(2);
- final String tableName = rs.getString(3);
- final String tableType = rs.getString(4);
- final String tableRemarks = rs.getString(5);
+ try (ResultSet rs = dbMetaData.getTables(catalog, schemaPattern,
tableNamePattern, tableTypes)) {
+ while (rs.next()) {
+ final String tableCatalog = rs.getString(1);
+ final String tableSchema = rs.getString(2);
+ final String tableName = rs.getString(3);
+ final String tableType = rs.getString(4);
+ final String tableRemarks = rs.getString(5);
- // Build fully-qualified name
- String fqn = Stream.of(tableCatalog, tableSchema, tableName)
- .filter(segment -> !StringUtils.isEmpty(segment))
- .collect(Collectors.joining("."));
+ // Build fully-qualified name
+ String fqn = Stream.of(tableCatalog, tableSchema,
tableName)
+ .filter(segment -> !StringUtils.isEmpty(segment))
+ .collect(Collectors.joining("."));
- String lastTimestampForTable = stateMapProperties.get(fqn);
- boolean refreshTable = true;
- try {
- // Refresh state if the interval has elapsed
- long lastRefreshed = -1;
- final long currentTime = System.currentTimeMillis();
- if (!StringUtils.isEmpty(lastTimestampForTable)) {
- lastRefreshed = Long.parseLong(lastTimestampForTable);
- }
- if (lastRefreshed == -1 || (refreshInterval > 0 &&
currentTime >= (lastRefreshed + refreshInterval))) {
- stateMapProperties.remove(lastTimestampForTable);
- } else {
- refreshTable = false;
+ String lastTimestampForTable = stateMapProperties.get(fqn);
+ boolean refreshTable = true;
+ try {
+ // Refresh state if the interval has elapsed
+ long lastRefreshed = -1;
+ final long currentTime = System.currentTimeMillis();
+ if (!StringUtils.isEmpty(lastTimestampForTable)) {
+ lastRefreshed =
Long.parseLong(lastTimestampForTable);
+ }
+ if (lastRefreshed == -1 || (refreshInterval > 0 &&
currentTime >= (lastRefreshed
+
+ refreshInterval))) {
+ stateMapProperties.remove(lastTimestampForTable);
+ } else {
+ refreshTable = false;
+ }
+ } catch (final NumberFormatException nfe) {
+ getLogger().error(
+ "Failed to retrieve observed last table fetches from
the State Manager. Will not perform "
+ + "query until this is accomplished.", nfe);
+ context.yield();
+ return;
}
- } catch (final NumberFormatException nfe) {
- getLogger().error("Failed to retrieve observed last table
fetches from the State Manager. Will not perform "
- + "query until this is accomplished.", nfe);
- context.yield();
- return;
- }
- if (refreshTable) {
- FlowFile flowFile = session.create();
- logger.info("Found {}: {}", new Object[]{tableType, fqn});
- if (includeCount) {
- try (Statement st = con.createStatement()) {
- final String countQuery = "SELECT COUNT(1) FROM "
+ fqn;
+ if (refreshTable) {
+ FlowFile flowFile = session.create();
+ logger.info("Found {}: {}", new Object[] {tableType,
fqn});
+ if (includeCount) {
+ try (Statement st = con.createStatement()) {
+ final String countQuery = "SELECT COUNT(1)
FROM " + fqn;
- logger.debug("Executing query: {}", new
Object[]{countQuery});
- ResultSet countResult =
st.executeQuery(countQuery);
- if (countResult.next()) {
- flowFile = session.putAttribute(flowFile,
DB_TABLE_COUNT, Long.toString(countResult.getLong(1)));
+ logger.debug("Executing query: {}", new
Object[] {countQuery});
+ try (ResultSet countResult =
st.executeQuery(countQuery)) {
+ if (countResult.next()) {
+ flowFile = session
+ .putAttribute(flowFile,
DB_TABLE_COUNT,
+
Long.toString(countResult.getLong(1)));
+ }
+ }
+ } catch (SQLException se) {
+ logger.error("Couldn't get row count for {}",
new Object[] {fqn});
+ session.remove(flowFile);
+ continue;
}
- } catch (SQLException se) {
- logger.error("Couldn't get row count for {}", new
Object[]{fqn});
- session.remove(flowFile);
- continue;
}
- }
- if (tableCatalog != null) {
- flowFile = session.putAttribute(flowFile,
DB_TABLE_CATALOG, tableCatalog);
- }
- if (tableSchema != null) {
- flowFile = session.putAttribute(flowFile,
DB_TABLE_SCHEMA, tableSchema);
- }
- flowFile = session.putAttribute(flowFile, DB_TABLE_NAME,
tableName);
- flowFile = session.putAttribute(flowFile,
DB_TABLE_FULLNAME, fqn);
- flowFile = session.putAttribute(flowFile, DB_TABLE_TYPE,
tableType);
- if (tableRemarks != null) {
- flowFile = session.putAttribute(flowFile,
DB_TABLE_REMARKS, tableRemarks);
- }
+ if (tableCatalog != null) {
+ flowFile = session.putAttribute(flowFile,
DB_TABLE_CATALOG, tableCatalog);
+ }
+ if (tableSchema != null) {
+ flowFile = session.putAttribute(flowFile,
DB_TABLE_SCHEMA, tableSchema);
+ }
+ flowFile = session.putAttribute(flowFile,
DB_TABLE_NAME, tableName);
+ flowFile = session.putAttribute(flowFile,
DB_TABLE_FULLNAME, fqn);
+ flowFile = session.putAttribute(flowFile,
DB_TABLE_TYPE, tableType);
+ if (tableRemarks != null) {
+ flowFile = session.putAttribute(flowFile,
DB_TABLE_REMARKS, tableRemarks);
+ }
- String transitUri;
- try {
- transitUri = dbMetaData.getURL();
- } catch (SQLException sqle) {
- transitUri = "<unknown>";
+ String transitUri;
+ try {
+ transitUri = dbMetaData.getURL();
+ } catch (SQLException sqle) {
+ transitUri = "<unknown>";
+ }
+ session.getProvenanceReporter().receive(flowFile,
transitUri);
+ session.transfer(flowFile, REL_SUCCESS);
+ stateMapProperties.put(fqn,
Long.toString(System.currentTimeMillis()));
}
- session.getProvenanceReporter().receive(flowFile,
transitUri);
- session.transfer(flowFile, REL_SUCCESS);
- stateMapProperties.put(fqn,
Long.toString(System.currentTimeMillis()));
}
}
// Update the timestamps for listed tables