Repository: nifi
Updated Branches:
  refs/heads/NIFI-810-InputRequirement 2215bc848 -> 8e2308b78


nifi-992 Adding nifi-couchbase-bundle.

- new CouchbaseClusterControllerService
- new Processors
  - GetCouchbaseKey
  - PutCouchbaseKey

Signed-off-by: Bryan Bende <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2466a245
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2466a245
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2466a245

Branch: refs/heads/NIFI-810-InputRequirement
Commit: 2466a24530f493238024eb22dc041eebe96621f3
Parents: 96764ed
Author: ijokarumawak <[email protected]>
Authored: Sat Sep 26 02:46:37 2015 +0900
Committer: Bryan Bende <[email protected]>
Committed: Mon Sep 28 11:21:42 2015 -0400

----------------------------------------------------------------------
 nifi-assembly/NOTICE                            |  10 +
 nifi-assembly/pom.xml                           |   5 +
 .../nifi-couchbase-nar/pom.xml                  |  37 +++
 .../nifi-couchbase-processors/pom.xml           | 208 +++++++++++++++
 .../nifi/couchbase/CouchbaseAttributes.java     |  59 +++++
 .../CouchbaseClusterControllerService.java      |  38 +++
 .../nifi/couchbase/CouchbaseClusterService.java | 130 ++++++++++
 .../couchbase/AbstractCouchbaseProcessor.java   | 174 +++++++++++++
 .../nifi/processors/couchbase/DocumentType.java |  36 +++
 .../processors/couchbase/GetCouchbaseKey.java   | 172 +++++++++++++
 .../processors/couchbase/PutCouchbaseKey.java   | 164 ++++++++++++
 ...org.apache.nifi.controller.ControllerService |  15 ++
 .../org.apache.nifi.processor.Processor         |  16 ++
 .../couchbase/TestCouchbaseClusterService.java  |  59 +++++
 .../couchbase/TestGetCouchbaseKey.java          | 224 ++++++++++++++++
 .../couchbase/TestPutCouchbaseKey.java          | 254 +++++++++++++++++++
 nifi-nar-bundles/nifi-couchbase-bundle/pom.xml  |  35 +++
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |   6 +
 19 files changed, 1643 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 3362740..1f7e3f1 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -709,6 +709,16 @@ The following binary components are provided under the 
Apache Software License v
         Metadata-Extractor
         Copyright 2002-2015 Drew Noakes
 
+    (ASLv2) Couchbase Java SDK
+      The following NOTICE information applies:
+        Couchbase Java SDK
+        Copyright 2014 Couchbase, Inc.
+
+    (ASLv2) RxJava
+      The following NOTICE information applies:
+        RxJava
+        Copyright 2012 Netflix, Inc.
+
 ************************
 Common Development and Distribution License 1.1
 ************************

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index f162588..de4fdcb 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -227,6 +227,11 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>nifi-image-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-couchbase-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
 
     <properties>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
