This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ab9997  NIFI-6662: Adding Kudu Lookup Service NIFI-6662: Cleaning up 
Kudu logic NIFI-6662: Minor enhancements and build fixes NIFI-6662: This closes 
#3732.
2ab9997 is described below

commit 2ab99970b778b09e3544867310458acd0913fbbc
Author: samhjelmfelt <[email protected]>
AuthorDate: Thu Sep 12 18:55:37 2019 -0500

    NIFI-6662: Adding Kudu Lookup Service
    NIFI-6662: Cleaning up Kudu logic
    NIFI-6662: Minor enhancements and build fixes
    NIFI-6662: This closes #3732.
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../nifi-kudu-controller-service/pom.xml           | 152 +++++++++
 .../nifi/controller/kudu/KuduLookupService.java    | 354 +++++++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |  16 +
 .../controller/kudu/TestKuduLookupService.java     | 234 ++++++++++++++
 .../nifi-kudu-bundle/nifi-kudu-nar/pom.xml         |   5 +
 nifi-nar-bundles/nifi-kudu-bundle/pom.xml          |   2 +-
 6 files changed, 762 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
new file mode 100644
index 0000000..1f8c6a2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/pom.xml
@@ -0,0 +1,152 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>nifi-kudu-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.10.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-kudu-controller-service</artifactId>
+    <packaging>jar</packaging>
+
+
+    <properties>
+        <!-- Pattern handed to the maven-surefire-plugin <excludes> list in
+             this POM; the kudu-windows profile overrides it so that every
+             test is excluded on Windows. -->
+        <exclude.tests>None</exclude.tests>
+        <!-- Single version shared by the kudu-client, kudu-test-utils and
+             kudu-binary dependencies declared in this POM. -->
+        <kudu.version>1.10.0</kudu.version>
+    </properties>
+    <build>
+        <extensions>
+            <!-- Used to find the right kudu-binary artifact with the Maven
+                 property ${os.detected.classifier} -->
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.6.2</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <!-- Test exclusion is driven by the exclude.tests property so
+                 that an OS-specific profile can change which tests run. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <excludes>
+                        <exclude>${exclude.tests}</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    <dependencies>
+        <!-- NiFi APIs and utilities: declared with provided scope, so they
+             are compiled against but not packaged with this artifact. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-lookup-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-credentials-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Test-scope only. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- Kudu Java client: the only dependency here left at the default
+             (compile) scope. -->
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-client</artifactId>
+            <version>${kudu.version}</version>
+        </dependency>
+        <!-- Test-scope only; the platform-specific kudu-binary artifact is
+             added by the OS profiles in this POM. -->
+        <dependency>
+            <groupId>org.apache.kudu</groupId>
+            <artifactId>kudu-test-utils</artifactId>
+            <version>${kudu.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <profiles>
+        <!-- Windows: replaces exclude.tests with a pattern that excludes
+             every test source file. -->
+        <profile>
+            <id>kudu-windows</id>
+            <activation>
+                <os>
+                    <family>Windows</family>
+                </os>
+            </activation>
+            <properties>
+                <!-- Kudu tests do not support Windows. -->
+                <exclude.tests>**/*.java</exclude.tests>
+            </properties>
+        </profile>
+        <!-- Unix family: adds the kudu-binary test artifact for the detected
+             platform; ${os.detected.classifier} is set by the
+             os-maven-plugin build extension.
+             NOTE(review): Maven's Unix OS family may also match macOS, in
+             which case this profile and kudu-mac both activate and declare
+             the same dependency. TODO confirm. -->
+        <profile>
+            <id>kudu-linux</id>
+            <activation>
+                <os>
+                    <family>Unix</family>
+                </os>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.kudu</groupId>
+                    <artifactId>kudu-binary</artifactId>
+                    <version>${kudu.version}</version>
+                    <classifier>${os.detected.classifier}</classifier>
+                    <scope>test</scope>
+                </dependency>
+            </dependencies>
+        </profile>
+        <!-- mac family: same kudu-binary test dependency as kudu-linux. -->
+        <profile>
+            <id>kudu-mac</id>
+            <activation>
+                <os>
+                    <family>mac</family>
+                </os>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.kudu</groupId>
+                    <artifactId>kudu-binary</artifactId>
+                    <version>${kudu.version}</version>
+                    <classifier>${os.detected.classifier}</classifier>
+                    <scope>test</scope>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+</project>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
new file mode 100644
index 0000000..b044f9a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/java/org/apache/nifi/controller/kudu/KuduLookupService.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.kudu;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.AsyncKuduClient;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduPredicate;
+import org.apache.kudu.client.KuduScanner;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.ReplicaSelection;
+import org.apache.kudu.client.RowResult;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.lookup.RecordLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosKeytabUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import javax.security.auth.login.LoginException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@CapabilityDescription("Lookup a record from Kudu Server associated with the 
specified key. Binary columns are base64 encoded. Only one matched row will be 
returned")
+@Tags({"lookup", "enrich", "key", "value", "kudu"})
+public class KuduLookupService extends AbstractControllerService implements 
RecordLookupService {
+
+    /**
+     * Required, non-empty, comma separated list of Kudu master addresses.
+     * Declares Expression Language support at variable-registry scope.
+     */
+    public static final PropertyDescriptor KUDU_MASTERS = new 
PropertyDescriptor.Builder()
+            .name("kudu-lu-masters")
+            .displayName("Kudu Masters")
+            .description("Comma separated addresses of the Kudu masters to 
connect to.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    /**
+     * Optional reference to a {@link KerberosCredentialsService}. When it is
+     * set, createKuduClient logs in with the service's principal and keytab
+     * and builds the Kudu client inside a KerberosAction.
+     */
+    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("kudu-lu-kerberos-credentials-service")
+            .displayName("Kerberos Credentials Service")
+            .description("Specifies the Kerberos Credentials to use for 
authentication")
+            .required(false)
+            .identifiesControllerService(KerberosCredentialsService.class)
+            .build();
+
+    /**
+     * Optional time period handed to the Kudu client builder as its default
+     * operation timeout. The default is the Kudu client's own
+     * {@code DEFAULT_OPERATION_TIMEOUT_MS} with an "ms" suffix. Declares
+     * Expression Language support at variable-registry scope.
+     */
+    public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new 
PropertyDescriptor.Builder()
+            .name("kudu-lu-operations-timeout-ms")
+            .displayName("Kudu Operation Timeout")
+            .description("Default timeout used for user operations (using 
sessions and scanners)")
+            .required(false)
+            .defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS + "ms")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    /** Allowable value whose value string is {@code ReplicaSelection.CLOSEST_REPLICA.toString()}. */
+    public static final AllowableValue CLOSEST_REPLICA = new 
AllowableValue(ReplicaSelection.CLOSEST_REPLICA.toString(), 
ReplicaSelection.CLOSEST_REPLICA.name(),
+            "Select the closest replica to the client. Replicas are classified 
from closest to furthest as follows: "+
+                    "1) Local replicas 2) Replicas whose tablet server has the 
same location as the client 3) All other replicas");
+    /** Allowable value whose value string is {@code ReplicaSelection.LEADER_ONLY.toString()}. */
+    public static final AllowableValue LEADER_ONLY = new 
AllowableValue(ReplicaSelection.LEADER_ONLY.toString(), 
ReplicaSelection.LEADER_ONLY.name(),
+            "Select the LEADER replica");
+    /**
+     * Required choice between {@link #CLOSEST_REPLICA} (the default) and
+     * {@link #LEADER_ONLY}; Expression Language is not supported. onEnabled
+     * converts the selected value back with {@code ReplicaSelection.valueOf}.
+     */
+    public static final PropertyDescriptor KUDU_REPLICA_SELECTION = new 
PropertyDescriptor.Builder()
+            .name("kudu-lu-replica-selection")
+            .displayName("Kudu Replica Selection")
+            .description("Policy with which to choose amongst multiple 
replicas")
+            .required(true)
+            .defaultValue(CLOSEST_REPLICA.getValue())
+            .allowableValues(CLOSEST_REPLICA, LEADER_ONLY)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    /**
+     * Required, non-blank name of the Kudu table that onEnabled opens.
+     * Defaults to the literal string "default" and declares Expression
+     * Language support at variable-registry scope.
+     */
+    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+            .name("kudu-lu-table-name")
+            .displayName("Kudu Table Name")
+            .description("Name of the table to access.")
+            .required(true)
+            .defaultValue("default")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    /**
+     * Required, non-blank, comma-separated list of columns to project in the
+     * lookup scan. The default "*" is expanded by getColumns to every column
+     * of the table schema. Declares Expression Language support at
+     * variable-registry scope.
+     */
+    public static final PropertyDescriptor RETURN_COLUMNS = new 
PropertyDescriptor.Builder()
+            .name("kudu-lu-return-cols")
+            .displayName("Kudu Return Columns")
+            .description("A comma-separated list of columns to return when 
scanning. To return all columns set to \"*\"")
+            .required(true)
+            .defaultValue("*")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+
+    // Unmodifiable descriptor list built once in init().
+    protected List<PropertyDescriptor> properties;
+
+    // Credentials service resolved in onEnabled(); null when the property is unset.
+    protected KerberosCredentialsService credentialsService;
+    // Logged-in Kerberos user created by createKuduClient(); logged out in onDisabled().
+    private volatile KerberosUser kerberosUser;
+
+    // Evaluated value of KUDU_MASTERS, captured in onEnabled().
+    protected String kuduMasters;
+    // Created in onEnabled() only when still null; closed and reset to null in onDisabled().
+    // NOTE(review): kuduClient and replicaSelection are read by lookup() but, unlike the
+    // fields below, are not volatile - confirm the framework guarantees visibility.
+    protected KuduClient kuduClient;
+    protected ReplicaSelection replicaSelection;
+    // Table name, opened table handle and projected column names, all set in onEnabled().
+    protected volatile String tableName;
+    protected volatile KuduTable table;
+    protected volatile List<String> columnNames;
+
+    // NiFi record schema derived from tableSchema for the projected columns.
+    protected volatile RecordSchema resultSchema;
+    // Kudu schema of the opened table; used to resolve predicate columns in lookup().
+    protected volatile Schema tableSchema;
+
+    @Override
+    protected void init(final ControllerServiceInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(KUDU_MASTERS);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KUDU_OPERATION_TIMEOUT_MS);
+        properties.add(KUDU_REPLICA_SELECTION);
+        properties.add(TABLE_NAME);
+        properties.add(RETURN_COLUMNS);
+        addProperties(properties);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    /**
+     * Extension hook called by init() with the mutable descriptor list before
+     * it is wrapped as unmodifiable. The base implementation adds nothing.
+     *
+     * @param properties the mutable list of descriptors collected so far
+     */
+    protected void addProperties(List<PropertyDescriptor> properties) {
+    }
+
+    /**
+     * Builds the Kudu client and stores it in {@code kuduClient}.
+     * Without a Kerberos credentials service the client is built directly.
+     * Otherwise a Kerberos user is logged in from the service's principal and
+     * keytab, remembered in {@code kerberosUser}, and the client is built
+     * inside a {@link KerberosAction} for that user.
+     *
+     * @param context the configuration context to read properties from
+     * @throws LoginException if the Kerberos login fails
+     */
+    protected void createKuduClient(ConfigurationContext context) throws LoginException {
+        final String masters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+        final KerberosCredentialsService kerberosService =
+                context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+        // No Kerberos configured: plain client, nothing else to do.
+        if (kerberosService == null) {
+            this.kuduClient = buildClient(masters, context);
+            return;
+        }
+
+        final String keytab = kerberosService.getKeytab();
+        final String principal = kerberosService.getPrincipal();
+        kerberosUser = loginKerberosUser(principal, keytab);
+
+        final KerberosAction<KuduClient> buildAsUser =
+                new KerberosAction<>(kerberosUser, () -> buildClient(masters, context), getLogger());
+        this.kuduClient = buildAsUser.execute();
+    }
+
+    /**
+     * Creates a keytab-backed Kerberos user and logs it in.
+     *
+     * @param principal the Kerberos principal
+     * @param keytab the keytab passed to {@link KerberosKeytabUser}
+     * @return the logged-in user
+     * @throws LoginException if the login fails
+     */
+    protected KerberosUser loginKerberosUser(final String principal, final String keytab) throws LoginException {
+        final KerberosUser keytabUser = new KerberosKeytabUser(principal, keytab);
+        keytabUser.login();
+        return keytabUser;
+    }
+
+    protected KuduClient buildClient(final String masters, final 
ConfigurationContext context) {
+        final Integer operationTimeout = 
context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+        return new KuduClient.KuduClientBuilder(masters)
+                .defaultOperationTimeoutMs(operationTimeout)
+                .build();
+    }
+
+    /**
+     * Establish a connection to a Kudu cluster.
+     * @param context the configuration context
+     * @throws InitializationException if unable to connect a Kudu cluster
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws 
InitializationException {
+
+        try {
+            kuduMasters = 
context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
+            credentialsService = 
context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
+
+            if (kuduClient == null) {
+                getLogger().debug("Setting up Kudu connection...");
+
+                createKuduClient(context);
+                getLogger().debug("Kudu connection successfully initialized");
+            }
+        } catch(Exception ex){
+            getLogger().error("Exception occurred while interacting with Kudu 
due to " + ex.getMessage(), ex);
+            throw new InitializationException(ex);
+        }
+
+        replicaSelection = 
ReplicaSelection.valueOf(context.getProperty(KUDU_REPLICA_SELECTION).getValue());
+        tableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
+        try {
+            table = kuduClient.openTable(tableName);
+            tableSchema = table.getSchema();
+            columnNames = 
getColumns(context.getProperty(RETURN_COLUMNS).getValue());
+
+            //Result Schema
+            resultSchema = kuduSchemaToNiFiSchema(tableSchema, columnNames);
+
+        } catch (KuduException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * @return a new, empty, mutable set: this service declares no required
+     *         lookup keys
+     */
+    @Override
+    public Set<String> getRequiredKeys() {
+        final Set<String> noRequiredKeys = new HashSet<>();
+        return noRequiredKeys;
+    }
+
+    /**
+     * @return the unmodifiable descriptor list assembled in init()
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return this.properties;
+    }
+
+    /**
+     * Looks up a single row in the configured Kudu table.
+     * Every entry of {@code coordinates} becomes an equality predicate on the
+     * column named by its key. The scan projects the configured return
+     * columns, uses the configured replica selection and is limited to one row.
+     * BINARY columns are returned base64 encoded; all other columns are
+     * returned as the object Kudu provides.
+     *
+     * @param coordinates column name to value pairs to match on
+     * @return the first matching row as a record, or empty when nothing matches
+     * @throws IllegalArgumentException if a key is not a column of the table schema
+     */
+    @Override
+    public Optional<Record> lookup(Map<String, Object> coordinates) {
+
+        //Scanner
+        KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(table);
+
+        builder.setProjectedColumnNames(columnNames);
+        builder.replicaSelection(replicaSelection);
+
+        //Only expecting one match
+        builder.limit(1);
+
+        coordinates.forEach((key, value) ->
+                builder.addPredicate(KuduPredicate.newComparisonPredicate(tableSchema.getColumn(key), KuduPredicate.ComparisonOp.EQUAL, value))
+        );
+
+        final KuduScanner kuduScanner = builder.build();
+        try {
+            //Run lookup
+            for (RowResult row : kuduScanner) {
+                final Map<String, Object> values = new HashMap<>();
+                for (String columnName : columnNames) {
+                    Object object;
+                    if (row.getColumnType(columnName) == Type.BINARY) {
+                        object = Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName));
+                    } else {
+                        object = row.getObject(columnName);
+                    }
+                    values.put(columnName, object);
+                }
+                // Only the first row is wanted, so return from inside the loop.
+                return Optional.of(new MapRecord(resultSchema, values));
+            }
+
+            //No match
+            return Optional.empty();
+        } finally {
+            // Returning after the first row abandons the scan, so always close the
+            // scanner explicitly instead of leaving it open. A failure to close
+            // must not mask the lookup result, so it is only logged.
+            try {
+                kuduScanner.close();
+            } catch (KuduException e) {
+                getLogger().warn("Failed to close Kudu scanner after lookup", e);
+            }
+        }
+    }
+
+    private List<String> getColumns(String columns){
+        if(columns.equals("*")){
+            return tableSchema
+                    .getColumns()
+                    .stream().map(ColumnSchema::getName)
+                    .collect(Collectors.toList());
+        } else {
+            return Arrays.asList(columns.split(","));
+        }
+    }
+
+    /**
+     * Derives a NiFi record schema for the given columns of a Kudu table.
+     * BINARY, STRING and DECIMAL columns map to a NiFi STRING field; a column
+     * whose Kudu type has no mapping below contributes no field.
+     *
+     * @param kuduTableSchema the Kudu table schema
+     * @param columnNames the columns to include, in order
+     * @return the record schema for those columns
+     * @throws IllegalArgumentException if a column is not part of the table schema
+     */
+    private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema, List<String> columnNames){
+        final List<RecordField> recordFields = new ArrayList<>();
+        for (final String name : columnNames) {
+            if (!kuduTableSchema.hasColumn(name)) {
+                throw new IllegalArgumentException("Column not found in Kudu table schema " + name);
+            }
+
+            final ColumnSchema column = kuduTableSchema.getColumn(name);
+            final RecordFieldType fieldType;
+            switch (column.getType()) {
+                case INT8:
+                    fieldType = RecordFieldType.BYTE;
+                    break;
+                case INT16:
+                    fieldType = RecordFieldType.SHORT;
+                    break;
+                case INT32:
+                    fieldType = RecordFieldType.INT;
+                    break;
+                case INT64:
+                    fieldType = RecordFieldType.LONG;
+                    break;
+                case UNIXTIME_MICROS:
+                    fieldType = RecordFieldType.TIMESTAMP;
+                    break;
+                case BINARY:
+                case STRING:
+                case DECIMAL:
+                    fieldType = RecordFieldType.STRING;
+                    break;
+                case DOUBLE:
+                    fieldType = RecordFieldType.DOUBLE;
+                    break;
+                case BOOL:
+                    fieldType = RecordFieldType.BOOLEAN;
+                    break;
+                case FLOAT:
+                    fieldType = RecordFieldType.FLOAT;
+                    break;
+                default:
+                    // Unmapped type: no field is added for this column.
+                    fieldType = null;
+                    break;
+            }
+
+            if (fieldType != null) {
+                recordFields.add(new RecordField(column.getName(), fieldType.getDataType()));
+            }
+        }
+        return new SimpleRecordSchema(recordFields);
+    }
+
+    /**
+     * Disconnect from the Kudu cluster: closes the client if one exists and
+     * then, even when closing fails, logs out the Kerberos user if one was
+     * logged in. Each reference is cleared only after its cleanup call returns.
+     *
+     * @throws Exception if closing the client or logging out fails
+     */
+    @OnDisabled
+    public void onDisabled() throws Exception {
+        try {
+            final KuduClient client = this.kuduClient;
+            if (client != null) {
+                getLogger().debug("Closing KuduClient");
+                client.close();
+                this.kuduClient = null;
+            }
+        } finally {
+            final KerberosUser user = kerberosUser;
+            if (user != null) {
+                user.logout();
+                kerberosUser = null;
+            }
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..240825c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.nifi.controller.kudu.KuduLookupService
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java
 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java
new file mode 100644
index 0000000..0b746fe
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-controller-service/src/test/java/org/apache/nifi/controller/kudu/TestKuduLookupService.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.kudu;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.CreateTableOptions;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.MiniKuduCluster;
+import org.apache.kudu.util.DecimalUtil;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class TestKuduLookupService {
+
+    // The KuduTestHarness automatically starts and stops a real Kudu cluster
+    // when each test is run. Kudu persists its on-disk state in a temporary
+    // directory under a location defined by the environment variable 
TEST_TMPDIR
+    // if set, or under /tmp otherwise. That cluster data is deleted on
+    // successful exit of the test. The cluster output is logged through slf4j.
+    @Rule
+    public KuduTestHarness harness = new KuduTestHarness(
+            new MiniKuduCluster.MiniKuduClusterBuilder()
+                .addMasterServerFlag("--use_hybrid_clock=false")
+                .addTabletServerFlag("--use_hybrid_clock=false")
+    );
+    // Runner that hosts the controller service under test.
+    private TestRunner testRunner;
+    // Captured once per test instance; used for the timestamp column of the inserted rows.
+    private long nowMillis = System.currentTimeMillis();
+    // Service under test; init() assigns the harness client to it directly.
+    private KuduLookupService kuduLookupService;
+
+    /**
+     * No-op processor whose only purpose is to give TestRunners a processor
+     * class to build a runner for.
+     */
+    public static class SampleProcessor extends AbstractProcessor {
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+        }
+    }
+
+    /**
+     * Per-test fixture: creates table "table1" with one column of each type
+     * listed below (keyed and range partitioned on "string"), inserts two
+     * rows, then registers a KuduLookupService on the test runner and hands it
+     * the harness client.
+     */
+    @Before
+    public void init() throws Exception {
+        testRunner = TestRunners.newTestRunner(SampleProcessor.class);
+        testRunner.setValidateExpressionUsage(false);
+        final String tableName = "table1";
+
+        KuduClient client =  harness.getClient();
+        // Schema: "string" is the only key column.
+        List<ColumnSchema> columns = new ArrayList<>();
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("string", 
Type.STRING).key(true).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("binary", 
Type.BINARY).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("bool", 
Type.BOOL).build());
+        columns.add(new ColumnSchema
+                .ColumnSchemaBuilder("decimal", Type.DECIMAL)
+                
.typeAttributes(DecimalUtil.typeAttributes(DecimalUtil.MAX_DECIMAL64_PRECISION, 
1))
+                .build()
+        );
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("double", 
Type.DOUBLE).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("float", 
Type.FLOAT).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("int8", 
Type.INT8).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("int16", 
Type.INT16).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("int32", 
Type.INT32).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("int64", 
Type.INT64).build());
+        columns.add(new ColumnSchema.ColumnSchemaBuilder("unixtime_micros", 
Type.UNIXTIME_MICROS).build());
+        Schema schema = new Schema(columns);
+
+        CreateTableOptions opts = new 
CreateTableOptions().setRangePartitionColumns(Collections.singletonList("string"));
+        client.createTable(tableName, schema, opts);
+
+        KuduTable table = client.openTable(tableName);
+        KuduSession session = client.newSession();
+
+        // Row 1, key "string1".
+        Insert insert = table.newInsert();
+        PartialRow row = insert.getRow();
+        row.addString("string", "string1");
+        row.addBinary("binary", "binary1".getBytes());
+        row.addBoolean("bool",true);
+        row.addDecimal("decimal", BigDecimal.valueOf(0.1));
+        row.addDouble("double",0.2);
+        row.addFloat("float",0.3f);
+        row.addByte("int8", (byte) 1);
+        row.addShort("int16", (short) 2);
+        row.addInt("int32",3);
+        row.addLong("int64",4L);
+        row.addTimestamp("unixtime_micros", new Timestamp(nowMillis));
+        session.apply(insert);
+
+        // Row 2, key "string2".
+        insert = table.newInsert();
+        row = insert.getRow();
+        row.addString("string", "string2");
+        row.addBinary("binary", "binary2".getBytes());
+        row.addBoolean("bool",false);
+        row.addDecimal("decimal", BigDecimal.valueOf(0.1));
+        row.addDouble("double",1.2);
+        row.addFloat("float",1.3f);
+        row.addByte("int8", (byte) 11);
+        row.addShort("int16", (short) 12);
+        row.addInt("int32",13);
+        row.addLong("int64",14L);
+        row.addTimestamp("unixtime_micros", new Timestamp(nowMillis+(1000L * 
60 * 60 * 24 * 365))); //+ 1 year
+        session.apply(insert);
+
+        session.close();
+
+        kuduLookupService = new KuduLookupService();
+        testRunner.addControllerService("kuduLookupService", 
kuduLookupService);
+        testRunner.setProperty(kuduLookupService, 
KuduLookupService.KUDU_MASTERS, "testLocalHost:7051");
+        testRunner.setProperty(kuduLookupService, 
KuduLookupService.KUDU_REPLICA_SELECTION, KuduLookupService.LEADER_ONLY);
+        testRunner.setProperty(kuduLookupService, 
KuduLookupService.TABLE_NAME, tableName);
+        // Assign the harness client directly; the masters address set above is
+        // only there to satisfy the required property.
+        kuduLookupService.kuduClient = client;
+    }
+
+    /**
+     * Performs a lookup whose only coordinate is named "invalid" and expects
+     * the call sequence to fail: the test passes only when an
+     * {@link IllegalArgumentException} propagates out of this method.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void invalid_key() {
+        // "*" is the value under test for RETURN_COLUMNS in most tests here.
+        testRunner.setProperty(
+                kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
+        testRunner.enableControllerService(kuduLookupService);
+
+        final Map<String, Object> coordinates = new HashMap<>();
+        coordinates.put("invalid", "invalid key");
+
+        kuduLookupService.lookup(coordinates);
+    }
+    /**
+     * Looks up the "string" column with a value that neither fixture row
+     * inserted ("string1" / "string2") and asserts that the returned
+     * {@link Optional} is empty.
+     */
+    @Test
+    public void row_not_found() {
+        testRunner.setProperty(
+                kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
+        testRunner.enableControllerService(kuduLookupService);
+
+        final Map<String, Object> coordinates = new HashMap<>();
+        coordinates.put("string", "key not found");
+
+        assertFalse(kuduLookupService.lookup(coordinates).isPresent());
+    }
+
+
+    /**
+     * Looks up the first fixture row using only its "string" value and checks
+     * every returned field through {@code validateRow1}.
+     */
+    @Test
+    public void single_key() {
+        testRunner.setProperty(
+                kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
+        testRunner.enableControllerService(kuduLookupService);
+
+        final Map<String, Object> coordinates = new HashMap<>();
+        coordinates.put("string", "string1");
+
+        // get() throws if the lookup came back empty, which fails the test.
+        validateRow1(kuduLookupService.lookup(coordinates).get());
+    }
+    /**
+     * Looks up the first fixture row while supplying a coordinate for every
+     * column, using the same values the fixture inserted, and checks the
+     * returned record through {@code validateRow1}.
+     */
+    @Test
+    public void multi_key() {
+        testRunner.setProperty(
+                kuduLookupService, KuduLookupService.RETURN_COLUMNS, "*");
+        testRunner.enableControllerService(kuduLookupService);
+
+        // One entry per column; values mirror the first inserted row.
+        final Map<String, Object> coordinates = new HashMap<>();
+        coordinates.put("string", "string1");
+        coordinates.put("binary", "binary1".getBytes());
+        coordinates.put("bool", true);
+        coordinates.put("decimal", BigDecimal.valueOf(0.1));
+        coordinates.put("double", 0.2);
+        coordinates.put("float", 0.3f);
+        coordinates.put("int8", (byte) 1);
+        coordinates.put("int16", (short) 2);
+        coordinates.put("int32", 3);
+        coordinates.put("int64", 4L);
+        coordinates.put("unixtime_micros", new Timestamp(nowMillis));
+
+        // get() throws if the lookup came back empty, which fails the test.
+        validateRow1(kuduLookupService.lookup(coordinates).get());
+    }
+    /**
+     * Restricts RETURN_COLUMNS to "binary,bool" and asserts that the record
+     * found for "string1" carries exactly two values: the Base64 text of the
+     * inserted bytes and the inserted boolean.
+     */
+    @Test
+    public void specific_return_columns() {
+        testRunner.setProperty(
+                kuduLookupService,
+                KuduLookupService.RETURN_COLUMNS,
+                "binary,bool");
+        testRunner.enableControllerService(kuduLookupService);
+
+        final Map<String, Object> coordinates = new HashMap<>();
+        coordinates.put("string", "string1");
+        final Record found = kuduLookupService.lookup(coordinates).get();
+
+        // Only the two requested columns may be present.
+        assertEquals(2, found.getValues().length);
+
+        // The binary column is compared against the Base64 form of the bytes.
+        final String expectedBinary =
+                Base64.getEncoder().encodeToString("binary1".getBytes());
+        assertEquals(expectedBinary, found.getValue("binary"));
+        assertEquals(true, found.getAsBoolean("bool"));
+    }
+    private void validateRow1(Record result){
+
+        assertEquals("string1", result.getAsString("string"));
+        assertEquals(Base64.getEncoder().encodeToString("binary1".getBytes()), 
result.getValue("binary"));
+        assertEquals(true, result.getAsBoolean("bool"));
+        assertEquals(BigDecimal.valueOf(0.1), result.getValue("decimal"));
+        assertEquals(0.2, result.getAsDouble("double"),0);
+        assertEquals(0.3f, result.getAsFloat("float"),0);
+        assertEquals((byte)1, result.getValue("int8"));
+        assertEquals((short)2, result.getValue("int16"));
+        assertEquals(3, (int)result.getAsInt("int32"));
+        assertEquals(4L, (long)result.getAsLong("int64"));
+        assertEquals(new Timestamp(nowMillis), 
result.getValue("unixtime_micros"));
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
index 16d4e5e..4b530a0 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-nar/pom.xml
@@ -34,6 +34,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kudu-controller-service</artifactId>
+            <version>1.10.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-hadoop-libraries-nar</artifactId>
             <version>1.10.0-SNAPSHOT</version>
             <type>nar</type>
diff --git a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml 
b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
index 8a0759a..bea7232 100644
--- a/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kudu-bundle/pom.xml
@@ -30,6 +30,6 @@
     <modules>
         <module>nifi-kudu-processors</module>
         <module>nifi-kudu-nar</module>
+        <module>nifi-kudu-controller-service</module>
     </modules>
-
 </project>

Reply via email to