Repository: nifi
Updated Branches:
  refs/heads/master f15e6c7ab -> cf6089196


NIFI-5229 Adding a DBCPService implementation that can lookup other 
DBCPServices dynamically at runtime

This closes #2735

Signed-off-by: Mike Thomsen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cf608919
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cf608919
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cf608919

Branch: refs/heads/master
Commit: cf6089196f7780e546bdd9d36ce52f535149fabf
Parents: f15e6c7
Author: Bryan Bende <[email protected]>
Authored: Wed May 23 11:21:12 2018 -0400
Committer: Mike Thomsen <[email protected]>
Committed: Fri May 25 08:54:03 2018 -0400

----------------------------------------------------------------------
 .../nifi/dbcp/DBCPConnectionPoolLookup.java     | 140 ++++++++++++++
 ...org.apache.nifi.controller.ControllerService |   3 +-
 .../nifi/dbcp/TestDBCPConnectionPoolLookup.java | 182 +++++++++++++++++++
 3 files changed, 324 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cf608919/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
new file mode 100644
index 0000000..1198eb5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link DBCPService} that owns no connection pool itself. Every dynamic property registers
+ * another DBCPService under the property's name, and {@link #getConnection(Map)} delegates to the
+ * service whose registered name equals the value of the 'database.name' attribute.
+ */
+@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
+@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " +
+        "requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception " +
+        "if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
+        "registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
+        "dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
+@DynamicProperty(name = "The name to register the DBCPService under", value = "The DBCPService to register",
+        expressionLanguageScope = ExpressionLanguageScope.NONE,
+        description = "If the 'database.name' attribute is equal to the name of the dynamic property, " +
+                "then the DBCPService identified by the property value is used to obtain the connection.")
+public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService {
+
+    /** Name of the attribute whose value selects the registered DBCPService. */
+    public static final String DATABASE_NAME_ATTRIBUTE = "database.name";
+
+    // Registered name -> DBCPService. Assigned in onEnabled() and reset to null in onDisabled().
+    private volatile Map<String,DBCPService> dbcpServiceMap;
+
+    /**
+     * Builds the descriptor for a dynamic property; its value must identify a {@link DBCPService}.
+     *
+     * @param propertyDescriptorName the dynamic property name, i.e. the name the service is registered under
+     * @return a non-blank property descriptor that identifies a DBCPService
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description("The DBCPService to return when database.name = '" + propertyDescriptorName + "'")
+                .identifiesControllerService(DBCPService.class)
+                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+                .build();
+    }
+
+    /**
+     * Rejects a configuration in which any property value equals this service's own identifier,
+     * or in which no dynamic property is defined.
+     *
+     * @param context the validation context holding the configured properties
+     * @return one invalid result per violated rule; empty when the configuration is acceptable
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        int numDefinedServices = 0;
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                numDefinedServices++;
+            }
+
+            // A property pointing back at this service would make the lookup delegate to itself.
+            final String referencedId = context.getProperty(descriptor).getValue();
+            if (this.getIdentifier().equals(referencedId)) {
+                results.add(new ValidationResult.Builder()
+                        .subject(descriptor.getDisplayName())
+                        .explanation("the current service cannot be registered as a DBCPService to lookup")
+                        .valid(false)
+                        .build());
+            }
+        }
+
+        if (numDefinedServices == 0) {
+            results.add(new ValidationResult.Builder()
+                    .subject(this.getClass().getSimpleName())
+                    .explanation("at least one DBCPService must be defined via dynamic properties")
+                    .valid(false)
+                    .build());
+        }
+
+        return results;
+    }
+
+    /**
+     * Resolves every dynamic property to its DBCPService and publishes the result as an
+     * unmodifiable name-to-service map.
+     *
+     * @param context the configuration holding the dynamic properties
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final Map<String,DBCPService> serviceMap = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (descriptor.isDynamic()) {
+                final DBCPService dbcpService = context.getProperty(descriptor).asControllerService(DBCPService.class);
+                serviceMap.put(descriptor.getName(), dbcpService);
+            }
+        }
+
+        dbcpServiceMap = Collections.unmodifiableMap(serviceMap);
+    }
+
+    /** Drops the name-to-service map built by {@link #onEnabled(ConfigurationContext)}. */
+    @OnDisabled
+    public void onDisabled() {
+        dbcpServiceMap = null;
+    }
+
+    /**
+     * Always fails: without attributes there is no 'database.name' to select a service with.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Connection getConnection() throws ProcessException {
+        throw new UnsupportedOperationException("Cannot lookup DBCPConnectionPool without attributes");
+    }
+
+    /**
+     * Returns a connection from the DBCPService registered under the value of the
+     * 'database.name' attribute.
+     *
+     * @param attributes attributes that must contain a non-blank 'database.name' entry
+     * @return the connection obtained from the selected DBCPService
+     * @throws ProcessException if the attribute is missing or blank, if no service is registered
+     *         under that name, or if this service currently holds no registered services
+     */
+    @Override
+    public Connection getConnection(Map<String, String> attributes) throws ProcessException {
+        // A null map cannot carry the attribute, so report it the same way as a missing attribute.
+        if (attributes == null || !attributes.containsKey(DATABASE_NAME_ATTRIBUTE)) {
+            throw new ProcessException("Attributes must contain an attribute name '" + DATABASE_NAME_ATTRIBUTE + "'");
+        }
+
+        final String databaseName = attributes.get(DATABASE_NAME_ATTRIBUTE);
+        if (StringUtils.isBlank(databaseName)) {
+            throw new ProcessException(DATABASE_NAME_ATTRIBUTE + " cannot be null or blank");
+        }
+
+        // Read the volatile field once so onDisabled() cannot null it between the check and the use.
+        final Map<String,DBCPService> services = dbcpServiceMap;
+        if (services == null) {
+            throw new ProcessException("Cannot lookup a DBCPService because " + getClass().getSimpleName() + " is not enabled");
+        }
+
+        final DBCPService dbcpService = services.get(databaseName);
+        if (dbcpService == null) {
+            throw new ProcessException("No DBCPService was found for database.name '" + databaseName + "'");
+        }
+
+        return dbcpService.getConnection(attributes);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf608919/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index d022695..eaff270 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,4 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.dbcp.DBCPConnectionPool
\ No newline at end of file
+org.apache.nifi.dbcp.DBCPConnectionPool
+org.apache.nifi.dbcp.DBCPConnectionPoolLookup
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/cf608919/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java
new file mode 100644
index 0000000..635af2e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.dbcp;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for DBCPConnectionPoolLookup. Two mock DBCPServices are registered with the lookup
+ * service under the names "a" and "b"; the tests check that the 'database.name' attribute
+ * selects the matching service and that invalid lookups and configurations are rejected.
+ */
+public class TestDBCPConnectionPoolLookup {
+
+    private MockConnection connectionA;
+    private MockConnection connectionB;
+
+    private MockDBCPService dbcpServiceA;
+    private MockDBCPService dbcpServiceB;
+
+    private DBCPService dbcpLookupService;
+    private TestRunner runner;
+
+    /**
+     * Creates two mocked connections whose getName() returns "A" and "B", wraps each in a
+     * MockDBCPService, registers them as "dbcp-a" and "dbcp-b", registers the lookup service as
+     * "dbcp-lookup" with dynamic properties a -> dbcp-a and b -> dbcp-b, then enables all three.
+     */
+    @Before
+    public void setup() throws InitializationException {
+        connectionA = mock(MockConnection.class);
+        when(connectionA.getName()).thenReturn("A");
+
+        connectionB = mock(MockConnection.class);
+        when(connectionB.getName()).thenReturn("B");
+
+        dbcpServiceA = new MockDBCPService(connectionA);
+        dbcpServiceB = new MockDBCPService(connectionB);
+
+        dbcpLookupService = new DBCPConnectionPoolLookup();
+
+        // TestProcessor is not defined in this file; it only serves as the processor for the runner.
+        runner = TestRunners.newTestRunner(TestProcessor.class);
+
+        final String dbcpServiceAIdentifier = "dbcp-a";
+        runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA);
+
+        final String dbcpServiceBIdentifier = "dbcp-b";
+        runner.addControllerService(dbcpServiceBIdentifier, dbcpServiceB);
+
+        // The dynamic property name is the lookup key; the value is the target service identifier.
+        runner.addControllerService("dbcp-lookup", dbcpLookupService);
+        runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier);
+        runner.setProperty(dbcpLookupService, "b", dbcpServiceBIdentifier);
+
+        runner.enableControllerService(dbcpServiceA);
+        runner.enableControllerService(dbcpServiceB);
+        runner.enableControllerService(dbcpLookupService);
+
+    }
+
+    /** database.name = "a" must yield the connection held by service A. */
+    @Test
+    public void testLookupServiceA() {
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "a");
+
+        final Connection connection = 
dbcpLookupService.getConnection(attributes);
+        assertNotNull(connection);
+        assertTrue(connection instanceof MockConnection);
+
+        final MockConnection mockConnection = (MockConnection)connection;
+        assertEquals(connectionA.getName(), mockConnection.getName());
+    }
+
+    /** database.name = "b" must yield the connection held by service B. */
+    @Test
+    public void testLookupServiceB() {
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "b");
+
+        final Connection connection = 
dbcpLookupService.getConnection(attributes);
+        assertNotNull(connection);
+        assertTrue(connection instanceof MockConnection);
+
+        final MockConnection mockConnection = (MockConnection)connection;
+        assertEquals(connectionB.getName(), mockConnection.getName());
+    }
+
+    /** The no-argument getConnection() is expected to throw UnsupportedOperationException. */
+    @Test(expected = UnsupportedOperationException.class)
+    public void testLookupWithoutAttributes() {
+        dbcpLookupService.getConnection();
+    }
+
+    /** An attribute map without 'database.name' is expected to raise a ProcessException. */
+    @Test(expected = ProcessException.class)
+    public void testLookupMissingDatabaseNameAttribute() {
+        final Map<String,String> attributes = new HashMap<>();
+        dbcpLookupService.getConnection(attributes);
+    }
+
+    /** A 'database.name' value with no registered service is expected to raise a ProcessException. */
+    @Test(expected = ProcessException.class)
+    public void testLookupWithDatabaseNameThatDoesNotExist() {
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, 
"DOES-NOT-EXIST");
+        dbcpLookupService.getConnection(attributes);
+    }
+
+    /**
+     * The lookup service must be invalid with no dynamic properties and valid once one is set.
+     * Uses a fresh runner so the registrations made in setup() do not apply.
+     * NOTE(review): "AtLease" in the method name is presumably a typo for "AtLeast".
+     */
+    @Test
+    public void testCustomValidateAtLeaseOneServiceDefined() throws 
InitializationException {
+        // enable lookup service with no services registered, verify not valid
+        runner = TestRunners.newTestRunner(TestProcessor.class);
+        runner.addControllerService("dbcp-lookup", dbcpLookupService);
+        runner.enableControllerService(dbcpLookupService);
+        runner.assertNotValid(dbcpLookupService);
+
+        final String dbcpServiceAIdentifier = "dbcp-a";
+        runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA);
+        runner.enableControllerService(dbcpServiceA);
+
+        // register a service and now verify valid
+        runner.disableControllerService(dbcpLookupService);
+        runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier);
+        runner.enableControllerService(dbcpLookupService);
+        runner.assertValid(dbcpLookupService);
+    }
+
+    /**
+     * A dynamic property whose value is the lookup service's own identifier ("dbcp-lookup")
+     * must leave the lookup service invalid.
+     */
+    @Test
+    public void testCustomValidateSelfReferenceNotAllowed() throws 
InitializationException {
+        runner = TestRunners.newTestRunner(TestProcessor.class);
+        runner.addControllerService("dbcp-lookup", dbcpLookupService);
+        runner.setProperty(dbcpLookupService, "dbcp-lookup", "dbcp-lookup");
+        runner.enableControllerService(dbcpLookupService);
+        runner.assertNotValid(dbcpLookupService);
+    }
+
+    /**
+     * A mock DBCPService that will always return the passed in MockConnection,
+     * from both getConnection() overloads and regardless of the attributes given.
+     */
+    private static class MockDBCPService extends AbstractControllerService 
implements DBCPService {
+
+        private MockConnection connection;
+
+        public MockDBCPService(MockConnection connection) {
+            this.connection = connection;
+        }
+
+        @Override
+        public Connection getConnection() throws ProcessException {
+            return connection;
+        }
+
+        @Override
+        public Connection getConnection(Map<String, String> attributes) throws 
ProcessException {
+            return connection;
+        }
+    }
+
+    /**
+     * A mock Connection that will allow us to mock getName so we can identify
+     * we have the expected Connection.
+     */
+    private interface MockConnection extends Connection {
+
+        String getName();
+
+    }
+
+}

Reply via email to