Repository: nifi
Updated Branches:
  refs/heads/master ad3d63d20 -> 4fc0e9c40


nifi-2381 Connection Pooling Service -Drop invalid connections and create new 
ones.

This closes #986.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4fc0e9c4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4fc0e9c4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4fc0e9c4

Branch: refs/heads/master
Commit: 4fc0e9c40763d05eee01b48f76cd4cf273b61330
Parents: ad3d63d
Author: Toivo Adams <toivo.ad...@gmail.com>
Authored: Sun Sep 4 13:40:24 2016 +0300
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Tue Sep 20 18:21:42 2016 +0200

----------------------------------------------------------------------
 .../nifi-dbcp-service/pom.xml                   |  24 +++
 .../apache/nifi/dbcp/DBCPConnectionPool.java    |  18 ++
 .../org/apache/nifi/dbcp/DBCPServiceTest.java   | 201 +++++++++++++++++++
 3 files changed, 243 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4fc0e9c4/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
index 85c460f..6891cb9 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/pom.xml
@@ -53,6 +53,30 @@
             <artifactId>derby</artifactId>
         </dependency>        
         <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derbynet</artifactId>
+            <version>10.11.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derbytools</artifactId>
+            <version>10.11.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derbyclient</artifactId>
+            <version>10.11.1.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.h2database</groupId>
+            <artifactId>h2</artifactId>
+            <version>1.4.192</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
             <version>1.3</version>

http://git-wip-us.apache.org/repos/asf/nifi/blob/4fc0e9c4/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
index 522a720..9bb5a47 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPool.java
@@ -114,6 +114,17 @@ public class DBCPConnectionPool extends 
AbstractControllerService implements DBC
         .sensitive(false)
         .build();
 
