Repository: nifi Updated Branches: refs/heads/master f15e6c7ab -> cf6089196
NIFI-5229 Adding a DBCPService implementation that can lookup other DBCPServices dynamically at runtime This closes #2735 Signed-off-by: Mike Thomsen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cf608919 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cf608919 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cf608919 Branch: refs/heads/master Commit: cf6089196f7780e546bdd9d36ce52f535149fabf Parents: f15e6c7 Author: Bryan Bende <[email protected]> Authored: Wed May 23 11:21:12 2018 -0400 Committer: Mike Thomsen <[email protected]> Committed: Fri May 25 08:54:03 2018 -0400 ---------------------------------------------------------------------- .../nifi/dbcp/DBCPConnectionPoolLookup.java | 140 ++++++++++++++ ...org.apache.nifi.controller.ControllerService | 3 +- .../nifi/dbcp/TestDBCPConnectionPoolLookup.java | 182 +++++++++++++++++++ 3 files changed, 324 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/cf608919/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java new file mode 100644 index 0000000..1198eb5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/java/org/apache/nifi/dbcp/DBCPConnectionPoolLookup.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" }) +@CapabilityDescription("Provides a DBCPService that can be used to dynamically select another DBCPService. This service " + + "requires an attribute named 'database.name' to be passed in when asking for a connection, and will throw an exception " + + "if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " + + "registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " + + "dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.") +@DynamicProperty(name = "The ", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.NONE, + description = "") +public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService { + + public static final String DATABASE_NAME_ATTRIBUTE = "database.name"; + + private volatile Map<String,DBCPService> dbcpServiceMap; + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("The DBCPService to return when database.name = '" + propertyDescriptorName + "'") + .identifiesControllerService(DBCPService.class) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + final List<ValidationResult> results = new ArrayList<>(); + + int numDefinedServices = 0; + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + numDefinedServices++; + } + + final String referencedId = context.getProperty(descriptor).getValue(); + if (this.getIdentifier().equals(referencedId)) { + results.add(new ValidationResult.Builder() + .subject(descriptor.getDisplayName()) + .explanation("the current service cannot be registered as a DBCPService to lookup") + .valid(false) + .build()); + } + } + + if (numDefinedServices == 0) { + results.add(new ValidationResult.Builder() + .subject(this.getClass().getSimpleName()) + .explanation("at least one DBCPService must be defined via dynamic properties") + .valid(false) + .build()); + } + + return results; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final Map<String,DBCPService> serviceMap = new HashMap<>(); + + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + final DBCPService dbcpService = context.getProperty(descriptor).asControllerService(DBCPService.class); + serviceMap.put(descriptor.getName(), dbcpService); + } + } + + dbcpServiceMap = Collections.unmodifiableMap(serviceMap); + } + + @OnDisabled + public void onDisabled() { + dbcpServiceMap = null; + } + + @Override + public Connection getConnection() throws ProcessException { + throw new UnsupportedOperationException("Cannot lookup DBCPConnectionPool without attributes"); + } + + @Override + public Connection getConnection(Map<String, String> attributes) throws ProcessException { + if (!attributes.containsKey(DATABASE_NAME_ATTRIBUTE)) { + throw new ProcessException("Attributes must contain an attribute name '" + DATABASE_NAME_ATTRIBUTE + "'"); + } + + final String databaseName = attributes.get(DATABASE_NAME_ATTRIBUTE); + if (StringUtils.isBlank(databaseName)) { + throw new ProcessException(DATABASE_NAME_ATTRIBUTE + " cannot be null or blank"); + } + + final DBCPService dbcpService = dbcpServiceMap.get(databaseName); + if (dbcpService == null) { + throw new ProcessException("No DBCPService was found for database.name '" + databaseName + "'"); + } + + return dbcpService.getConnection(attributes); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/cf608919/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index d022695..eaff270 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.dbcp.DBCPConnectionPool \ No newline at end of file +org.apache.nifi.dbcp.DBCPConnectionPool +org.apache.nifi.dbcp.DBCPConnectionPoolLookup \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/cf608919/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java new file mode 100644 index 0000000..635af2e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-dbcp-service-bundle/nifi-dbcp-service/src/test/java/org/apache/nifi/dbcp/TestDBCPConnectionPoolLookup.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.dbcp; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestDBCPConnectionPoolLookup { + + private MockConnection connectionA; + private MockConnection connectionB; + + private MockDBCPService dbcpServiceA; + private MockDBCPService dbcpServiceB; + + private DBCPService dbcpLookupService; + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + connectionA = mock(MockConnection.class); + when(connectionA.getName()).thenReturn("A"); + + connectionB = mock(MockConnection.class); + when(connectionB.getName()).thenReturn("B"); + + dbcpServiceA = new MockDBCPService(connectionA); + dbcpServiceB = new MockDBCPService(connectionB); + + dbcpLookupService = new DBCPConnectionPoolLookup(); + + runner = TestRunners.newTestRunner(TestProcessor.class); + + final String dbcpServiceAIdentifier = "dbcp-a"; + runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA); + + final String dbcpServiceBIdentifier = "dbcp-b"; + runner.addControllerService(dbcpServiceBIdentifier, dbcpServiceB); + + runner.addControllerService("dbcp-lookup", dbcpLookupService); + runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier); + runner.setProperty(dbcpLookupService, "b", dbcpServiceBIdentifier); + + runner.enableControllerService(dbcpServiceA); + runner.enableControllerService(dbcpServiceB); + runner.enableControllerService(dbcpLookupService); + + } + + @Test + public void testLookupServiceA() { + final Map<String,String> attributes = new HashMap<>(); + attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "a"); + + final Connection connection = dbcpLookupService.getConnection(attributes); + assertNotNull(connection); + assertTrue(connection instanceof MockConnection); + + final MockConnection mockConnection = (MockConnection)connection; + assertEquals(connectionA.getName(), mockConnection.getName()); + } + + @Test + public void testLookupServiceB() { + final Map<String,String> attributes = new HashMap<>(); + attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "b"); + + final Connection connection = dbcpLookupService.getConnection(attributes); + assertNotNull(connection); + assertTrue(connection instanceof MockConnection); + + final MockConnection mockConnection = (MockConnection)connection; + assertEquals(connectionB.getName(), mockConnection.getName()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testLookupWithoutAttributes() { + dbcpLookupService.getConnection(); + } + + @Test(expected = ProcessException.class) + public void testLookupMissingDatabaseNameAttribute() { + final Map<String,String> attributes = new HashMap<>(); + dbcpLookupService.getConnection(attributes); + } + + @Test(expected = ProcessException.class) + public void testLookupWithDatabaseNameThatDoesNotExist() { + final Map<String,String> attributes = new HashMap<>(); + attributes.put(DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE, "DOES-NOT-EXIST"); + dbcpLookupService.getConnection(attributes); + } + + @Test + public void testCustomValidateAtLeaseOneServiceDefined() throws InitializationException { + // enable lookup service with no services registered, verify not valid + runner = TestRunners.newTestRunner(TestProcessor.class); + runner.addControllerService("dbcp-lookup", dbcpLookupService); + runner.enableControllerService(dbcpLookupService); + runner.assertNotValid(dbcpLookupService); + + final String dbcpServiceAIdentifier = "dbcp-a"; + runner.addControllerService(dbcpServiceAIdentifier, dbcpServiceA); + runner.enableControllerService(dbcpServiceA); + + // register a service and now verify valid + runner.disableControllerService(dbcpLookupService); + runner.setProperty(dbcpLookupService, "a", dbcpServiceAIdentifier); + runner.enableControllerService(dbcpLookupService); + runner.assertValid(dbcpLookupService); + } + + @Test + public void testCustomValidateSelfReferenceNotAllowed() throws InitializationException { + runner = TestRunners.newTestRunner(TestProcessor.class); + runner.addControllerService("dbcp-lookup", dbcpLookupService); + runner.setProperty(dbcpLookupService, "dbcp-lookup", "dbcp-lookup"); + runner.enableControllerService(dbcpLookupService); + runner.assertNotValid(dbcpLookupService); + } + + /** + * A mock DBCPService that will always return the passed in MockConnection. + */ + private static class MockDBCPService extends AbstractControllerService implements DBCPService { + + private MockConnection connection; + + public MockDBCPService(MockConnection connection) { + this.connection = connection; + } + + @Override + public Connection getConnection() throws ProcessException { + return connection; + } + + @Override + public Connection getConnection(Map<String, String> attributes) throws ProcessException { + return connection; + } + } + + /** + * A mock Connection that will allow us to mock getName so we can identify we have the expected Connection. + */ + private interface MockConnection extends Connection { + + String getName(); + + } + +}
