NIFI-1175 Exposing the minimum properties required to create an HBase connection on the HBaseClientService as an optional alternative to the configuration files
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2b9b5e00 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2b9b5e00 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2b9b5e00 Branch: refs/heads/NIFI-655 Commit: 2b9b5e008f307ff5f31c36840ec503cacae5ff30 Parents: 453b140 Author: Bryan Bende <[email protected]> Authored: Mon Nov 16 14:01:23 2015 -0500 Committer: Bryan Bende <[email protected]> Committed: Tue Nov 17 12:01:46 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/hbase/HBaseClientService.java | 30 ++++++- .../nifi-hbase_1_1_2-client-service/pom.xml | 6 ++ .../nifi/hbase/HBase_1_1_2_ClientService.java | 93 +++++++++++++++++++- .../hbase/TestHBase_1_1_2_ClientService.java | 67 ++++++++++++++ 4 files changed, 190 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2b9b5e00/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java index a3f2040..9ff2c46 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/HBaseClientService.java @@ -24,6 +24,7 @@ import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultHandler; import 
org.apache.nifi.hbase.validate.ConfigFilesValidator; +import org.apache.nifi.processor.util.StandardValidators; import java.io.IOException; import java.util.Collection; @@ -34,12 +35,35 @@ public interface HBaseClientService extends ControllerService { PropertyDescriptor HADOOP_CONF_FILES = new PropertyDescriptor.Builder() .name("Hadoop Configuration Files") - .description("Comma-separated list of Hadoop Configuration files, such as hbase-site.xml") - .required(true) - .defaultValue("./conf/hbase-site.xml") + .description("Comma-separated list of Hadoop Configuration files, such as hbase-site.xml, including full paths to the files.") .addValidator(new ConfigFilesValidator()) .build(); + PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder() + .name("ZooKeeper Quorum") + .description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + PropertyDescriptor ZOOKEEPER_CLIENT_PORT = new PropertyDescriptor.Builder() + .name("ZooKeeper Client Port") + .description("The port on which ZooKeeper is accepting client connections. Required if Hadoop Configuration Files are not provided.") + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + PropertyDescriptor ZOOKEEPER_ZNODE_PARENT = new PropertyDescriptor.Builder() + .name("ZooKeeper ZNode Parent") + .description("The ZooKeeper ZNode Parent value for HBase (example: /hbase). Required if Hadoop Configuration Files are not provided.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + PropertyDescriptor HBASE_CLIENT_RETRIES = new PropertyDescriptor.Builder() + .name("HBase Client Retries") + .description("The number of times the HBase client will retry connecting. 
Required if Hadoop Configuration Files are not provided.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + /** * Puts a batch of mutations to the given table. * http://git-wip-us.apache.org/repos/asf/nifi/blob/2b9b5e00/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml index a90e0e3..68ea55d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/pom.xml @@ -57,6 +57,12 @@ <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + <version>${org.slf4j.version}</version> + </dependency> + <!-- test dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/2b9b5e00/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java index 9c300db..42590c2 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; @@ -31,11 +32,14 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ControllerServiceInitializationContext; @@ -43,6 +47,7 @@ import org.apache.nifi.hbase.put.PutFlowFile; import org.apache.nifi.hbase.scan.Column; import org.apache.nifi.hbase.scan.ResultCell; import org.apache.nifi.hbase.scan.ResultHandler; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; import java.io.IOException; @@ -55,9 +60,20 @@ import java.util.List; import 
java.util.Map; @Tags({ "hbase", "client"}) -@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2.") +@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " + + "a comma-separated list of configuration files, or by specifying values for the other properties. If configuration files " + + "are provided, they will be loaded first, and the values of the additional properties will override the values from " + + "the configuration files. In addition, any user defined properties on the processor will also be passed to the HBase " + + "configuration.") +@DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.", + description="These properties will be set on the HBase configuration after loading any provided configuration files.") public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService { + static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum"; + static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort"; + static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent"; + static final String HBASE_CONF_CLIENT_RETRIES = "hbase.client.retries.number"; + private volatile Connection connection; private List<PropertyDescriptor> properties; @@ -65,6 +81,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme protected void init(ControllerServiceInitializationContext config) throws InitializationException { List<PropertyDescriptor> props = new ArrayList<>(); props.add(HADOOP_CONF_FILES); + props.add(ZOOKEEPER_QUORUM); + props.add(ZOOKEEPER_CLIENT_PORT); + props.add(ZOOKEEPER_ZNODE_PARENT); + props.add(HBASE_CLIENT_RETRIES); this.properties = Collections.unmodifiableList(props); } @@ -73,16 +93,83 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme return properties; } + 
@Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' in the HBase configuration.") + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + boolean confFileProvided = validationContext.getProperty(HADOOP_CONF_FILES).isSet(); + boolean zkQuorumProvided = validationContext.getProperty(ZOOKEEPER_QUORUM).isSet(); + boolean zkPortProvided = validationContext.getProperty(ZOOKEEPER_CLIENT_PORT).isSet(); + boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet(); + boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet(); + + final List<ValidationResult> problems = new ArrayList<>(); + + if (!confFileProvided && (!zkQuorumProvided || !zkPortProvided || !znodeParentProvided || !retriesProvided)) { + problems.add(new ValidationResult.Builder() + .valid(false) + .subject(this.getClass().getSimpleName()) + .explanation("ZooKeeper Quorum, ZooKeeper Client Port, ZooKeeper ZNode Parent, and HBase Client Retries are required " + + "when Hadoop Configuration Files are not provided.") + .build()); + } + + return problems; + } + @OnEnabled public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException { this.connection = createConnection(context); + + // connection check + if (this.connection != null) { + final Admin admin = this.connection.getAdmin(); + if (admin != null) { + admin.listTableNames(); + } + } } protected Connection createConnection(final ConfigurationContext context) throws IOException { final Configuration hbaseConfig = HBaseConfiguration.create(); - for (final String configFile : 
context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) { - hbaseConfig.addResource(new Path(configFile.trim())); + + // if conf files are provided, start with those + if (context.getProperty(HADOOP_CONF_FILES).isSet()) { + for (final String configFile : context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) { + hbaseConfig.addResource(new Path(configFile.trim())); + } + } + + // override with any properties that are provided + if (context.getProperty(ZOOKEEPER_QUORUM).isSet()) { + hbaseConfig.set(HBASE_CONF_ZK_QUORUM, context.getProperty(ZOOKEEPER_QUORUM).getValue()); + } + if (context.getProperty(ZOOKEEPER_CLIENT_PORT).isSet()) { + hbaseConfig.set(HBASE_CONF_ZK_PORT, context.getProperty(ZOOKEEPER_CLIENT_PORT).getValue()); } + if (context.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet()) { + hbaseConfig.set(HBASE_CONF_ZNODE_PARENT, context.getProperty(ZOOKEEPER_ZNODE_PARENT).getValue()); + } + if (context.getProperty(HBASE_CLIENT_RETRIES).isSet()) { + hbaseConfig.set(HBASE_CONF_CLIENT_RETRIES, context.getProperty(HBASE_CLIENT_RETRIES).getValue()); + } + + // add any dynamic properties to the HBase configuration + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + final PropertyDescriptor descriptor = entry.getKey(); + if (descriptor.isDynamic()) { + hbaseConfig.set(descriptor.getName(), entry.getValue()); + } + } + return ConnectionFactory.createConnection(hbaseConfig); } http://git-wip-us.apache.org/repos/asf/nifi/blob/2b9b5e00/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java index 71dd51b..1575f3c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientService.java @@ -56,6 +56,73 @@ import static org.mockito.Mockito.when; public class TestHBase_1_1_2_ClientService { @Test + public void testCustomValidate() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + + final String tableName = "nifi"; + final Table table = Mockito.mock(Table.class); + when(table.getName()).thenReturn(TableName.valueOf(tableName)); + + // no conf file or zk properties so should be invalid + MockHBaseClientService service = new MockHBaseClientService(table); + runner.addControllerService("hbaseClientService", service); + runner.enableControllerService(service); + + runner.assertNotValid(service); + runner.removeControllerService(service); + + // conf file with no zk properties should be valid + service = new MockHBaseClientService(table); + runner.addControllerService("hbaseClientService", service); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml"); + runner.enableControllerService(service); + + runner.assertValid(service); + runner.removeControllerService(service); + + // only quorum and no conf file should be invalid + service = new MockHBaseClientService(table); + runner.addControllerService("hbaseClientService", service); + runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); + runner.enableControllerService(service); + 
+ runner.assertNotValid(service); + runner.removeControllerService(service); + + // quorum and port, no znode, no conf file, should be invalid + service = new MockHBaseClientService(table); + runner.addControllerService("hbaseClientService", service); + runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); + runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); + runner.enableControllerService(service); + + runner.assertNotValid(service); + runner.removeControllerService(service); + + // quorum, port, and znode, no conf file, should be valid + service = new MockHBaseClientService(table); + runner.addControllerService("hbaseClientService", service); + runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); + runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); + runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase"); + runner.enableControllerService(service); + + runner.assertValid(service); + runner.removeControllerService(service); + + // quorum and port with conf file should be valid + service = new MockHBaseClientService(table); + runner.addControllerService("hbaseClientService", service); + runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml"); + runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); + runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); + runner.enableControllerService(service); + + runner.assertValid(service); + runner.removeControllerService(service); + } + + @Test public void testSinglePut() throws InitializationException, IOException { final String tableName = "nifi"; final String row = "row1";