+    public static final PropertyDescriptor VALIDATION_QUERY = new 
PropertyDescriptor.Builder()
+        .name("Validation-query")
+        .displayName("Validation query")
+        .description("Validation query used to validate connections before 
returning them. "
+            + "When connection is invalid, it get's dropped and new valid 
connection will be returned. "
+            + "Note!! Using validation might have some performance penalty.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+
     private static final List<PropertyDescriptor> properties;
 
     static {
@@ -125,6 +136,7 @@ public class DBCPConnectionPool extends 
AbstractControllerService implements DBC
         props.add(DB_PASSWORD);
         props.add(MAX_WAIT_TIME);
         props.add(MAX_TOTAL_CONNECTIONS);
+        props.add(VALIDATION_QUERY);
 
         properties = Collections.unmodifiableList(props);
     }
@@ -158,6 +170,7 @@ public class DBCPConnectionPool extends 
AbstractControllerService implements DBC
         final String passw = 
context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
         final Long maxWaitMillis = 
context.getProperty(MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
         final Integer maxTotal = 
context.getProperty(MAX_TOTAL_CONNECTIONS).asInteger();
+        final String validationQuery = 
context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
 
         dataSource = new BasicDataSource();
         dataSource.setDriverClassName(drv);
@@ -171,6 +184,11 @@ public class DBCPConnectionPool extends 
AbstractControllerService implements DBC
         dataSource.setMaxWait(maxWaitMillis);
         dataSource.setMaxActive(maxTotal);
 
+        if (validationQuery!=null && !validationQuery.isEmpty()) {
+            dataSource.setValidationQuery(validationQuery);
+            dataSource.setTestOnBorrow(true);
+        }
+
         dataSource.setUrl(dburl);
         dataSource.setUsername(user);
         dataSource.setPassword(passw);

http://git-wip-us.apache.org/repos/asf/nifi/blob/4fc0e9c4/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
index 1e2b8d5..6234cbe 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/DBCPServiceTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.nifi.dbcp;
 
+import org.apache.derby.drda.NetworkServerControl;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.h2.jdbc.JdbcSQLException;
+import org.h2.tools.Server;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -28,6 +31,8 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.File;
+import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -42,6 +47,7 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class DBCPServiceTest {
 
@@ -167,6 +173,201 @@ public class DBCPServiceTest {
     }
 
     /**
+     * Test Drop invalid connections and create new ones.
+     * Default behavior, invalid connections in pool.
+     */
+    @Test
+    public void testDropInvalidConnectionsH2_Default() throws Exception {
+
+        // start the H2 TCP Server
+        String[] args = new String[0];
+        Server server = Server.createTcpServer(args).start();
+
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+        final DBCPConnectionPool service = new DBCPConnectionPool();
+        runner.addControllerService("test-dropcreate", service);
+
+        runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, 
"jdbc:h2:tcp://localhost/~/test");
+        runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, 
"org.h2.Driver");
+        runner.enableControllerService(service);
+
+        runner.assertValid(service);
+        final DBCPService dbcpService = (DBCPService) 
runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate");
+        Assert.assertNotNull(dbcpService);
+
+        // get and verify connections
+        for (int i = 0; i < 10; i++) {
+            final Connection connection = dbcpService.getConnection();
+            System.out.println(connection);
+            Assert.assertNotNull(connection);
+            assertValidConnectionH2(connection, i);
+            connection.close();
+        }
+
+        // restart server, connections in pool should became invalid
+        server.stop();
+        server.shutdown();
+        server.start();
+
+        // Note!! We should get something like:
+        // org.h2.jdbc.JdbcSQLException: Connection is broken: "session 
closed" [90067-192]
+        exception.expect(JdbcSQLException.class);
+        for (int i = 0; i < 10; i++) {
+            final Connection connection = dbcpService.getConnection();
+            System.out.println(connection);
+            Assert.assertNotNull(connection);
+            assertValidConnectionH2(connection, i);
+            connection.close();
+        }
+
+        server.shutdown();
+    }
+
+    /**
+     * Test Drop invalid connections and create new ones.
+     * Better behavior, invalid connections are dropped and valid created.
+     */
+    @Test
+    public void testDropInvalidConnectionsH2_Better() throws Exception {
+
+        // start the H2 TCP Server
+        String[] args = new String[0];
+        Server server = Server.createTcpServer(args).start();
+
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+        final DBCPConnectionPool service = new DBCPConnectionPool();
+        runner.addControllerService("test-dropcreate", service);
+
+        runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, 
"jdbc:h2:tcp://localhost/~/test");
+        runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, 
"org.h2.Driver");
+        runner.setProperty(service, DBCPConnectionPool.VALIDATION_QUERY, 
"SELECT 5");
+        runner.enableControllerService(service);
+
+        runner.assertValid(service);
+        final DBCPService dbcpService = (DBCPService) 
runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate");
+        Assert.assertNotNull(dbcpService);
+
+        // get and verify connections
+        for (int i = 0; i < 10; i++) {
+            final Connection connection = dbcpService.getConnection();
+            System.out.println(connection);
+            Assert.assertNotNull(connection);
+            assertValidConnectionH2(connection, i);
+            connection.close();
+        }
+
+        // restart server, connections in pool should became invalid
+        server.stop();
+        server.shutdown();
+        server.start();
+
+        // Note!! We should not get something like:
+        // org.h2.jdbc.JdbcSQLException: Connection is broken: "session 
closed" [90067-192]
+        // Pool should remove invalid connections and create new valid 
connections.
+        for (int i = 0; i < 10; i++) {
+            final Connection connection = dbcpService.getConnection();
+            System.out.println(connection);
+            Assert.assertNotNull(connection);
+            assertValidConnectionH2(connection, i);
+            connection.close();
+        }
+
+        server.shutdown();
+    }
+
+    private void assertValidConnectionH2(Connection connection, int num) 
throws SQLException {
+        try (Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery("SELECT " + num);
+            assertTrue(rs.next());
+            int value = rs.getInt(1);
+            assertEquals(num, value);
+            assertTrue(connection.isValid(20));
+        }
+    }
+
+    /**
+     * Note!! Derby keeps something open even after server shutdown.
+     * So it's difficult to get invalid connections.
+     *
+     * Test Drop invalid connections and create new ones.
+     */
+    @Ignore
+    @Test
+    public void testDropInvalidConnectionsDerby() throws Exception {
+
+        // remove previous test database, if any
+        final File dbLocation = new File(DB_LOCATION);
+        dbLocation.delete();
+        if (dbLocation.exists())
+            throw new RuntimeException("Still exists " + 
dbLocation.getAbsolutePath());
+
+        // Start Derby server.
+        System.setProperty("derby.drda.startNetworkServer", "true");
+        System.setProperty("derby.system.home", DB_LOCATION);
+        NetworkServerControl serverControl = new 
NetworkServerControl(InetAddress.getLocalHost(),1527);
+        serverControl.start(new PrintWriter(System.out, true));
+
+        // create sample database
+        Class.forName("org.apache.derby.jdbc.ClientDriver");
+        Connection conn = 
DriverManager.getConnection("jdbc:derby://127.0.0.1:1527/sample;create=true");
+        conn.close();
+
+        final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+        final DBCPConnectionPool service = new DBCPConnectionPool();
+        runner.addControllerService("test-dropcreate", service);
+
+        // set Derby database props
+        runner.setProperty(service, DBCPConnectionPool.DATABASE_URL, 
"jdbc:derby:" + "//127.0.0.1:1527/sample");
+        runner.setProperty(service, DBCPConnectionPool.DB_USER, "tester");
+        runner.setProperty(service, DBCPConnectionPool.DB_DRIVERNAME, 
"org.apache.derby.jdbc.ClientDriver");
+
+        runner.enableControllerService(service);
+
+        runner.assertValid(service);
+        final DBCPService dbcpService = (DBCPService) 
runner.getProcessContext().getControllerServiceLookup().getControllerService("test-dropcreate");
+        Assert.assertNotNull(dbcpService);
+
+        for (int i = 0; i < 10; i++) {
+            final Connection connection = dbcpService.getConnection();
+            System.out.println(connection);
+            Assert.assertNotNull(connection);
+            assertValidConnectionDerby(connection, i);
+            connection.close();
+        }
+
+        serverControl.shutdown();
+        dbLocation.delete();
+        if (dbLocation.exists())
+            throw new RuntimeException("Still exists " + 
dbLocation.getAbsolutePath());
+        try {
+            serverControl.ping();
+        } catch (Exception e) {
+        }
+
+        Thread.sleep(2000);
+
+        for (int i = 0; i < 10; i++) {
+            final Connection connection = dbcpService.getConnection();
+            System.out.println(connection);
+            Assert.assertNotNull(connection);
+            assertValidConnectionDerby(connection, i);
+            connection.close();
+        }
+
+    }
+
+
+    private void assertValidConnectionDerby(Connection connection, int num) 
throws SQLException {
+        try (Statement statement = connection.createStatement()) {
+            ResultSet rs = statement.executeQuery("SELECT " + num + " FROM 
SYSIBM.SYSDUMMY1");
+            assertTrue(rs.next());
+            int value = rs.getInt(1);
+            assertEquals(num, value);
+            assertTrue(connection.isValid(20));
+        }
+    }
+
+    /**
      * Test get database connection using Derby. Get many times, release 
immediately and getConnection should not fail.
      */
     @Test

Reply via email to