Repository: nifi Updated Branches: refs/heads/master ebead820f -> d28b1172d
NIFI-5739: Maintain CaptureChangeMySQL JDBC connection automatically Signed-off-by: Pierre Villard <[email protected]> This closes #3103. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d28b1172 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d28b1172 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d28b1172 Branch: refs/heads/master Commit: d28b1172db974cb1bd6aeba479b7655ce89c42db Parents: ebead82 Author: Koji Kawamura <[email protected]> Authored: Tue Oct 23 15:38:24 2018 +0900 Committer: Pierre Villard <[email protected]> Committed: Wed Oct 24 09:26:41 2018 +0200 ---------------------------------------------------------------------- .../mysql/processors/CaptureChangeMySQL.java | 77 +++++++++++++++----- .../processors/CaptureChangeMySQLTest.groovy | 3 +- 2 files changed, 61 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d28b1172/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index f58ed7e..e8c94d1 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -377,7 +377,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer(); private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer(); - private Connection jdbcConnection = null; + private JDBCConnectionHolder jdbcConnectionHolder = null; private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter(); private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter(); @@ -710,9 +710,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } if (createEnrichmentConnection) { + jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, null, connectTimeout); try { - jdbcConnection = getJdbcConnection(driverLocation, driverName, connectedHost, username, password, null); - } catch (InitializationException | SQLException e) { + // Ensure connection can be created. + getJdbcConnection(); + } catch (SQLException e) { binlogClient.disconnect(); binlogClient = null; throw new IOException("Error creating binlog enrichment JDBC connection to any of the specified hosts", e); @@ -945,6 +947,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { throw new CDCException("Error closing CDC connection", e); } finally { binlogClient = null; + + if (jdbcConnectionHolder != null) { + jdbcConnectionHolder.close(); + } } } @@ -998,8 +1004,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { */ protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException { TableInfo tableInfo = null; - if (jdbcConnection != null) { - try (Statement s = jdbcConnection.createStatement()) { + if (jdbcConnectionHolder != null) { + + try (Statement s = getJdbcConnection().createStatement()) { s.execute("USE `" + key.getDatabaseName() + "`"); ResultSet rs = s.executeQuery("SELECT * FROM `" + key.getTableName() + "` LIMIT 0"); ResultSetMetaData rsmd = rs.getMetaData(); @@ -1018,23 +1025,59 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { return tableInfo; } + protected Connection getJdbcConnection() throws SQLException { + return jdbcConnectionHolder.getConnection(); + } + + private class JDBCConnectionHolder { + private String connectionUrl; + private Properties connectionProps = new Properties(); + private long connectionTimeoutMillis; + + private Connection connection; + + private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map<String, String> customProperties, long connectionTimeoutMillis) { + this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort(); + if (customProperties != null) { + connectionProps.putAll(customProperties); + } + connectionProps.put("user", username); + connectionProps.put("password", password); + this.connectionTimeoutMillis = connectionTimeoutMillis; + } + + private Connection getConnection() throws SQLException { + if (connection != null && connection.isValid((int) (connectionTimeoutMillis / 1000))) { + getLogger().trace("Returning the pooled JDBC connection."); + return connection; + } + + // Close the existing connection just in case. + close(); + + getLogger().trace("Creating a new JDBC connection."); + connection = DriverManager.getConnection(connectionUrl, connectionProps); + return connection; + } + + private void close() { + if (connection != null) { + try { + getLogger().trace("Closing the pooled JDBC connection."); + connection.close(); + } catch (SQLException e) { + getLogger().warn("Failed to close JDBC connection due to " + e, e); + } + } + } + } + + /** * using Thread.currentThread().getContextClassLoader(); will ensure that you are using the ClassLoader for you NAR. * * @throws InitializationException if there is a problem obtaining the ClassLoader */ - protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties) - throws InitializationException, SQLException { - Properties connectionProps = new Properties(); - if (customProperties != null) { - connectionProps.putAll(customProperties); - } - connectionProps.put("user", username); - connectionProps.put("password", password); - - return DriverManager.getConnection("jdbc:mysql://" + host.getHostString() + ":" + host.getPort(), connectionProps); - } - protected void registerDriver(String locationString, String drvName) throws InitializationException { if (locationString != null && locationString.length() > 0) { try { http://git-wip-us.apache.org/repos/asf/nifi/blob/d28b1172/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index 7e8607d..5b07850 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -967,8 +967,7 @@ class CaptureChangeMySQLTest { } @Override - protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties) - throws InitializationException, SQLException { + protected Connection getJdbcConnection() throws SQLException { Connection mockConnection = mock(Connection) Statement mockStatement = mock(Statement) when(mockConnection.createStatement()).thenReturn(mockStatement)
