This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 2ab9997 NIFI-6662: Adding Kudu Lookup Service NIFI-6662: Cleaning up
Kudu logic NIFI-6662: Minor enhancements and build fixes NIFI-6662: This closes
#3732.
2ab9997 is described below
commit 2ab99970b778b09e3544867310458acd0913fbbc
Author: samhjelmfelt <[email protected]>
AuthorDate: Thu Sep 12 18:55:37 2019 -0500
NIFI-6662: Adding Kudu Lookup Service
NIFI-6662: Cleaning up Kudu logic
NIFI-6662: Minor enhancements and build fixes
NIFI-6662: This closes #3732.
Signed-off-by: Joe Witt <[email protected]>
---
.../nifi-kudu-controller-service/pom.xml | 152 +++++++++
.../nifi/controller/kudu/KuduLookupService.java | 354 +++++++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 16 +
.../controller/kudu/TestKuduLookupService.java | 234 ++++++++++++++
.../nifi-kudu-bundle/nifi-kudu-nar/pom.xml | 5 +
nifi-nar-bundles/nifi-kudu-bundle/pom.xml | 2 +-
6 files changed, 762 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
new file mode 100644
index 0000000..1f8c6a2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>nifi-kudu-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.10.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-kudu-controller-service</artifactId>
+ <packaging>jar</packaging>
+
+
+ <properties>
+ <exclude.tests>None</exclude.tests>
+ <kudu.version>1.10.0</kudu.version>
+ </properties>
+ <build>
+ <extensions>
+ <!-- Used to find the right kudu-binary artifact with the Maven
+ property ${os.detected.classifier} -->
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.6.2</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>${exclude.tests}</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-lookup-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-utils</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-client</artifactId>
+ <version>${kudu.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-test-utils</artifactId>
+ <version>${kudu.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <profiles>
+ <profile>
+ <id>kudu-windows</id>
+ <activation>
+ <os>
+ <family>Windows</family>
+ </os>
+ </activation>
+ <properties>
+ <!-- Kudu tests do not support Windows. -->
+ <exclude.tests>**/*.java</exclude.tests>
+ </properties>
+ </profile>
+ <profile>
+ <id>kudu-linux</id>
+ <activation>
+ <os>
+ <family>Unix</family>
+ </os>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-binary</artifactId>
+ <version>${kudu.version}</version>
+ <classifier>${os.detected.classifier}</classifier>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>kudu-mac</id>
+ <activation>
+ <os>
+ <family>mac</family>
+ </os>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kudu</groupId>
+ <artifactId>kudu-binary</artifactId>
+ <version>${kudu.version}</version>
+ <classifier>${os.detected.classifier}</classifier>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+</project>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
new file mode 100644
index 0000000..b044f9a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.kudu;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.ReplicaSelection;
+import org.apache.kudu.client.RowResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import javax.security.auth.login.LoginException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@CapabilityDescription("Lookup a record from Kudu Server associated with the
specified key. Binary columns are base64 encoded. Only one matched row will be
returned")
+@Tags({"lookup", "enrich", "key", "value", "kudu"})
+public class KuduLookupService extends AbstractControllerService implements
RecordLookupService {
+
+ public static final PropertyDescriptor KUDU_MASTERS = new
PropertyDescriptor.Builder()
+ .name("kudu-lu-masters")
+ .displayName("Kudu Masters")
+ .description("Comma separated addresses of the Kudu masters to
connect to.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new
PropertyDescriptor.Builder()
+ .name("kudu-lu-kerberos-credentials-service")
+ .displayName("Kerberos Credentials Service")
+ .description("Specifies the Kerberos Credentials to use for
authentication")
+ .required(false)
+ .identifiesControllerService(KerberosCredentialsService.class)
+ .build();
+
+ public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new
PropertyDescriptor.Builder()
+ .name("kudu-lu-operations-timeout-ms")
+ .displayName("Kudu Operation Timeout")
+ .description("Default timeout used for user operations (using
sessions and scanners)")
+ .required(false)
+ .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final AllowableValue CLOSEST_REPLICA = new
AllowableValue(ReplicaSelection.CLOSEST_REPLICA.toString(),
ReplicaSelection.CLOSEST_REPLICA.name(),
+ "Select the closest replica to the client. Replicas are classified
from closest to furthest as follows: "+
+ "1) Local replicas 2) Replicas whose tablet server has the
same location as the client 3) All other replicas");
+ public static final AllowableValue LEADER_ONLY = new
AllowableValue(ReplicaSelection.LEADER_ONLY.toString(),
ReplicaSelection.LEADER_ONLY.name(),
+ "Select the LEADER replica");
+ public static final PropertyDescriptor KUDU_REPLICA_SELECTION = new
PropertyDescriptor.Builder()
+ .name("kudu-lu-replica-selection")
+ .displayName("Kudu Replica Selection")
+ .description("Policy with which to choose amongst multiple
replicas")
+ .required(true)
+ .defaultValue(CLOSEST_REPLICA.getValue())
+ .allowableValues(CLOSEST_REPLICA, LEADER_ONLY)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor TABLE_NAME = new
PropertyDescriptor.Builder()
+ .name("kudu-lu-table-name")
+ .displayName("Kudu Table Name")
+ .description("Name of the table to access.")
+ .required(true)
+ .defaultValue("default")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ public static final PropertyDescriptor RETURN_COLUMNS = new
PropertyDescriptor.Builder()
+ .name("kudu-lu-return-cols")
+ .displayName("Kudu Return Columns")
+ .description("A comma-separated list of columns to return when
scanning. To return all columns set to \"*\"")
+ .required(true)
+ .defaultValue("*")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+
+ protected List<PropertyDescriptor> properties;
+
+ protected KerberosCredentialsService credentialsService;
+ private volatile KerberosUser kerberosUser;
+
+ protected String kuduMasters;
+ protected KuduClient kuduClient;
+ protected ReplicaSelection replicaSelection;
+ protected volatile String tableName;
+ protected volatile KuduTable table;
+ protected volatile List<String> columnNames;
+
+ protected volatile RecordSchema resultSchema;
+ protected volatile Schema tableSchema;
+
+ @Override
+ protected void init(final ControllerServiceInitializationContext context) {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(KUDU_MASTERS);
+ properties.add(KERBEROS_CREDENTIALS_SERVICE);
+ properties.add(KUDU_OPERATION_TIMEOUT_MS);
+ properties.add(KUDU_REPLICA_SELECTION);
+ properties.add(TABLE_NAME);
+ properties.add(RETURN_COLUMNS);
+ addProperties(properties);
+ this.properties = Collections.unmodifiableList(properties);
+ }
+
+ protected void addProperties(List<PropertyDescriptor> properties) {
+ }
+
+ protected void createKuduClient(ConfigurationContext context) throws
LoginException {
+ final String kuduMasters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+ final KerberosCredentialsService credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+ if (credentialsService != null) {
+ final String keytab = credentialsService.getKeytab();
+ final String principal = credentialsService.getPrincipal();
+ kerberosUser = loginKerberosUser(principal, keytab);
+
+ final KerberosAction<KuduClient> kerberosAction = new
KerberosAction<>(kerberosUser, () -> buildClient(kuduMasters, context),
getLogger());
+ this.kuduClient = kerberosAction.execute();
+ } else {
+ this.kuduClient = buildClient(kuduMasters, context);
+ }
+ }
+
+ protected KerberosUser loginKerberosUser(final String principal, final
String keytab) throws LoginException {
+ final KerberosUser kerberosUser = new KerberosKeytabUser(principal,
keytab);
+ kerberosUser.login();
+ return kerberosUser;
+ }
+
+ protected KuduClient buildClient(final String masters, final
ConfigurationContext context) {
+ final Integer operationTimeout =
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+ return new KuduClient.KuduClientBuilder(masters)
+ .defaultOperationTimeoutMs(operationTimeout)
+ .build();
+ }
+
+ /**
+ * Establish a connection to a Kudu cluster.
+ * @param context the configuration context
+ * @throws InitializationException if unable to connect a Kudu cluster
+ */
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) throws
InitializationException {
+
+ try {
+ kuduMasters =
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+ credentialsService =
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+ if (kuduClient == null) {
+ getLogger().debug("Setting up Kudu connection...");
+
+ createKuduClient(context);
+ getLogger().debug("Kudu connection successfully initialized");
+ }
+ } catch(Exception ex){
+ getLogger().error("Exception occurred while interacting with Kudu
due to " + ex.getMessage(), ex);
+ throw new InitializationException(ex);
+ }
+
+ replicaSelection =
ReplicaSelection.valueOf(context.getProperty(KUDU_REPLICA_SELECTION).getValue());
+ tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+ try {
+ table = kuduClient.openTable(tableName);
+ tableSchema = table.getSchema();
+ columnNames =
getColumns(context.getProperty(RETURN_COLUMNS).getValue());
+
+ //Result Schema
+ resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames);
+
+ } catch (KuduException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Set<String> getRequiredKeys() {
+ return new HashSet<>();
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Optional<Record> lookup(Map<String, Object> coordinates) {
+
+ //Scanner
+ KuduScanner.KuduScannerBuilder builder =
kuduClient.newScannerBuilder(table);
+
+ builder.setProjectedColumnNames(columnNames);
+ builder.replicaSelection(replicaSelection);
+
+ //Only expecting one match
+ builder.limit(1);
+
+ coordinates.forEach((key,value)->
+
builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key),
KuduPredicate.ComparisonOp.EQUAL, value))
+ );
+
+ KuduScanner kuduScanner = builder.build();
+
+ //Run lookup
+ for ( RowResult row : kuduScanner){
+ final Map<String, Object> values = new HashMap<>();
+ for(String columnName : columnNames){
+ Object object;
+ if(row.getColumnType(columnName) == Type.BINARY){
+ object =
Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName));
+ } else {
+ object = row.getObject(columnName);
+ }
+ values.put(columnName, object);
+ }
+ return Optional.of(new MapRecord(resultSchema, values));
+ }
+
+ //No match
+ return Optional.empty();
+ }
+
+ private List<String> getColumns(String columns){
+ if(columns.equals("*")){
+ return tableSchema
+ .getColumns()
+ .stream().map(ColumnSchema::getName)
+ .collect(Collectors.toList());
+ } else {
+ return Arrays.asList(columns.split(","));
+ }
+ }
+
+ private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema,
List<String> columnNames){
+ final List<RecordField> fields = new ArrayList<>();
+ for(String columnName : columnNames) {
+ if(!kuduTableSchema.hasColumn(columnName)){
+ throw new IllegalArgumentException("Column not found in Kudu
table schema " + columnName);
+ }
+ ColumnSchema cs = kuduTableSchema.getColumn(columnName);
+ switch (cs.getType()) {
+ case INT8:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.BYTE.getDataType()));
+ break;
+ case INT16:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.SHORT.getDataType()));
+ break;
+ case INT32:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.INT.getDataType()));
+ break;
+ case INT64:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.LONG.getDataType()));
+ break;
+ case UNIXTIME_MICROS:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.TIMESTAMP.getDataType()));
+ break;
+ case BINARY:
+ case STRING:
+ case DECIMAL:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.STRING.getDataType()));
+ break;
+ case DOUBLE:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.DOUBLE.getDataType()));
+ break;
+ case BOOL:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.BOOLEAN.getDataType()));
+ break;
+ case FLOAT:
+ fields.add(new RecordField(cs.getName(),
RecordFieldType.FLOAT.getDataType()));
+ break;
+ }
+ }
+ return new SimpleRecordSchema(fields);
+ }
+
+ /**
+ * Disconnect from the Kudu cluster.
+ */
+ @OnDisabled
+ public void onDisabled() throws Exception {
+ try {
+ if (this.kuduClient != null) {
+ getLogger().debug("Closing KuduClient");
+ this.kuduClient.close();
+ this.kuduClient = null;
+ }
+ } finally {
+ if (kerberosUser != null) {
+ kerberosUser.logout();
+ kerberosUser = null;
+ }
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..240825c
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.controller.kudu.KuduLookupService
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java
new file mode 100644
index 0000000..0b746fe
--- /dev/null
+++
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.kudu;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.apache.kudu.util.DecimalUtil;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestKuduLookupService {
+
+ // The KuduTestHarness automatically starts and stops a real Kudu cluster
+ // when each test is run. Kudu persists its on-disk state in a temporary
+ // directory under a location defined by the environment variable
TEST_TMPDIR
+ // if set, or under /tmp otherwise. That cluster data is deleted on
+ // successful exit of the test. The cluster output is logged through slf4j.
+ @Rule
+ public KuduTestHarness harness = new KuduTestHarness(
+ new MiniKuduCluster.MiniKuduClusterBuilder()
+ .addMasterServerFlag("--use_hybrid_clock=false")
+ .addTabletServerFlag("--use_hybrid_clock=false")
+ );
+ private TestRunner testRunner;
+ private long nowMillis = System.currentTimeMillis();
+ private KuduLookupService kuduLookupService;
+
+ public static class SampleProcessor extends AbstractProcessor {
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+
+ }
+ }
+
+ @Before
+ public void init() throws Exception {
+ testRunner = TestRunners.newTestRunner(SampleProcessor.class);
+ testRunner.setValidateExpressionUsage(false);
+ final String tableName = "table1";
+
+ KuduClient client = harness.getClient();
+ List<ColumnSchema> columns = new ArrayList<>();
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("string",
Type.STRING).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("binary",
Type.BINARY).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("bool",
Type.BOOL).build());
+ columns.add(new ColumnSchema
+ .ColumnSchemaBuilder("decimal", Type.DECIMAL)
+
.typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION,
1))
+ .build()
+ );
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("double",
Type.DOUBLE).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("float",
Type.FLOAT).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("int8",
Type.INT8).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("int16",
Type.INT16).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("int32",
Type.INT32).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("int64",
Type.INT64).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros",
Type.UNIXTIME_MICROS).build());
+ Schema schema = new Schema(columns);
+
+ CreateTableOptions opts = new
CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string"));
+ client.createTable(tableName, schema, opts);
+
+ KuduTable table = client.openTable(tableName);
+ KuduSession session = client.newSession();
+
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addString("string", "string1");
+ row.addBinary("binary", "binary1".getBytes());
+ row.addBoolean("bool",true);
+ row.addDecimal("decimal", BigDecimal.valueOf(0.1));
+ row.addDouble("double",0.2);
+ row.addFloat("float",0.3f);
+ row.addByte("int8", (byte) 1);
+ row.addShort("int16", (short) 2);
+ row.addInt("int32",3);
+ row.addLong("int64",4L);
+ row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
+ session.apply(insert);
+
+ insert = table.newInsert();
+ row = insert.getRow();
+ row.addString("string", "string2");
+ row.addBinary("binary", "binary2".getBytes());
+ row.addBoolean("bool",false);
+ row.addDecimal("decimal", BigDecimal.valueOf(0.1));
+ row.addDouble("double",1.2);
+ row.addFloat("float",1.3f);
+ row.addByte("int8", (byte) 11);
+ row.addShort("int16", (short) 12);
+ row.addInt("int32",13);
+ row.addLong("int64",14L);
+ row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L *
60 * 60 * 24 * 365))); //+ 1 year
+ session.apply(insert);
+
+ session.close();
+
+ kuduLookupService = new KuduLookupService();
+ testRunner.addControllerService("kuduLookupService",
kuduLookupService);
+ testRunner.setProperty(kuduLookupService,
KuduLookupService.KUDU_MASTERS, "testLocalHost:7051");
+ testRunner.setProperty(kuduLookupService,
KuduLookupService.KUDU_REPLICA_SELECTION, KuduLookupService.LEADER_ONLY);
+ testRunner.setProperty(kuduLookupService,
KuduLookupService.TABLE_NAME, tableName);
+ kuduLookupService.kuduClient = client;
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void invalid_key() {
+ testRunner.setProperty(kuduLookupService,
KuduLookupService.RETURN_COLUMNS, "*");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map<String,Object> map = new HashMap<>();
+ map.put("invalid", "invalid key");
+ kuduLookupService.lookup(map);
+ }
+ @Test
+ public void row_not_found() {
+ testRunner.setProperty(kuduLookupService,
KuduLookupService.RETURN_COLUMNS, "*");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map<String,Object> map = new HashMap<>();
+ map.put("string", "key not found");
+ Optional<Record> result = kuduLookupService.lookup(map);
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void single_key() {
+ testRunner.setProperty(kuduLookupService,
KuduLookupService.RETURN_COLUMNS, "*");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map<String,Object> map = new HashMap<>();
+ map.put("string", "string1");
+ Record result = kuduLookupService.lookup(map).get();
+ validateRow1(result);
+ }
+ @Test
+ public void multi_key() {
+ testRunner.setProperty(kuduLookupService,
KuduLookupService.RETURN_COLUMNS, "*");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map<String,Object> map = new HashMap<>();
+ map.put("string", "string1");
+ map.put("binary", "binary1".getBytes());
+ map.put("bool",true);
+ map.put("decimal", BigDecimal.valueOf(0.1));
+ map.put("double",0.2);
+ map.put("float",0.3f);
+ map.put("int8", (byte) 1);
+ map.put("int16", (short) 2);
+ map.put("int32",3);
+ map.put("int64",4L);
+ map.put("unixtime_micros", new Timestamp(nowMillis));
+ Record result = kuduLookupService.lookup(map).get();
+ validateRow1(result);
+ }
+ @Test
+ public void specific_return_columns() {
+ testRunner.setProperty(kuduLookupService,
KuduLookupService.RETURN_COLUMNS, "binary,bool");
+
+ testRunner.enableControllerService(kuduLookupService);
+
+ Map<String,Object> map = new HashMap<>();
+ map.put("string", "string1");
+ Record result = kuduLookupService.lookup(map).get();
+
+ assertEquals(2,result.getValues().length);
+
+ assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()),
result.getValue("binary"));
+ assertEquals(true, result.getAsBoolean("bool"));
+ }
+ private void validateRow1(Record result){
+
+ assertEquals("string1", result.getAsString("string"));
+ assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()),
result.getValue("binary"));
+ assertEquals(true, result.getAsBoolean("bool"));
+ assertEquals(BigDecimal.valueOf(0.1), result.getValue("decimal"));
+ assertEquals(0.2, result.getAsDouble("double"),0);
+ assertEquals(0.3f, result.getAsFloat("float"),0);
+ assertEquals((byte)1, result.getValue("int8"));
+ assertEquals((short)2, result.getValue("int16"));
+ assertEquals(3, (int)result.getAsInt("int32"));
+ assertEquals(4L, (long)result.getAsLong("int64"));
+ assertEquals(new Timestamp(nowMillis),
result.getValue("unixtime_micros"));
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
index 16d4e5e..4b530a0 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
@@ -34,6 +34,11 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-kudu-controller-service</artifactId>
+ <version>1.10.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-hadoop-libraries-nar</artifactId>
<version>1.10.0-SNAPSHOT</version>
<type>nar</type>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
index 8a0759a..bea7232 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
@@ -30,6 +30,6 @@
<modules>
<module>nifi-kudu-processors</module>
<module>nifi-kudu-nar</module>
+ <module>nifi-kudu-controller-service</module>
</modules>
-
</project>