new file mode 100644
index 0000000..4f58d1f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-nar/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-couchbase-bundle</artifactId>
+        <version>0.3.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-couchbase-nar</artifactId>
+    <version>0.3.1-SNAPSHOT</version>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-couchbase-processors</artifactId>
+            <version>0.3.1-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
new file mode 100644
index 0000000..33b0baa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml
@@ -0,0 +1,208 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-couchbase-bundle</artifactId>
+        <version>0.3.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-couchbase-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <!-- Each groupId:artifactId pair is declared exactly once; Maven warns on duplicate declarations. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>java-client</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+       <build>
+               <pluginManagement>
+                       <plugins>
+                               <plugin>
+                                       
<groupId>org.apache.maven.plugins</groupId>
+                                       
<artifactId>maven-checkstyle-plugin</artifactId>
+                                       <version>2.15</version>
+                                       <dependencies>
+                                               <dependency>
+                                                       
<groupId>com.puppycrawl.tools</groupId>
+                                                       
<artifactId>checkstyle</artifactId>
+                                                       <version>6.5</version>
+                                               </dependency>
+                                       </dependencies>
+                               </plugin>
+                       </plugins>
+               </pluginManagement>
+               <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <checkstyleRules>
+                        <module name="Checker">
+                            <property name="charset" value="UTF-8" />
+                            <property name="severity" value="warning" />
+                            <!-- Checks for whitespace -->
+                            <!-- See 
http://checkstyle.sf.net/config_whitespace.html -->
+                            <module name="FileTabCharacter">
+                                <property name="eachLine" value="true" />
+                            </module>
+                            <module name="TreeWalker">
+                                <module name="RegexpSinglelineJava">
+                                    <property name="format" value="\s+$" />
+                                    <property name="message" value="Line has 
trailing whitespace." />
+                                </module>
+                                <module name="RegexpSinglelineJava">
+                                    <property name="format" 
value="[@]see\s+[{][@]link" />
+                                    <property name="message" value="Javadoc 
@see does not need @link: pick one or the other." />
+                                </module>
+                                <module name="OuterTypeFilename" />
+                                <module name="LineLength">
+                                    <!-- needs extra, because Eclipse 
formatter ignores the ending left
+                                        brace -->
+                                    <property name="max" value="200" />
+                                    <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" />
+                                </module>
+                                <module name="AvoidStarImport" />
+                                <module name="UnusedImports">
+                                    <property name="processJavadoc" 
value="true" />
+                                </module>
+                                 <module name="NoLineWrap" />
+                                <module name="LeftCurly">
+                                    <property name="maxLineLength" value="160" 
/>
+                                </module>
+                                <module name="RightCurly" />
+                                <module name="RightCurly">
+                                    <property name="option" value="alone" />
+                                    <property name="tokens" value="CLASS_DEF, 
METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, 
INSTANCE_INIT" />
+                                </module>
+                                <module name="SeparatorWrap">
+                                    <property name="tokens" value="DOT" />
+                                    <property name="option" value="nl" />
+                                </module>
+                                <module name="SeparatorWrap">
+                                    <property name="tokens" value="COMMA" />
+                                    <property name="option" value="EOL" />
+                                </module>
+                                <module name="PackageName">
+                                    <property name="format" 
value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" />
+                                </module>
+                                <module name="MethodTypeParameterName">
+                                    <property name="format" 
value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" />
+                                </module>
+                                <module name="MethodParamPad" />
+                                <module name="OperatorWrap">
+                                    <property name="option" value="NL" />
+                                    <property name="tokens" value="BAND, BOR, 
BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, 
MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " />
+                                </module>
+                                 <module name="AnnotationLocation">
+                                    <property name="tokens" value="CLASS_DEF, 
INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" />
+                                </module>
+                                <module name="AnnotationLocation">
+                                    <property name="tokens" 
value="VARIABLE_DEF" />
+                                    <property 
name="allowSamelineMultipleAnnotations" value="true" />
+                                </module>
+                                <module name="NonEmptyAtclauseDescription" />
+                                <module name="JavadocMethod">
+                                    <property name="allowMissingJavadoc" 
value="true" />
+                                    <property name="allowMissingParamTags" 
value="true" />
+                                    <property name="allowMissingThrowsTags" 
value="true" />
+                                    <property name="allowMissingReturnTag" 
value="true" />
+                                    <property name="allowedAnnotations" 
value="Override,Test,BeforeClass,AfterClass,Before,After" />
+                                    <property 
name="allowThrowsTagsForSubclasses" value="true" />
+                                </module>
+                                <module name="SingleLineJavadoc" />
+                             </module>
+                        </module>
+                    </checkstyleRules>
+                    <violationSeverity>warning</violationSeverity>
+                    
<includeTestSourceDirectory>true</includeTestSourceDirectory>
+                </configuration>
+                       </plugin>
+               </plugins>
+       </build>
+    <profiles>
+        <profile>
+            <!-- Checks style and licensing requirements. This is a good idea 
to run
+                for contributions and for the release process. While it would 
be nice to
+                run always these plugins can considerably slow the build and 
have proven
+                to create unstable builds in our multi-module project and when 
building using
+                multiple threads. The stability issues seen with Checkstyle in 
multi-module
+                builds include false-positives and false negatives. -->
+            <id>contrib-check</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.rat</groupId>
+                        <artifactId>apache-rat-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <goals>
+                                    <goal>check</goal>
+                                </goals>
+                                <phase>verify</phase>
+                            </execution>
+                        </executions>
+                    </plugin>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-checkstyle-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <id>check-style</id>
+                                <goals>
+                                    <goal>check</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
new file mode 100644
index 0000000..a4d69fc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+
+/**
+ * FlowFile attribute keys used for Couchbase related metadata.
+ */
+public enum CouchbaseAttributes implements FlowFileAttributeKey {
+
+    /** A reference to the related cluster. */
+    Cluster("couchbase.cluster"),
+
+    /** The name of the related bucket. */
+    Bucket("couchbase.bucket"),
+
+    /** The id of the related document. */
+    DocId("couchbase.doc.id"),
+
+    /** The CAS value of the related document. */
+    Cas("couchbase.doc.cas"),
+
+    /** The expiration of the related document. */
+    Expiry("couchbase.doc.expiry");
+
+    /** Attribute name returned by {@link #key()}. */
+    private final String attributeName;
+
+    CouchbaseAttributes(final String attributeName) {
+        this.attributeName = attributeName;
+    }
+
+    /**
+     * @return the FlowFile attribute name associated with this constant
+     */
+    @Override
+    public String key() {
+        return attributeName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java
new file mode 100644
index 0000000..fcf72d5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterControllerService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.controller.ControllerService;
+
+import com.couchbase.client.java.Bucket;
+
+/**
+ * Controller service contract that gives components of a NiFi data flow
+ * access to the buckets of a Couchbase Server cluster.
+ */
+@CapabilityDescription("Provides a centralized Couchbase connection.")
+public interface CouchbaseClusterControllerService extends ControllerService {
+
+    /**
+     * Opens a connection to the named bucket.
+     *
+     * @param bucketName name of the bucket to access
+     * @return the connected bucket instance
+     */
+    Bucket openBucket(String bucketName);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java
new file mode 100644
index 0000000..7daa97c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseClusterService.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.couchbase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
+import com.couchbase.client.core.CouchbaseException;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.CouchbaseCluster;
+
+/**
+ * Provides a centralized Couchbase connection and bucket passwords management.
+ *
+ * <p>The cluster connection is created when the service is enabled and released
+ * when it is disabled. A dynamic property whose name starts with
+ * {@code "Bucket Password for "} supplies the password used when the bucket
+ * named by the remainder of the property name is opened.</p>
+ */
+@CapabilityDescription("Provides a centralized Couchbase connection and bucket passwords management."
+        + " Bucket passwords can be specified via dynamic properties.")
+@Tags({ "nosql", "couchbase", "database", "connection" })
+@DynamicProperty(name = "Bucket Password for BUCKET_NAME", value = "bucket password", description = "Specify bucket password if necessary.")
+public class CouchbaseClusterService extends AbstractControllerService implements CouchbaseClusterControllerService {
+
+    public static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor
+            .Builder().name("Connection String")
+            .description("The hostnames or ip addresses of the bootstrapping nodes and optional parameters."
+                    + " Syntax) couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private static final List<PropertyDescriptor> properties;
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(CONNECTION_STRING);
+
+        properties = Collections.unmodifiableList(props);
+    }
+
+    /** Prefix of the dynamic property names that carry a bucket password. */
+    private static final String DYNAMIC_PROP_BUCKET_PASSWORD = "Bucket Password for ";
+
+    // Per-instance, immutable snapshot of bucket name -> password. It is replaced as a
+    // whole on every enable/disable so that instances never share passwords and values
+    // removed from the configuration do not linger.
+    private volatile Map<String, String> bucketPasswords = Collections.emptyMap();
+
+    private volatile CouchbaseCluster cluster;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    /**
+     * Accepts only dynamic properties whose name starts with the bucket password prefix.
+     *
+     * @param propertyDescriptorName the name of the dynamic property
+     * @return a sensitive, non-empty validated descriptor, or {@code null} if the name is not supported
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
+            String propertyDescriptorName) {
+        if (propertyDescriptorName.startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)) {
+            return new PropertyDescriptor
+                    .Builder().name(propertyDescriptorName)
+                    .description("Bucket password.")
+                    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                    .dynamic(true)
+                    .sensitive(true)
+                    .build();
+        }
+        return null;
+    }
+
+
+    /**
+     * Establish a connection to a Couchbase cluster.
+     * @param context the configuration context
+     * @throws InitializationException if unable to connect a Couchbase cluster
+     */
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws InitializationException {
+
+        // Collect the passwords into a fresh map so nothing from a previous enable survives.
+        final Map<String, String> passwords = new HashMap<>();
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (descriptor.isDynamic() && descriptor.getName().startsWith(DYNAMIC_PROP_BUCKET_PASSWORD)) {
+                final String bucketName = descriptor.getName().substring(DYNAMIC_PROP_BUCKET_PASSWORD.length());
+                passwords.put(bucketName, context.getProperty(descriptor).getValue());
+            }
+        }
+        bucketPasswords = Collections.unmodifiableMap(passwords);
+
+        try {
+            cluster = CouchbaseCluster.fromConnectionString(context.getProperty(CONNECTION_STRING).getValue());
+        } catch (final CouchbaseException e) {
+            throw new InitializationException(e);
+        }
+    }
+
+    /**
+     * Open a bucket connection using the password configured for that bucket, if any.
+     * @param bucketName the bucket name to access
+     * @return a connected bucket instance
+     * @throws IllegalStateException if the service currently holds no cluster connection
+     */
+    @Override
+    public Bucket openBucket(String bucketName) {
+        // Read the volatile field once so a concurrent shutdown cannot null it between check and use.
+        final CouchbaseCluster connectedCluster = cluster;
+        if (connectedCluster == null) {
+            throw new IllegalStateException("Not connected to a Couchbase cluster; the controller service is not enabled.");
+        }
+        return connectedCluster.openBucket(bucketName, bucketPasswords.get(bucketName));
+    }
+
+    /**
+     * Disconnect from the Couchbase cluster and drop the collected bucket passwords.
+     */
+    @OnDisabled
+    public void shutdown() {
+        if (cluster != null) {
+            cluster.disconnect();
+            cluster = null;
+        }
+        bucketPasswords = Collections.emptyMap();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
new file mode 100644
index 0000000..d370728
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.couchbase.client.java.Bucket;
+
+/**
+ * Provides common functionalities for Couchbase processors.
+ */
+public abstract class AbstractCouchbaseProcessor extends AbstractProcessor {
+
+    public static final PropertyDescriptor DOCUMENT_TYPE = new 
PropertyDescriptor
+            .Builder().name("Document Type")
+            .description("The type of contents.")
+            .required(true)
+            .allowableValues(DocumentType.values())
+            .defaultValue(DocumentType.Json.toString())
+            .build();
+
+    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor
+            .Builder().name("Static Document Id")
+            .description("A static, fixed Couchbase document id.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DOC_ID_EXP = new PropertyDescriptor
+            .Builder().name("Document Id Expression")
+            .description("An expression to construct the Couchbase document 
id."
+                    + " If 'Static Document Id' is specified, then 'Static 
Document Id' is used.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+            .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are written to Couchbase Server 
are routed to this relationship.")
+            .build();
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original input file will be routed to this 
destination when it has been successfully processed.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All FlowFiles that cannot written to Couchbase 
Server are routed to this relationship.")
+            .build();
+
+    public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new 
PropertyDescriptor
+            .Builder().name("Couchbase Cluster Controller Service")
+            .description("A Couchbase Cluster Controller Service which manages 
connections to a Couchbase cluster.")
+            .required(true)
+            
.identifiesControllerService(CouchbaseClusterControllerService.class)
+            .build();
+
+    public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor
+            .Builder().name("Bucket Name")
+            .description("The name of bucket to access.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("default")
+            .build();
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    private CouchbaseClusterControllerService clusterService;
+
+    @Override
+    protected final void init(final ProcessorInitializationContext context) {
+
+        final List<PropertyDescriptor> descriptors = new 
ArrayList<PropertyDescriptor>();
+        descriptors.add(COUCHBASE_CLUSTER_SERVICE);
+        descriptors.add(BUCKET_NAME);
+        addSupportedProperties(descriptors);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<Relationship>();
+        addSupportedRelationships(relationships);
+        this.relationships = Collections.unmodifiableSet(relationships);
+
+    }
+
+    /**
+     * Add processor specific properties.
+     * @param descriptors add properties to this list
+     */
+    protected void addSupportedProperties(List<PropertyDescriptor> 
descriptors) {
+        return;
+    }
+
+    /**
+     * Add processor specific relationships.
+     * @param relationships add relationships to this list
+     */
+    protected void addSupportedRelationships(Set<Relationship> relationships) {
+        return;
+    }
+
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    private CouchbaseClusterControllerService getClusterService(final 
ProcessContext context) {
+        if(clusterService == null){
+            synchronized(AbstractCouchbaseProcessor.class){
+                if(clusterService == null){
+                    clusterService = 
context.getProperty(COUCHBASE_CLUSTER_SERVICE)
+                            
.asControllerService(CouchbaseClusterControllerService.class);
+                }
+            }
+        }
+
+        return clusterService;
+    }
+
+    /**
+     * Open a bucket connection using a CouchbaseClusterControllerService.
+     * @param context a process context
+     * @return a bucket instance
+     */
+    protected final Bucket openBucket(final ProcessContext context) {
+        return 
getClusterService(context).openBucket(context.getProperty(BUCKET_NAME).getValue());
+    }
+
+    /**
+     * Generate a transit url.
+     * @param context a process context
+     * @return a transit url based on the bucket name and the 
CouchbaseClusterControllerService name
+     */
+    protected String getTransitUrl(final ProcessContext context) {
+        return new StringBuilder(context.getProperty(BUCKET_NAME).getValue())
+            .append('@')
+            .append(context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue())
+            .toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java
new file mode 100644
index 0000000..81dd465
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/DocumentType.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+
+/**
+ * Supported Couchbase document types.
+ *
+ * In order to handle a variety of document classes such as JsonDocument,
+ * JsonLongDocument or JsonStringDocument, Couchbase processors use
+ * RawJsonDocument for the Json type.
+ *
+ * The distinction between Json and Binary exists because BinaryDocument doesn't
+ * set the Json flag when it is stored on Couchbase Server, even if the content
+ * byte array represents a Json string, and it then can't be retrieved as a Json
+ * document.
+ *
+ * The constant names are also the allowable values of the 'Document Type'
+ * processor property, so they must stay parseable by {@code DocumentType.valueOf}.
+ */
+public enum DocumentType {
+
+    // Content is read and written as a RawJsonDocument (UTF-8 string content).
+    Json,
+    // Content is read and written as a BinaryDocument (raw bytes).
+    Binary
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
new file mode 100644
index 0000000..6d9a476
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.couchbase.CouchbaseAttributes;
+import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.document.BinaryDocument;
+import com.couchbase.client.java.document.Document;
+import com.couchbase.client.java.document.RawJsonDocument;
+
+/**
+ * Fetches a single document from Couchbase Server by its id.
+ *
+ * The document id is resolved in this order: the 'Static Document Id' property,
+ * then the 'Document Id Expression' evaluated against the incoming FlowFile,
+ * then the content of the incoming FlowFile. When a document is found, its
+ * content is emitted as a new FlowFile on 'success' and the incoming FlowFile
+ * (if any) goes to 'original'; otherwise the incoming FlowFile goes to 'failure'.
+ */
+@Tags({ "nosql", "couchbase", "database", "get" })
+@CapabilityDescription("Get a document from Couchbase Server via Key/Value access.")
+@SeeAlso({CouchbaseClusterControllerService.class})
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"),
+    @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.")
+    })
+@WritesAttributes({
+    @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."),
+    @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."),
+    @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
+    @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
+    @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.")
+    })
+public class GetCouchbaseKey extends AbstractCouchbaseProcessor {
+
+    @Override
+    protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
+        descriptors.add(DOCUMENT_TYPE);
+        descriptors.add(DOC_ID);
+        descriptors.add(DOC_ID_EXP);
+    }
+
+    @Override
+    protected void addSupportedRelationships(Set<Relationship> relationships) {
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_FAILURE);
+    }
+
+    /**
+     * Resolve the document id, fetch the document and route the FlowFiles.
+     * @param context a process context
+     * @param session a process session
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final ProcessorLog logger = getLogger();
+        FlowFile inFile = session.get();
+
+        String docId = null;
+        if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+            // A static id works with or without an incoming FlowFile.
+            docId = context.getProperty(DOC_ID).getValue();
+        } else {
+            // Otherwise docId has to be extracted from inFile.
+            if ( inFile == null ) {
+                return;
+            }
+            if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){
+                docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(inFile).getValue();
+            } else {
+                // Fall back to the whole FlowFile content, decoded as UTF-8.
+                final byte[] content = new byte[(int) inFile.getSize()];
+                session.read(inFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream in) throws IOException {
+                        StreamUtils.fillBuffer(in, content, true);
+                    }
+                });
+                docId = new String(content, StandardCharsets.UTF_8);
+            }
+        }
+
+        if(StringUtils.isEmpty(docId)){
+            // An empty id can only come from the expression or the content, so
+            // inFile is non-null here. Stop after routing to failure: falling
+            // through would query Couchbase with an empty id and transfer the
+            // same FlowFile a second time.
+            logger.error("Couldn't get document id from from {}", new Object[]{inFile});
+            session.transfer(inFile, REL_FAILURE);
+            return;
+        }
+
+        try {
+            Document<?> doc = null;
+            byte[] content = null;
+            Bucket bucket = openBucket(context);
+            DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+            switch (documentType){
+                case Json : {
+                    RawJsonDocument document = bucket.get(docId, RawJsonDocument.class);
+                    if(document != null){
+                        content = document.content().getBytes(StandardCharsets.UTF_8);
+                        doc = document;
+                    }
+                    break;
+                }
+                case Binary : {
+                    BinaryDocument document = bucket.get(docId, BinaryDocument.class);
+                    if(document != null){
+                        // Copy exactly the readable bytes. ByteBuf.array() is not
+                        // usable here: it throws for buffers without a backing
+                        // array and otherwise exposes the whole backing array
+                        // regardless of reader index and length.
+                        // NOTE(review): the buffer is not released here - confirm
+                        // whether the caller is expected to release it.
+                        content = new byte[document.content().readableBytes()];
+                        document.content().getBytes(document.content().readerIndex(), content);
+                        doc = document;
+                    }
+                    break;
+                }
+            }
+
+            if(doc == null) {
+                logger.info("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)});
+                if(inFile != null){
+                    session.transfer(inFile, REL_FAILURE);
+                }
+                return;
+            }
+
+            if(inFile != null){
+                session.transfer(inFile, REL_ORIGINAL);
+            }
+
+            // Emit the document content as a new FlowFile carrying the Couchbase metadata.
+            FlowFile outFile = session.create();
+            outFile = session.importFrom(new ByteArrayInputStream(content), outFile);
+            Map<String, String> updatedAttrs = new HashMap<>();
+            updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
+            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
+            updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
+            updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
+            updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
+            outFile = session.putAllAttributes(outFile, updatedAttrs);
+            session.getProvenanceReporter().receive(outFile, getTransitUrl(context));
+            session.transfer(outFile, REL_SUCCESS);
+
+        } catch (Throwable t){
+            logger.error("Getting docuement {} from Couchbase Server using {} failed due to {}",
+                    new Object[]{docId, inFile, t}, t);
+            if(inFile != null){
+                session.transfer(inFile, REL_FAILURE);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
new file mode 100644
index 0000000..6bfa480
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.couchbase.CouchbaseAttributes;
+import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.buffer.Unpooled;
+import com.couchbase.client.java.PersistTo;
+import com.couchbase.client.java.ReplicateTo;
+import com.couchbase.client.java.document.BinaryDocument;
+import com.couchbase.client.java.document.Document;
+import com.couchbase.client.java.document.RawJsonDocument;
+
+/**
+ * Stores the content of an incoming FlowFile as a Couchbase document.
+ *
+ * The document id is resolved in this order: the 'Static Document Id' property,
+ * then the 'Document Id Expression' evaluated against the FlowFile, then the
+ * FlowFile's uuid attribute. The document is upserted with the configured
+ * durability constraints; the FlowFile goes to 'success' when the write
+ * completes and to 'failure' otherwise.
+ */
+@Tags({ "nosql", "couchbase", "database", "put" })
+@CapabilityDescription("Put a document to Couchbase Server via Key/Value access.")
+@SeeAlso({CouchbaseClusterControllerService.class})
+@ReadsAttributes({
+    @ReadsAttribute(attribute = "uuid", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"),
+    @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.")
+    })
+@WritesAttributes({
+    @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."),
+    @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."),
+    @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."),
+    @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."),
+    @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.")
+    })
+public class PutCouchbaseKey extends AbstractCouchbaseProcessor {
+
+
+    public static final PropertyDescriptor PERSIST_TO = new PropertyDescriptor
+            .Builder().name("Persist To")
+            .description("Durability constraint about disk persistence.")
+            .required(true)
+            .allowableValues(PersistTo.values())
+            .defaultValue(PersistTo.NONE.toString())
+            .build();
+
+    public static final PropertyDescriptor REPLICATE_TO = new PropertyDescriptor
+            .Builder().name("Replicate To")
+            .description("Durability constraint about replication.")
+            .required(true)
+            .allowableValues(ReplicateTo.values())
+            .defaultValue(ReplicateTo.NONE.toString())
+            .build();
+
+    @Override
+    protected void addSupportedProperties(List<PropertyDescriptor> descriptors) {
+        descriptors.add(DOCUMENT_TYPE);
+        descriptors.add(DOC_ID);
+        descriptors.add(DOC_ID_EXP);
+        descriptors.add(PERSIST_TO);
+        descriptors.add(REPLICATE_TO);
+    }
+
+    @Override
+    protected void addSupportedRelationships(Set<Relationship> relationships) {
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+    }
+
+    /**
+     * Read the FlowFile content, resolve the document id and upsert the document.
+     * @param context a process context
+     * @param session a process session
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final ProcessorLog logger = getLogger();
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        try {
+
+            // Load the whole FlowFile content into memory; it becomes the document body.
+            final byte[] content = new byte[(int) flowFile.getSize()];
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, content, true);
+                }
+            });
+
+
+            // Default to the FlowFile uuid; a static id or an expression overrides it.
+            String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key()));
+            if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){
+                docId = context.getProperty(DOC_ID).getValue();
+            } else if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){
+                docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(flowFile).getValue();
+            }
+
+            // The expression may evaluate to an empty string. Reject it up front
+            // (as GetCouchbaseKey does) instead of attempting a write with an
+            // unusable id.
+            if(StringUtils.isEmpty(docId)){
+                logger.error("Couldn't get document id from {}", new Object[]{flowFile});
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+
+            Document<?> doc = null;
+            DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue());
+            switch (documentType){
+                case Json : {
+                    doc = RawJsonDocument.create(docId, new String(content, StandardCharsets.UTF_8));
+                    break;
+                }
+                case Binary : {
+                    ByteBuf buf = Unpooled.copiedBuffer(content);
+                    doc = BinaryDocument.create(docId, buf);
+                    break;
+                }
+            }
+
+
+            PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue());
+            ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue());
+            doc = openBucket(context).upsert(doc, persistTo, replicateTo);
+
+            // Record where and how the document was stored on the outgoing FlowFile.
+            Map<String, String> updatedAttrs = new HashMap<>();
+            updatedAttrs.put(CouchbaseAttributes.Cluster.key(), context.getProperty(COUCHBASE_CLUSTER_SERVICE).getValue());
+            updatedAttrs.put(CouchbaseAttributes.Bucket.key(), context.getProperty(BUCKET_NAME).getValue());
+            updatedAttrs.put(CouchbaseAttributes.DocId.key(), docId);
+            updatedAttrs.put(CouchbaseAttributes.Cas.key(), String.valueOf(doc.cas()));
+            updatedAttrs.put(CouchbaseAttributes.Expiry.key(), String.valueOf(doc.expiry()));
+            flowFile = session.putAllAttributes(flowFile, updatedAttrs);
+            session.getProvenanceReporter().send(flowFile, getTransitUrl(context));
+            session.transfer(flowFile, REL_SUCCESS);
+
+        } catch (Throwable t) {
+            logger.error("Writing {} into Couchbase Server failed due to {}", new Object[]{flowFile, t}, t);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..e5e3ea7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.couchbase.CouchbaseClusterService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..1304435
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.couchbase.GetCouchbaseKey
+org.apache.nifi.processors.couchbase.PutCouchbaseKey
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
new file mode 100644
index 0000000..d96b1c2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.couchbase.CouchbaseClusterService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests enabling a {@link CouchbaseClusterService} through the NiFi
+ * {@link TestRunner} when the connection string points at a host named
+ * "invalid-hostname".
+ */
+public class TestCouchbaseClusterService {
+
+    private static final String SERVICE_ID = "couchbaseClusterService";
+    private TestRunner testRunner;
+
+    /**
+     * Sets the slf4j-simple logging system properties and creates a fresh
+     * runner (backed by {@link PutCouchbaseKey}) before each test.
+     */
+    @Before
+    public void init() throws Exception {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.couchbase.CouchbaseClusterService", "debug");
+        // This test class lives in org.apache.nifi.processors.couchbase, so the
+        // logger name must use that package to match the class.
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestCouchbaseClusterService", "debug");
+
+        testRunner = TestRunners.newTestRunner(PutCouchbaseKey.class);
+        testRunner.setValidateExpressionUsage(false);
+    }
+
+    /**
+     * Enabling the service must be rejected when the cluster cannot be
+     * reached. The rejection is expected to surface as an
+     * {@link AssertionError} thrown by the runner.
+     *
+     * @throws InitializationException if the service cannot be registered with the runner
+     */
+    @Test
+    public void testConnectionFailure() throws InitializationException {
+        String connectionString = "couchbase://invalid-hostname";
+        CouchbaseClusterControllerService service = new CouchbaseClusterService();
+        testRunner.addControllerService(SERVICE_ID, service);
+        testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString);
+        try {
+            testRunner.enableControllerService(service);
+        } catch (AssertionError e) {
+            // Expected outcome: the service could not be enabled.
+            return;
+        }
+        // Kept outside the try block on purpose: Assert.fail() itself throws an
+        // AssertionError, which the catch above would otherwise swallow and
+        // make this test pass unconditionally.
+        Assert.fail("The service shouldn't be enabled when it couldn't connect to a cluster.");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
new file mode 100644
index 0000000..4ea4dff
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOCUMENT_TYPE;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.couchbase.CouchbaseAttributes;
+import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.couchbase.client.core.ServiceNotAvailableException;
+import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
+import com.couchbase.client.deps.io.netty.buffer.Unpooled;
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.document.BinaryDocument;
+import com.couchbase.client.java.document.RawJsonDocument;
+
+
+/**
+ * Tests for {@link GetCouchbaseKey} that run against a Mockito mock of
+ * {@link Bucket} handed out by a mocked cluster controller service.
+ */
+public class TestGetCouchbaseKey {
+
+    private static final String SERVICE_ID = "couchbaseClusterService";
+    private static final String JSON_CONTENT = "{\"key\":\"value\"}";
+
+    private TestRunner runner;
+
+    /**
+     * Sets the slf4j-simple logging system properties and creates a new
+     * runner for {@link GetCouchbaseKey} before each test.
+     */
+    @Before
+    public void init() throws Exception {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.GetCouchbaseKey", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestGetCouchbaseKey", "debug");
+
+        runner = TestRunners.newTestRunner(GetCouchbaseKey.class);
+        runner.setValidateExpressionUsage(false);
+    }
+
+    /**
+     * Registers and enables a mocked cluster service whose
+     * {@code openBucket} returns the given bucket for any name, then points
+     * the processor at that service.
+     *
+     * @param bucket the (mock) bucket the processor should receive
+     * @throws InitializationException if the runner rejects the service
+     */
+    private void setupMockBucket(Bucket bucket) throws InitializationException {
+        final CouchbaseClusterControllerService clusterService = mock(CouchbaseClusterControllerService.class);
+        when(clusterService.getIdentifier()).thenReturn(SERVICE_ID);
+        when(clusterService.openBucket(anyString())).thenReturn(bucket);
+
+        runner.addControllerService(SERVICE_ID, clusterService);
+        runner.enableControllerService(clusterService);
+        runner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID);
+    }
+
+    /** Returns the first FlowFile routed to the success relationship. */
+    private MockFlowFile firstSuccess() {
+        return runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+    }
+
+    /** Enqueues a FlowFile whose content is the UTF-8 encoding of {@code text}. */
+    private void enqueueUtf8(String text) {
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * A statically configured document id is fetched, and the resulting
+     * FlowFile carries the document content plus the Couchbase attributes.
+     */
+    @Test
+    public void testStaticDocId() throws Exception {
+        final String bucketName = "bucket-1";
+        final String docId = "doc-a";
+        final int expiry = 100;
+        final long cas = 200L;
+
+        final Bucket bucket = mock(Bucket.class);
+        final RawJsonDocument stored = RawJsonDocument.create(docId, expiry, JSON_CONTENT, cas);
+        when(bucket.get(docId, RawJsonDocument.class)).thenReturn(stored);
+        setupMockBucket(bucket);
+
+        runner.setProperty(BUCKET_NAME, bucketName);
+        runner.setProperty(DOC_ID, docId);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+
+        final MockFlowFile result = firstSuccess();
+        result.assertContentEquals(JSON_CONTENT);
+        result.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID);
+        result.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName);
+        result.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId);
+        result.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas));
+        result.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry));
+    }
+
+    /**
+     * The static document id is used even when a document id expression is
+     * configured as well.
+     */
+    @Test
+    public void testStaticDocIdAndDocIdExp() throws Exception {
+        final String docId = "doc-a";
+        final String docIdExp = "${someProperty}";
+
+        final Bucket bucket = mock(Bucket.class);
+        final RawJsonDocument stored = RawJsonDocument.create(docId, JSON_CONTENT);
+        when(bucket.get(docId, RawJsonDocument.class)).thenReturn(stored);
+        setupMockBucket(bucket);
+
+        runner.setProperty(DOC_ID, docId);
+        runner.setProperty(DOC_ID_EXP, docIdExp);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+        firstSuccess().assertContentEquals(JSON_CONTENT);
+    }
+
+    /**
+     * With only a document id expression configured, the id is taken from
+     * the incoming FlowFile's attribute.
+     */
+    @Test
+    public void testDocIdExp() throws Exception {
+        final String docIdExp = "${'someProperty'}";
+        final String somePropertyValue = "doc-p";
+
+        final Bucket bucket = mock(Bucket.class);
+        final RawJsonDocument stored = RawJsonDocument.create(somePropertyValue, JSON_CONTENT);
+        when(bucket.get(somePropertyValue, RawJsonDocument.class)).thenReturn(stored);
+        setupMockBucket(bucket);
+
+        runner.setProperty(DOC_ID_EXP, docIdExp);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("someProperty", somePropertyValue);
+        final byte[] payload = "input FlowFile data".getBytes(StandardCharsets.UTF_8);
+        runner.enqueue(payload, attributes);
+        runner.run();
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_ORIGINAL, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+        firstSuccess().assertContentEquals(JSON_CONTENT);
+    }
+
+    /**
+     * With no document id property set, the content of the incoming
+     * FlowFile is used as the document id.
+     */
+    @Test
+    public void testInputFlowFileContent() throws Exception {
+        final String incomingId = "doc-in";
+
+        final Bucket bucket = mock(Bucket.class);
+        final RawJsonDocument stored = RawJsonDocument.create(incomingId, JSON_CONTENT);
+        when(bucket.get(incomingId, RawJsonDocument.class)).thenReturn(stored);
+        setupMockBucket(bucket);
+
+        enqueueUtf8(incomingId);
+        runner.run();
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_ORIGINAL, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+        firstSuccess().assertContentEquals(JSON_CONTENT);
+    }
+
+    /**
+     * With the document type set to Binary, the document is requested as a
+     * {@link BinaryDocument} and its bytes become the FlowFile content.
+     */
+    @Test
+    public void testBinaryDocument() throws Exception {
+        final String incomingId = "doc-in";
+        final String binaryContent = "binary";
+
+        final Bucket bucket = mock(Bucket.class);
+        final ByteBuf buffer = Unpooled.copiedBuffer(binaryContent.getBytes(StandardCharsets.UTF_8));
+        final BinaryDocument stored = BinaryDocument.create(incomingId, buffer);
+        when(bucket.get(incomingId, BinaryDocument.class)).thenReturn(stored);
+        setupMockBucket(bucket);
+
+        enqueueUtf8(incomingId);
+        runner.setProperty(DOCUMENT_TYPE, DocumentType.Binary.toString());
+        runner.run();
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_ORIGINAL, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+        firstSuccess().assertContentEquals(binaryContent);
+    }
+
+    /**
+     * When the bucket lookup throws, the incoming FlowFile is routed to
+     * failure with its content unchanged.
+     */
+    @Test
+    public void testCouchbaseFailure() throws Exception {
+        final String incomingId = "doc-in";
+
+        final Bucket bucket = mock(Bucket.class);
+        when(bucket.get(incomingId, RawJsonDocument.class)).thenThrow(new ServiceNotAvailableException());
+        setupMockBucket(bucket);
+
+        enqueueUtf8(incomingId);
+        runner.run();
+
+        runner.assertTransferCount(REL_SUCCESS, 0);
+        runner.assertTransferCount(REL_ORIGINAL, 0);
+        runner.assertTransferCount(REL_FAILURE, 1);
+        final MockFlowFile failed = runner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+        failed.assertContentEquals(incomingId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
new file mode 100644
index 0000000..3995528
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.couchbase;
+
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE;
+import static 
org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.couchbase.CouchbaseAttributes;
+import org.apache.nifi.couchbase.CouchbaseClusterControllerService;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.PersistTo;
+import com.couchbase.client.java.ReplicateTo;
+import com.couchbase.client.java.document.RawJsonDocument;
+import com.couchbase.client.java.error.DurabilityException;
+
+
+/**
+ * Tests for {@link PutCouchbaseKey} that run against a Mockito mock of
+ * {@link Bucket} handed out by a mocked cluster controller service.
+ */
+public class TestPutCouchbaseKey {
+
+    private static final String SERVICE_ID = "couchbaseClusterService";
+    private static final String JSON_CONTENT = "{\"key\":\"value\"}";
+
+    private TestRunner runner;
+
+    /**
+     * Sets the slf4j-simple logging system properties and creates a new
+     * runner for {@link PutCouchbaseKey} before each test.
+     */
+    @Before
+    public void init() throws Exception {
+        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
+        System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.PutCouchbaseKey", "debug");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.couchbase.TestPutCouchbaseKey", "debug");
+
+        runner = TestRunners.newTestRunner(PutCouchbaseKey.class);
+        runner.setValidateExpressionUsage(false);
+    }
+
+    /**
+     * Registers and enables a mocked cluster service whose
+     * {@code openBucket} returns the given bucket for any name, then points
+     * the processor at that service.
+     *
+     * @param bucket the (mock) bucket the processor should receive
+     * @throws InitializationException if the runner rejects the service
+     */
+    private void setupMockBucket(Bucket bucket) throws InitializationException {
+        final CouchbaseClusterControllerService clusterService = mock(CouchbaseClusterControllerService.class);
+        when(clusterService.getIdentifier()).thenReturn(SERVICE_ID);
+        when(clusterService.openBucket(anyString())).thenReturn(bucket);
+
+        runner.addControllerService(SERVICE_ID, clusterService);
+        runner.enableControllerService(clusterService);
+        runner.setProperty(COUCHBASE_CLUSTER_SERVICE, SERVICE_ID);
+    }
+
+    /** Creates a bucket mock whose upsert with the given durability returns {@code result}. */
+    private Bucket bucketReturning(RawJsonDocument result, PersistTo persistTo, ReplicateTo replicateTo) {
+        final Bucket bucket = mock(Bucket.class);
+        when(bucket.upsert(any(RawJsonDocument.class), eq(persistTo), eq(replicateTo))).thenReturn(result);
+        return bucket;
+    }
+
+    /** Verifies that exactly one upsert with the given durability settings happened. */
+    private void verifySingleUpsert(Bucket bucket, PersistTo persistTo, ReplicateTo replicateTo) {
+        verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(persistTo), eq(replicateTo));
+    }
+
+    /** Returns the UTF-8 bytes of the JSON payload used as FlowFile content. */
+    private static byte[] jsonBytes() {
+        return JSON_CONTENT.getBytes(StandardCharsets.UTF_8);
+    }
+
+    /** Returns the first FlowFile routed to the success relationship. */
+    private MockFlowFile firstSuccess() {
+        return runner.getFlowFilesForRelationship(REL_SUCCESS).get(0);
+    }
+
+    /**
+     * A statically configured document id is upserted with default
+     * durability, and the outgoing FlowFile carries the Couchbase attributes
+     * of the returned document.
+     */
+    @Test
+    public void testStaticDocId() throws Exception {
+        final String bucketName = "bucket-1";
+        final String docId = "doc-a";
+        final int expiry = 100;
+        final long cas = 200L;
+
+        final RawJsonDocument stored = RawJsonDocument.create(docId, expiry, JSON_CONTENT, cas);
+        final Bucket bucket = bucketReturning(stored, PersistTo.NONE, ReplicateTo.NONE);
+        setupMockBucket(bucket);
+
+        runner.enqueue(jsonBytes());
+        runner.setProperty(BUCKET_NAME, bucketName);
+        runner.setProperty(DOC_ID, docId);
+        runner.run();
+
+        verifySingleUpsert(bucket, PersistTo.NONE, ReplicateTo.NONE);
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+
+        final MockFlowFile result = firstSuccess();
+        result.assertContentEquals(JSON_CONTENT);
+        result.assertAttributeEquals(CouchbaseAttributes.Cluster.key(), SERVICE_ID);
+        result.assertAttributeEquals(CouchbaseAttributes.Bucket.key(), bucketName);
+        result.assertAttributeEquals(CouchbaseAttributes.DocId.key(), docId);
+        result.assertAttributeEquals(CouchbaseAttributes.Cas.key(), String.valueOf(cas));
+        result.assertAttributeEquals(CouchbaseAttributes.Expiry.key(), String.valueOf(expiry));
+    }
+
+    /**
+     * The configured Persist To / Replicate To values are passed through to
+     * the bucket's upsert call.
+     */
+    @Test
+    public void testDurabilityConstraint() throws Exception {
+        final String docId = "doc-a";
+
+        final RawJsonDocument stored = RawJsonDocument.create(docId, JSON_CONTENT);
+        final Bucket bucket = bucketReturning(stored, PersistTo.MASTER, ReplicateTo.ONE);
+        setupMockBucket(bucket);
+
+        runner.enqueue(jsonBytes());
+        runner.setProperty(DOC_ID, docId);
+        runner.setProperty(PutCouchbaseKey.PERSIST_TO, PersistTo.MASTER.toString());
+        runner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
+        runner.run();
+
+        verifySingleUpsert(bucket, PersistTo.MASTER, ReplicateTo.ONE);
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+        firstSuccess().assertContentEquals(JSON_CONTENT);
+    }
+
+    /**
+     * The static document id is used even when a document id expression is
+     * configured as well.
+     */
+    @Test
+    public void testStaticDocIdAndDocIdExp() throws Exception {
+        final String docId = "doc-a";
+        final String docIdExp = "${someProperty}";
+
+        final RawJsonDocument stored = RawJsonDocument.create(docId, JSON_CONTENT);
+        final Bucket bucket = bucketReturning(stored, PersistTo.NONE, ReplicateTo.NONE);
+        setupMockBucket(bucket);
+
+        runner.enqueue(jsonBytes());
+        runner.setProperty(DOC_ID, docId);
+        runner.setProperty(DOC_ID_EXP, docIdExp);
+        runner.run();
+
+        verifySingleUpsert(bucket, PersistTo.NONE, ReplicateTo.NONE);
+
+        runner.assertAllFlowFilesTransferred(REL_SUCCESS);
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+        firstSuccess().assertContentEquals(JSON_CONTENT);
+    }
+
+    /**
+     * Runs the processor with only a document id expression configured and
+     * an incoming FlowFile that carries the referenced attribute.
+     */
+    @Test
+    public void testDocIdExp() throws Exception {
+        final String docIdExp = "${'someProperty'}";
+        final String somePropertyValue = "doc-p";
+
+        final RawJsonDocument stored = RawJsonDocument.create(somePropertyValue, JSON_CONTENT);
+        final Bucket bucket = bucketReturning(stored, PersistTo.NONE, ReplicateTo.NONE);
+        setupMockBucket(bucket);
+
+        runner.setProperty(DOC_ID_EXP, docIdExp);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("someProperty", somePropertyValue);
+        runner.enqueue(jsonBytes(), attributes);
+        runner.run();
+
+        verifySingleUpsert(bucket, PersistTo.NONE, ReplicateTo.NONE);
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+        firstSuccess().assertContentEquals(JSON_CONTENT);
+    }
+
+    /**
+     * With no document id property set, the upserted document's id equals
+     * the incoming FlowFile's uuid attribute.
+     */
+    @Test
+    public void testInputFlowFileUuid() throws Exception {
+        final String uuid = "00029362-5106-40e8-b8a9-bf2cecfbc0d7";
+
+        final RawJsonDocument stored = RawJsonDocument.create(uuid, JSON_CONTENT);
+        final Bucket bucket = bucketReturning(stored, PersistTo.NONE, ReplicateTo.NONE);
+        setupMockBucket(bucket);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.UUID.key(), uuid);
+        runner.enqueue(jsonBytes(), attributes);
+        runner.run();
+
+        // Capture the document handed to upsert so its id can be inspected.
+        final ArgumentCaptor<RawJsonDocument> upserted = ArgumentCaptor.forClass(RawJsonDocument.class);
+        verify(bucket, times(1)).upsert(upserted.capture(), eq(PersistTo.NONE), eq(ReplicateTo.NONE));
+        assertEquals(uuid, upserted.getValue().id());
+
+        runner.assertTransferCount(REL_SUCCESS, 1);
+        runner.assertTransferCount(REL_FAILURE, 0);
+        firstSuccess().assertContentEquals(JSON_CONTENT);
+    }
+
+    /**
+     * When the upsert throws a {@link DurabilityException}, the incoming
+     * FlowFile is routed to failure with its content unchanged.
+     */
+    @Test
+    public void testCouchbaseFailure() throws Exception {
+        final String docId = "doc-a";
+
+        final Bucket bucket = mock(Bucket.class);
+        when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)))
+            .thenThrow(new DurabilityException());
+        setupMockBucket(bucket);
+
+        runner.enqueue(jsonBytes());
+        runner.setProperty(DOC_ID, docId);
+        runner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString());
+        runner.run();
+
+        verifySingleUpsert(bucket, PersistTo.NONE, ReplicateTo.ONE);
+
+        runner.assertAllFlowFilesTransferred(REL_FAILURE);
+        runner.assertTransferCount(REL_SUCCESS, 0);
+        runner.assertTransferCount(REL_FAILURE, 1);
+        final MockFlowFile failed = runner.getFlowFilesForRelationship(REL_FAILURE).get(0);
+        failed.assertContentEquals(JSON_CONTENT);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml 
b/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml
new file mode 100644
index 0000000..3654295
--- /dev/null
+++ b/nifi-nar-bundles/nifi-couchbase-bundle/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>0.3.1-SNAPSHOT</version>
+    </parent>
+
+    <!-- Aggregator POM (packaging "pom") for the Couchbase bundle modules listed below. -->
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-couchbase-bundle</artifactId>
+    <version>0.3.1-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-couchbase-processors</module>
+        <module>nifi-couchbase-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index d51c9b6..841818a 100644
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -45,6 +45,7 @@
         <module>nifi-ambari-bundle</module>
         <module>nifi-image-bundle</module>
         <module>nifi-avro-bundle</module>
+        <module>nifi-couchbase-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/2466a245/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7f68c32..1d5a857 100644
--- a/pom.xml
+++ b/pom.xml
@@ -908,6 +908,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-couchbase-nar</artifactId>
+                <version>0.3.1-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>
                 <version>0.3.1-SNAPSHOT</version>
             </dependency>

Reply via email to