Repository: nifi Updated Branches: refs/heads/NIFI-810-InputRequirement 2215bc848 -> 8e2308b78
nifi-992 Adding nifi-couchbase-bundle. - new CouchbaseClusterControllerService - new Processors - GetCouchbaseKey - PutCouchbaseKey Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2466a245 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2466a245 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2466a245 Branch: refs/heads/NIFI-810-InputRequirement Commit: 2466a24530f493238024eb22dc041eebe96621f3 Parents: 96764ed Author: ijokarumawak <[email protected]> Authored: Sat Sep 26 02:46:37 2015 +0900 Committer: Bryan Bende <[email protected]> Committed: Mon Sep 28 11:21:42 2015 -0400 ---------------------------------------------------------------------- nifi-assembly/NOTICE | 10 + nifi-assembly/pom.xml | 5 + .../nifi-couchbase-nar/pom.xml | 37 +++ .../nifi-couchbase-processors/pom.xml | 208 +++++++++++++++ .../nifi/couchbase/CouchbaseAttributes.java | 59 +++++ .../CouchbaseClusterControllerService.java | 38 +++ .../nifi/couchbase/CouchbaseClusterService.java | 130 ++++++++++ .../couchbase/AbstractCouchbaseProcessor.java | 174 +++++++++++++ .../nifi/processors/couchbase/DocumentType.java | 36 +++ .../processors/couchbase/GetCouchbaseKey.java | 172 +++++++++++++ .../processors/couchbase/PutCouchbaseKey.java | 164 ++++++++++++ ...org.apache.nifi.controller.ControllerService | 15 ++ .../org.apache.nifi.processor.Processor | 16 ++ .../couchbase/TestCouchbaseClusterService.java | 59 +++++ .../couchbase/TestGetCouchbaseKey.java | 224 ++++++++++++++++ .../couchbase/TestPutCouchbaseKey.java | 254 +++++++++++++++++++ nifi-nar-bundles/nifi-couchbase-bundle/pom.xml | 35 +++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 19 files changed, 1643 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-assembly/NOTICE 
---------------------------------------------------------------------- diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE index 3362740..1f7e3f1 100644 --- a/nifi-assembly/NOTICE +++ b/nifi-assembly/NOTICE @@ -709,6 +709,16 @@ The following binary components are provided under the Apache Software License v Metadata-Extractor Copyright 2002-2015 Drew Noakes + (ASLv2) Couchbase Java SDK + The following NOTICE information applies: + Couchbase Java SDK + Copyright 2014 Couchbase, Inc. + + (ASLv2) RxJava + The following NOTICE information applies: + RxJava + Copyright 2012 Netflix, Inc. + ************************ Common Development and Distribution License 1.1 ************************ http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index f162588..de4fdcb 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -227,6 +227,11 @@ language governing permissions and limitations under the License. --> <artifactId>nifi-image-nar</artifactId> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-couchbase-nar</artifactId> + <type>nar</type> + </dependency> </dependencies> <properties> http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml new file mode 100644 index 0000000..4f58d1f --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements.
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-couchbase-bundle</artifactId> + <version>0.3.1-SNAPSHOT</version> + </parent> + + <artifactId>nifi-couchbase-nar</artifactId> + <version>0.3.1-SNAPSHOT</version> + <packaging>nar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-couchbase-processors</artifactId> + <version>0.3.1-SNAPSHOT</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml new file mode 100644 index 0000000..33b0baa --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml @@ -0,0 +1,208 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) 
under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-couchbase-bundle</artifactId>
+        <version>0.3.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-couchbase-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- NiFi framework APIs; no version is given here for these artifacts. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <!-- Couchbase Java SDK used by the controller service and processors. -->
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>java-client</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <!-- Test-only dependencies. nifi-mock is declared exactly once: Maven
+             requires each groupId:artifactId:type:classifier to be unique. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-checkstyle-plugin</artifactId>
+                    <version>2.15</version>
+                    <dependencies>
+                        <dependency>
+                            <groupId>com.puppycrawl.tools</groupId>
+                            <artifactId>checkstyle</artifactId>
+                            <version>6.5</version>
+                        </dependency>
+                    </dependencies>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <checkstyleRules>
+                        <module name="Checker">
+                            <property name="charset" value="UTF-8" />
+                            <property name="severity" value="warning" />
+                            <!-- Checks for whitespace -->
+                            <!-- See http://checkstyle.sf.net/config_whitespace.html -->
+                            <module name="FileTabCharacter">
+                                <property name="eachLine" value="true" />
+                            </module>
+                            <module name="TreeWalker">
+                                <module name="RegexpSinglelineJava">
+                                    <property name="format" value="\s+$" />
+                                    <property name="message" value="Line has trailing whitespace." />
+                                </module>
+                                <module name="RegexpSinglelineJava">
+                                    <property name="format" value="[@]see\s+[{][@]link" />
+                                    <property name="message" value="Javadoc @see does not need @link: pick one or the other."
/> + </module> + <module name="OuterTypeFilename" /> + <module name="LineLength"> + <!-- needs extra, because Eclipse formatter ignores the ending left + brace --> + <property name="max" value="200" /> + <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" /> + </module> + <module name="AvoidStarImport" /> + <module name="UnusedImports"> + <property name="processJavadoc" value="true" /> + </module> + <module name="NoLineWrap" /> + <module name="LeftCurly"> + <property name="maxLineLength" value="160" /> + </module> + <module name="RightCurly" /> + <module name="RightCurly"> + <property name="option" value="alone" /> + <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" /> + </module> + <module name="SeparatorWrap"> + <property name="tokens" value="DOT" /> + <property name="option" value="nl" /> + </module> + <module name="SeparatorWrap"> + <property name="tokens" value="COMMA" /> + <property name="option" value="EOL" /> + </module> + <module name="PackageName"> + <property name="format" value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" /> + </module> + <module name="MethodTypeParameterName"> + <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" /> + </module> + <module name="MethodParamPad" /> + <module name="OperatorWrap"> + <property name="option" value="NL" /> + <property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " /> + </module> + <module name="AnnotationLocation"> + <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" /> + </module> + <module name="AnnotationLocation"> + <property name="tokens" value="VARIABLE_DEF" /> + <property name="allowSamelineMultipleAnnotations" value="true" /> + </module> + <module name="NonEmptyAtclauseDescription" /> + <module name="JavadocMethod"> + <property 
name="allowMissingJavadoc" value="true" /> + <property name="allowMissingParamTags" value="true" /> + <property name="allowMissingThrowsTags" value="true" /> + <property name="allowMissingReturnTag" value="true" /> + <property name="allowedAnnotations" value="Override,Test,BeforeClass,AfterClass,Before,After" /> + <property name="allowThrowsTagsForSubclasses" value="true" /> + </module> + <module name="SingleLineJavadoc" /> + </module> + </module> + </checkstyleRules> + <violationSeverity>warning</violationSeverity> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + </configuration> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <!-- Checks style and licensing requirements. This is a good idea to run + for contributions and for the release process. While it would be nice to + run always these plugins can considerably slow the build and have proven + to create unstable builds in our multi-module project and when building using + multiple threads. The stability issues seen with Checkstyle in multi-module + builds include false-positives and false negatives. 
--> + <id>contrib-check</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + <phase>verify</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <executions> + <execution> + <id>check-style</id> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java new file mode 100644 index 0000000..a4d69fc --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; + +/** + * Couchbase related attribute keys. + */ +public enum CouchbaseAttributes implements FlowFileAttributeKey { + + /** + * A reference to the related cluster. + */ + Cluster("couchbase.cluster"), + /** + * A related bucket name. + */ + Bucket("couchbase.bucket"), + /** + * The id of a related document. + */ + DocId("couchbase.doc.id"), + /** + * The CAS value of a related document. + */ + Cas("couchbase.doc.cas"), + /** + * The expiration of a related document. 
+ */ + Expiry("couchbase.doc.expiry"), + ; + + private final String key; + + private CouchbaseAttributes(final String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java new file mode 100644 index 0000000..fcf72d5 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.couchbase; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.controller.ControllerService; + +import com.couchbase.client.java.Bucket; + +/** + * Provides a connection to a Couchbase Server cluster throughout a NiFi Data + * flow. + */ +@CapabilityDescription("Provides a centralized Couchbase connection.") +public interface CouchbaseClusterControllerService extends ControllerService { + + /** + * Open a bucket connection. + * @param bucketName the bucket name to access + * @return a connected bucket instance + */ + public Bucket openBucket(String bucketName); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java new file mode 100644 index 0000000..7daa97c --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.couchbase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +import com.couchbase.client.core.CouchbaseException; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.CouchbaseCluster; + +/** + * Provides a centralized Couchbase connection and bucket passwords management. + */ +@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management." 
+ + " Bucket passwords can be specified via dynamic properties.") +@Tags({ "nosql", "couchbase", "database", "connection" }) +@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", description = "Specify bucket password if neseccery.") +public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService { + + public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor + .Builder().name("Connection String") + .description("The hostnames or ip addresses of the bootstraping nodes and optional parameters." + + " Syntax) couchbase://node1,node2,nodeN?param1=value1¶m2=value2¶mN=valueN") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(CONNECTION_STRING); + + properties = Collections.unmodifiableList(props); + } + + private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for "; + private static final Map<String, String> bucketPasswords = new HashMap<>(); + + private volatile CouchbaseCluster cluster; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( + String propertyDescriptorName) { + if(propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){ + return new PropertyDescriptor + .Builder().name(propertyDescriptorName) + .description("Bucket password.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .sensitive(true) + .build(); + } + return null; + } + + + /** + * Establish a connection to a Couchbase cluster. 
+ * @param context the configuration context + * @throws InitializationException if unable to connect a Couchbase cluster + */ + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException { + + for(PropertyDescriptor p : context.getProperties().keySet()){ + if(p.isDynamic() && p.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)){ + String bucketName = p.getName().substring(DYNAMIC_PROP_BUCKET_PASSWORD.length()); + String password = context.getProperty(p).getValue(); + bucketPasswords.put(bucketName, password); + } + } + try { + cluster = CouchbaseCluster.fromConnectionString(context.getProperty(CONNECTION_STRING).getValue()); + } catch(CouchbaseException e) { + throw new InitializationException(e); + } + } + + @Override + public Bucket openBucket(String bucketName){ + return cluster.openBucket(bucketName, bucketPasswords.get(bucketName)); + } + + /** + * Disconnect from the Couchbase cluster. + */ + @OnDisabled + public void shutdown() { + if(cluster != null){ + cluster.disconnect(); + cluster = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java new file mode 100644 index 0000000..d370728 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; + +import com.couchbase.client.java.Bucket; + +/** + * Provides common functionalities for Couchbase processors. 
+ */ +public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { + + public static final PropertyDescriptor DOCUMENT_TYPE = new PropertyDescriptor + .Builder().name("Document Type") + .description("The type of contents.") + .required(true) + .allowableValues(DocumentType.values()) + .defaultValue(DocumentType.Json.toString()) + .build(); + + public static final PropertyDescriptor DOC_ID = new PropertyDescriptor + .Builder().name("Static Document Id") + .description("A static, fixed Couchbase document id.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor DOC_ID_EXP = new PropertyDescriptor + .Builder().name("Document Id Expression") + .description("An expression to construct the Couchbase document id." + + " If 'Static Document Id' is specified, then 'Static Document Id' is used.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.") + .build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original input file will be routed to this destination when it has been successfully processed.") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("All FlowFiles that cannot written to Couchbase Server are routed to this relationship.") + .build(); + + public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor + .Builder().name("Couchbase Cluster Controller Service") + .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") + .required(true) + 
.identifiesControllerService(CouchbaseClusterControllerService.class) + .build(); + + public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor + .Builder().name("Bucket Name") + .description("The name of bucket to access.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("default") + .build(); + + private List<PropertyDescriptor> descriptors; + + private Set<Relationship> relationships; + + private CouchbaseClusterControllerService clusterService; + + @Override + protected final void init(final ProcessorInitializationContext context) { + + final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(); + descriptors.add(COUCHBASE_CLUSTER_SERVICE); + descriptors.add(BUCKET_NAME); + addSupportedProperties(descriptors); + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<Relationship>(); + addSupportedRelationships(relationships); + this.relationships = Collections.unmodifiableSet(relationships); + + } + + /** + * Add processor specific properties. + * @param descriptors add properties to this list + */ + protected void addSupportedProperties(List<PropertyDescriptor> descriptors) { + return; + } + + /** + * Add processor specific relationships. 
+ * @param relationships add relationships to this list + */ + protected void addSupportedRelationships(Set<Relationship> relationships) { + return; + } + + @Override + public final Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + private CouchbaseClusterControllerService getClusterService(final ProcessContext context) { + if(clusterService == null){ + synchronized(AbstractCouchbaseProcessor.class){ + if(clusterService == null){ + clusterService = context.getProperty(COUCHBASE_CLUSTER_SERVICE) + .asControllerService(CouchbaseClusterControllerService.class); + } + } + } + + return clusterService; + } + + /** + * Open a bucket connection using a CouchbaseClusterControllerService. + * @param context a process context + * @return a bucket instance + */ + protected final Bucket openBucket(final ProcessContext context) { + return getClusterService(context).openBucket(context.getProperty(BUCKET_NAME).getValue()); + } + + /** + * Generate a transit url. 
+ * @param context a process context + * @return a transit url based on the bucket name and the CouchbaseClusterControllerService name + */ + protected String getTransitUrl(final ProcessContext context) { + return new StringBuilder(context.getProperty(BUCKET_NAME).getValue()) + .append('@') + .append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java new file mode 100644 index 0000000..81dd465 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.couchbase;
+
+
+/**
+ * Supported Couchbase document types.
+ *
+ * In order to handle a variety of document classes such as JsonDocument,
+ * JsonLongDocument or JsonStringDocument, Couchbase processors use
+ * RawJsonDocument for the Json type.
+ *
+ * The distinction between Json and Binary exists because BinaryDocument doesn't
+ * set the Json flag when it is stored on Couchbase Server, even if the content
+ * byte array represents a Json string, and so it can't be retrieved as a Json
+ * document.
+ *
+ * The constants are exposed through {@code values()} and {@code toString()} as
+ * the allowable and default values of the "Document Type" property, so renaming
+ * a constant changes what users see and select.
+ */
+public enum DocumentType {
+
+    /** Content is handled as a Json document (RawJsonDocument, as described above). */
+    Json,
+    /** Content is handled as a raw byte array (BinaryDocument, as described above). */
+    Binary
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
new file mode 100644
index 0000000..6d9a476
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseAttributes; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.document.BinaryDocument; +import com.couchbase.client.java.document.Document; +import com.couchbase.client.java.document.RawJsonDocument; + +@Tags({ 
"nosql", "couchbase", "database", "get" }) +@CapabilityDescription("Get a document from Couchbase Server via Key/Value access.") +@SeeAlso({CouchbaseClusterControllerService.class}) +@ReadsAttributes({ + @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"), + @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.") + }) +@WritesAttributes({ + @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."), + @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."), + @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), + @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), + @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.") + }) +public class GetCouchbaseKey extends AbstractCouchbaseProcessor { + + @Override + protected void addSupportedProperties(List<PropertyDescriptor> descriptors) { + descriptors.add(DOCUMENT_TYPE); + descriptors.add(DOC_ID); + descriptors.add(DOC_ID_EXP); + } + + @Override + protected void addSupportedRelationships(Set<Relationship> relationships) { + relationships.add(REL_SUCCESS); + relationships.add(REL_ORIGINAL); + relationships.add(REL_FAILURE); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + FlowFile inFile = session.get(); + + String docId = null; + if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + docId = context.getProperty(DOC_ID).getValue(); + } else { + // Otherwise docId has to be extracted from inFile. 
+ if ( inFile == null ) { + return; + } + if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){ + docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(inFile).getValue(); + } else { + final byte[] content = new byte[(int) inFile.getSize()]; + session.read(inFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, content, true); + } + }); + docId = new String(content, StandardCharsets.UTF_8); + } + } + + if(StringUtils.isEmpty(docId)){ + logger.error("Couldn't get document id from from {}", new Object[]{inFile}); + session.transfer(inFile, REL_FAILURE); + } + + try { + Document<?> doc = null; + byte[] content = null; + Bucket bucket = openBucket(context); + DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + switch (documentType){ + case Json : { + RawJsonDocument document = bucket.get(docId, RawJsonDocument.class); + if(document != null){ + content = document.content().getBytes(StandardCharsets.UTF_8); + doc = document; + } + break; + } + case Binary : { + BinaryDocument document = bucket.get(docId, BinaryDocument.class); + if(document != null){ + content = document.content().array(); + doc = document; + } + break; + } + } + + if(doc == null) { + logger.info("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)}); + if(inFile != null){ + session.transfer(inFile, REL_FAILURE); + } + return; + } + + if(inFile != null){ + session.transfer(inFile, REL_ORIGINAL); + } + + FlowFile outFile = session.create(); + outFile = session.importFrom(new ByteArrayInputStream(content), outFile); + Map<String, String> updatedAttrs = new HashMap<>(); + updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); + updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue()); + 
updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); + updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas())); + updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry())); + outFile = session.putAllAttributes(outFile, updatedAttrs); + session.getProvenanceReporter().receive(outFile, getTransitUrl(context)); + session.transfer(outFile, REL_SUCCESS); + + } catch (Throwable t){ + logger.error("Getting docuement {} from Couchbase Server using {} failed due to {}", + new Object[]{docId, inFile, t}, t); + if(inFile != null){ + session.transfer(inFile, REL_FAILURE); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java new file mode 100644 index 0000000..6bfa480 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseAttributes; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; + +import com.couchbase.client.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.deps.io.netty.buffer.Unpooled; +import com.couchbase.client.java.PersistTo; +import 
com.couchbase.client.java.ReplicateTo; +import com.couchbase.client.java.document.BinaryDocument; +import com.couchbase.client.java.document.Document; +import com.couchbase.client.java.document.RawJsonDocument; + +@Tags({ "nosql", "couchbase", "database", "put" }) +@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.") +@SeeAlso({CouchbaseClusterControllerService.class}) +@ReadsAttributes({ + @ReadsAttribute(attribute = "uuid", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"), + @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.") + }) +@WritesAttributes({ + @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."), + @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."), + @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), + @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), + @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.") + }) +public class PutCouchbaseKey extends AbstractCouchbaseProcessor { + + + public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor + .Builder().name("Persist To") + .description("Durability constraint about disk persistence.") + .required(true) + .allowableValues(PersistTo.values()) + .defaultValue(PersistTo.NONE.toString()) + .build(); + + public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor + .Builder().name("Replicate To") + .description("Durability constraint about replication.") + .required(true) + .allowableValues(ReplicateTo.values()) + .defaultValue(ReplicateTo.NONE.toString()) + .build(); + + @Override + protected void addSupportedProperties(List<PropertyDescriptor> descriptors) { + descriptors.add(DOCUMENT_TYPE); + 
descriptors.add(DOC_ID); + descriptors.add(DOC_ID_EXP); + descriptors.add(PERSIST_TO); + descriptors.add(REPLICATE_TO); + } + + @Override + protected void addSupportedRelationships(Set<Relationship> relationships) { + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ProcessorLog logger = getLogger(); + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + try { + + final byte[] content = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, content, true); + } + }); + + + String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); + if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + docId = context.getProperty(DOC_ID).getValue(); + } else if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){ + docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(flowFile).getValue(); + } + + Document<?> doc = null; + DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); + switch (documentType){ + case Json : { + doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8)); + break; + } + case Binary : { + ByteBuf buf = Unpooled.copiedBuffer(content); + doc = BinaryDocument.create(docId, buf); + break; + } + } + + + PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue()); + ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue()); + doc = openBucket(context).upsert(doc, persistTo, replicateTo); + Map<String, String> updatedAttrs = new HashMap<>(); + updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue()); + 
updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue()); + updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId); + updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas())); + updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry())); + flowFile = session.putAllAttributes(flowFile, updatedAttrs); + session.getProvenanceReporter().send(flowFile, getTransitUrl(context)); + session.transfer(flowFile, REL_SUCCESS); + + } catch (Throwable t) { + logger.error("Writing {} into Couchbase Server failed due to {}", new Object[]{flowFile, t}, t); + session.transfer(flowFile, REL_FAILURE); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..e5e3ea7 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.couchbase.CouchbaseClusterService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..1304435 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.couchbase.GetCouchbaseKey +org.apache.nifi.processors.couchbase.PutCouchbaseKey \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java new file mode 100644 index 0000000..d96b1c2 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.couchbase; + +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.couchbase.CouchbaseClusterService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +public class TestCouchbaseClusterService { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + @Before + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.CouchbaseClusterService", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.TestCouchbaseClusterService", "debug"); + + testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class); + testRunner.setValidateExpressionUsage(false); + } + + @Test + public void testConnectionFailure() throws InitializationException { + String connectionString = "couchbase://invalid-hostname"; + CouchbaseClusterControllerService service = new CouchbaseClusterService(); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString); + try { + testRunner.enableControllerService(service); + Assert.fail("The service shouldn't be enabled when it couldn't connect to a cluster."); + } catch (AssertionError e) { + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java new file mode 100644 index 0000000..4ea4dff --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.couchbase; + +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOCUMENT_TYPE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.couchbase.CouchbaseAttributes; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import com.couchbase.client.core.ServiceNotAvailableException; +import com.couchbase.client.deps.io.netty.buffer.ByteBuf; +import com.couchbase.client.deps.io.netty.buffer.Unpooled; +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.document.BinaryDocument; +import com.couchbase.client.java.document.RawJsonDocument; + + +public class TestGetCouchbaseKey { + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + @Before + public void init() throws Exception { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", 
"info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.GetCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestGetCouchbaseKey", "debug"); + + testRunner = TestRunners.newTestRunner(GetCouchbaseKey.class); + testRunner.setValidateExpressionUsage(false); + } + + private void setupMockBucket(Bucket bucket) throws InitializationException { + CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class); + when(service.getIdentifier()).thenReturn(SERVICE_ID); + when(service.openBucket(anyString())).thenReturn(bucket); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.enableControllerService(service); + testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID); + } + + @Test + public void testStaticDocId() throws Exception { + String bucketName = "bucket-1"; + String docId = "doc-a"; + + Bucket bucket = mock(Bucket.class); + String content = "{\"key\":\"value\"}"; + int expiry = 100; + long cas = 200L; + when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, expiry, content, cas)); + setupMockBucket(bucket); + + testRunner.setProperty(BUCKET_NAME, bucketName); + testRunner.setProperty(DOC_ID, docId); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + + outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID); + outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName); + outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId); + outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas)); + 
outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry)); + } + + + /** + * Use static document id even if doc id expression is set. + */ + @Test + public void testStaticDocIdAndDocIdExp() throws Exception { + String docId = "doc-a"; + String docIdExp = "${someProperty}"; + + Bucket bucket = mock(Bucket.class); + String content = "{\"key\":\"value\"}"; + when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, content)); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(DOC_ID_EXP, docIdExp); + testRunner.run(); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + @Test + public void testDocIdExp() throws Exception { + String docIdExp = "${'someProperty'}"; + String somePropertyValue = "doc-p"; + + Bucket bucket = mock(Bucket.class); + String content = "{\"key\":\"value\"}"; + when(bucket.get(somePropertyValue, RawJsonDocument.class)) + .thenReturn(RawJsonDocument.create(somePropertyValue, content)); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID_EXP, docIdExp); + + byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8); + Map<String, String> properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileData, properties); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + @Test + public void testInputFlowFileContent() throws Exception { + + Bucket bucket = mock(Bucket.class); + String 
inFileDataStr = "doc-in"; + String content = "{\"key\":\"value\"}"; + when(bucket.get(inFileDataStr, RawJsonDocument.class)) + .thenReturn(RawJsonDocument.create(inFileDataStr, content)); + setupMockBucket(bucket); + + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + @Test + public void testBinaryDocument() throws Exception { + + Bucket bucket = mock(Bucket.class); + String inFileDataStr = "doc-in"; + String content = "binary"; + ByteBuf buf = Unpooled.copiedBuffer(content.getBytes(StandardCharsets.UTF_8)); + when(bucket.get(inFileDataStr, BinaryDocument.class)) + .thenReturn(BinaryDocument.create(inFileDataStr, buf)); + setupMockBucket(bucket); + + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.toString()); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(content); + } + + + @Test + public void testCouchbaseFailure() throws Exception { + + Bucket bucket = mock(Bucket.class); + String inFileDataStr = "doc-in"; + when(bucket.get(inFileDataStr, RawJsonDocument.class)) + .thenThrow(new ServiceNotAvailableException()); + setupMockBucket(bucket); + + + byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + 
testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + outFile.assertContentEquals(inFileDataStr); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java new file mode 100644 index 0000000..3995528 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.couchbase; + +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.couchbase.CouchbaseAttributes; +import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.PersistTo; +import com.couchbase.client.java.ReplicateTo; +import com.couchbase.client.java.document.RawJsonDocument; +import com.couchbase.client.java.error.DurabilityException; + + +public class TestPutCouchbaseKey {/** Unit tests for PutCouchbaseKey, driven through a NiFi TestRunner with a Mockito mock standing in for the Couchbase Bucket. */ + + private static final String SERVICE_ID = "couchbaseClusterService"; + private TestRunner testRunner; + + /** Sets slf4j simple logger system properties and creates a new TestRunner for PutCouchbaseKey with expression usage validation switched off. */@Before + public void init() throws Exception { + 
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestPutCouchbaseKey", "debug"); + + testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class); + testRunner.setValidateExpressionUsage(false); + } + + /** Registers and enables a mocked CouchbaseClusterControllerService under SERVICE_ID whose openBucket returns the given bucket for any name, then points COUCHBASE_CLUSTER_SERVICE at it. */private void setupMockBucket(Bucket bucket) throws InitializationException { + CouchbaseClusterControllerService service = mock(CouchbaseClusterControllerService.class); + when(service.getIdentifier()).thenReturn(SERVICE_ID); + when(service.openBucket(anyString())).thenReturn(bucket); + testRunner.addControllerService(SERVICE_ID, service); + testRunner.enableControllerService(service); + testRunner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID); + } + + /** Upserts with a fixed DOC_ID and BUCKET_NAME and checks the cluster, bucket, doc id, cas and expiry attributes written to the flow file routed to REL_SUCCESS. */@Test + public void testStaticDocId() throws Exception { + String bucketName = "bucket-1"; + String docId = "doc-a"; + int expiry = 100; + long cas = 200L; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) + .thenReturn(RawJsonDocument.create(docId, expiry, inFileData, cas)); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(BUCKET_NAME, bucketName); + testRunner.setProperty(DOC_ID, docId); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + 
outFile.assertContentEquals(inFileData); + outFile.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID); + outFile.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName); + outFile.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId); + outFile.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas)); + outFile.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry)); + } + + /** Sets PERSIST_TO to MASTER and REPLICATE_TO to ONE and verifies a single upsert call carrying exactly those durability arguments. */@Test + public void testDurabilityConstraint() throws Exception { + String docId = "doc-a"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE))) + .thenReturn(RawJsonDocument.create(docId, inFileData)); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(PutCouchbaseKey.PERSIST_TO, PersistTo.MASTER.toString()); + testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.MASTER), eq(ReplicateTo.ONE)); + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + /** + * Use static document id even if doc id expression is set. 
+ */ + @Test + public void testStaticDocIdAndDocIdExp() throws Exception { + String docId = "doc-a"; + String docIdExp = "${someProperty}"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) + .thenReturn(RawJsonDocument.create(docId, inFileData)); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(DOC_ID_EXP, docIdExp); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE));/* NOTE(review): any(RawJsonDocument.class) matches every id, so this does not show that the static DOC_ID was preferred over DOC_ID_EXP. */ + + testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + /** Sets only DOC_ID_EXP and enqueues the flow file with a someProperty entry; expects one upsert and a transfer to REL_SUCCESS. NOTE(review): the id handed to upsert is not asserted here. */@Test + public void testDocIdExp() throws Exception { + String docIdExp = "${'someProperty'}"; + String somePropertyValue = "doc-p"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) + .thenReturn(RawJsonDocument.create(somePropertyValue, inFileData)); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID_EXP, docIdExp); + + Map<String, String> properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileDataBytes, properties); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = 
testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + /** Leaves DOC_ID and DOC_ID_EXP unset and captures the upsert argument to check that its id equals the uuid entry supplied with the enqueued flow file. */@Test + public void testInputFlowFileUuid() throws Exception { + + String uuid = "00029362-5106-40e8-b8a9-bf2cecfbc0d7"; + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) + .thenReturn(RawJsonDocument.create(uuid, inFileData)); + setupMockBucket(bucket); + + Map<String, String> properties = new HashMap<>(); + properties.put(CoreAttributes.UUID.key(), uuid); + testRunner.enqueue(inFileDataBytes, properties); + testRunner.run(); + + ArgumentCaptor<RawJsonDocument> capture = ArgumentCaptor.forClass(RawJsonDocument.class); + verify(bucket, times(1)).upsert(capture.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + assertEquals(uuid, capture.getValue().id()); + + testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); + outFile.assertContentEquals(inFileData); + } + + + /** Makes the mocked upsert throw DurabilityException for PersistTo.NONE with ReplicateTo.ONE and expects the flow file on REL_FAILURE with its original content. */@Test + public void testCouchbaseFailure() throws Exception { + + String docId = "doc-a"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE))) + .thenThrow(new DurabilityException()); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)); + + testRunner.assertAllFlowFilesTransferred(REL_FAILURE); + 
testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + outFile.assertContentEquals(inFileData); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml new file mode 100644 index 0000000..3654295 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>0.3.1-SNAPSHOT</version> + </parent> + + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-couchbase-bundle</artifactId> + <version>0.3.1-SNAPSHOT</version> + <packaging>pom</packaging> + + <modules> + <module>nifi-couchbase-processors</module> + <module>nifi-couchbase-nar</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index d51c9b6..841818a 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -45,6 +45,7 @@ <module>nifi-ambari-bundle</module> <module>nifi-image-bundle</module> <module>nifi-avro-bundle</module> + <module>nifi-couchbase-bundle</module> </modules> <dependencyManagement> <dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7f68c32..1d5a857 100644 --- a/pom.xml +++ b/pom.xml @@ -908,6 +908,12 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-couchbase-nar</artifactId> + <version>0.3.1-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-properties</artifactId> <version>0.3.1-SNAPSHOT</version> </dependency>
