Repository: nifi
Updated Branches:
  refs/heads/master 70878fe6d -> 0bb141153


NIFI-4212 - RethinkDB Delete Processor

Signed-off-by: James Wing <[email protected]>

This closes #2030.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0bb14115
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0bb14115
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0bb14115

Branch: refs/heads/master
Commit: 0bb141153282888427656d1471c35a5d83958780
Parents: 70878fe
Author: mans2singh <[email protected]>
Authored: Fri Jul 21 19:07:15 2017 -0700
Committer: James Wing <[email protected]>
Committed: Sat Jul 29 14:49:22 2017 -0700

----------------------------------------------------------------------
 .../rethinkdb/AbstractRethinkDBProcessor.java   |  31 ++-
 .../processors/rethinkdb/DeleteRethinkDB.java   | 215 ++++++++++++++++
 .../nifi/processors/rethinkdb/GetRethinkDB.java |  17 +-
 .../nifi/processors/rethinkdb/PutRethinkDB.java |  22 +-
 .../org.apache.nifi.processor.Processor         |   1 +
 .../rethinkdb/ITAbstractRethinkDBTest.java      |  55 ++++
 .../rethinkdb/ITDeleteRethinkDBTest.java        | 207 +++++++++++++++
 .../rethinkdb/ITGetRethinkDBTest.java           |  32 +--
 .../rethinkdb/ITPutRethinkDBTest.java           |  33 +--
 .../rethinkdb/TestDeleteRethinkDB.java          | 254 +++++++++++++++++++
 10 files changed, 785 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java
index cfbf1bd..61ad457 100644
--- 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/AbstractRethinkDBProcessor.java
@@ -18,9 +18,10 @@ package org.apache.nifi.processors.rethinkdb;
 
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -103,6 +104,30 @@ abstract class AbstractRethinkDBProcessor extends 
AbstractProcessor {
             .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new 
PropertyDescriptor.Builder()
+                .displayName("Document Identifier")
+                .name("rethinkdb-document-identifier")
+                .description("A FlowFile attribute, or attribute expression 
used " +
+                    "for determining RethinkDB key for the Flow File content")
+                .required(true)
+                
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
 true))
+                .expressionLanguageSupported(true)
+                .build();
+
+    public static AllowableValue DURABILITY_SOFT = new AllowableValue("soft", 
"Soft", "Don't save changes to disk before ack");
+
+    public static AllowableValue DURABILITY_HARD = new AllowableValue("hard", 
"Hard", "Save change to disk before ack");
+
+    protected static final PropertyDescriptor DURABILITY = new 
PropertyDescriptor.Builder()
+                .name("rethinkdb-durability")
+                .displayName("Durablity of documents")
+                .description("Durability of documents being inserted")
+                .required(true)
+                .defaultValue("hard")
+                .allowableValues(DURABILITY_HARD, DURABILITY_SOFT)
+                .expressionLanguageSupported(true)
+                .build();
+
     static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
             .description("Sucessful FlowFiles are routed to this 
relationship").build();
 
@@ -122,7 +147,10 @@ abstract class AbstractRethinkDBProcessor extends 
AbstractProcessor {
     public static final String RESULT_FIRST_ERROR_KEY = "first_error";
     public static final String RESULT_WARNINGS_KEY = "warnings";
 
+    public static final String DURABILITY_OPTION_KEY = "durability";
+
     public static final String RETHINKDB_ERROR_MESSAGE = 
"rethinkdb.error.message";
+    public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot 
be empty";
 
     protected Connection rethinkDbConnection;
     protected String databaseName;
@@ -156,7 +184,6 @@ abstract class AbstractRethinkDBProcessor extends 
AbstractProcessor {
         password = context.getProperty(PASSWORD).getValue();
         databaseName = context.getProperty(DB_NAME).getValue();
         tableName = context.getProperty(TABLE_NAME).getValue();
-        maxDocumentsSize = 
context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue();
 
         try {
             rethinkDbConnection = makeConnection();

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/DeleteRethinkDB.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/DeleteRethinkDB.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/DeleteRethinkDB.java
new file mode 100644
index 0000000..6fed9e2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/DeleteRethinkDB.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.rethinkdb;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import com.google.gson.Gson;
+
+import java.io.ByteArrayInputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@EventDriven
+@Tags({"rethinkdb", "delete", "remove"})
+@CapabilityDescription("Processor to remove a JSON document from RethinkDB 
(https://www.rethinkdb.com/) using the document id.")
+@WritesAttributes({
+    @WritesAttribute(attribute = DeleteRethinkDB.RETHINKDB_ERROR_MESSAGE, 
description = "RethinkDB error message"),
+    @WritesAttribute(attribute = 
DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY, description = "Error count 
while delete documents"),
+    @WritesAttribute(attribute = 
DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY, description = "Number of 
documents deleted"),
+    @WritesAttribute(attribute = 
DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY, description = "Number of 
documents inserted"),
+    @WritesAttribute(attribute = 
DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY, description = "Number of 
documents replaced"),
+    @WritesAttribute(attribute = 
DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY, description = "Number of 
documents skipped"),
+    @WritesAttribute(attribute = 
DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY, description = "Number of 
documents unchanged since they already existed"),
+    })
+@SeeAlso({PutRethinkDB.class,GetRethinkDB.class})
+public class DeleteRethinkDB extends AbstractRethinkDBProcessor {
+
+    public static AllowableValue RETURN_CHANGES_TRUE = new 
AllowableValue("true", "True", "Return changed document");
+    public static AllowableValue RETURN_CHANGES_FALSE = new 
AllowableValue("false", "False", "Do not return changed document");
+
+    protected static final PropertyDescriptor RETURN_CHANGES = new 
PropertyDescriptor.Builder()
+            .name("rethinkdb-return-result")
+            .displayName("Return deleted value")
+            .description("Return old value which were deleted")
+            .required(true)
+            .defaultValue(RETURN_CHANGES_TRUE.getValue())
+            .allowableValues(RETURN_CHANGES_TRUE, RETURN_CHANGES_FALSE)
+            .expressionLanguageSupported(true)
+            .build();
+
+    private static final Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    public static final String RETHINKDB_DELETE_RESULT_ERROR_KEY = 
"rethinkdb.delete.errors";
+    public static final String RETHINKDB_DELETE_RESULT_DELETED_KEY = 
"rethinkdb.delete.deleted";
+    public static final String RETHINKDB_DELETE_RESULT_INSERTED_KEY = 
"rethinkdb.delete.inserted";
+    public static final String RETHINKDB_DELETE_RESULT_REPLACED_KEY = 
"rethinkdb.delete.replaced";
+    public static final String RETHINKDB_DELETE_RESULT_SKIPPED_KEY = 
"rethinkdb.delete.skipped";
+    public static final String RETHINKDB_DELETE_RESULT_UNCHANGED_KEY = 
"rethinkdb.delete.unchanged";
+
+    public static final String RESULT_CHANGES_KEY = "changes";
+    public static final String RETURN_CHANGES_OPTION_KEY = "return_changes";
+
+    protected Gson gson = new Gson();
+
+    static {
+        final Set<Relationship> tempRelationships = new HashSet<>();
+        tempRelationships.add(REL_SUCCESS);
+        tempRelationships.add(REL_FAILURE);
+        tempRelationships.add(REL_NOT_FOUND);
+        relationships = Collections.unmodifiableSet(tempRelationships);
+
+        final List<PropertyDescriptor> tempDescriptors = new ArrayList<>();
+        tempDescriptors.add(DB_NAME);
+        tempDescriptors.add(DB_HOST);
+        tempDescriptors.add(DB_PORT);
+        tempDescriptors.add(USERNAME);
+        tempDescriptors.add(PASSWORD);
+        tempDescriptors.add(TABLE_NAME);
+        tempDescriptors.add(CHARSET);
+        tempDescriptors.add(RETHINKDB_DOCUMENT_ID);
+        tempDescriptors.add(RETURN_CHANGES);
+        tempDescriptors.add(DURABILITY);
+        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        Charset charset = 
Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
+        String id = 
context.getProperty(RETHINKDB_DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
+        String durablity = 
context.getProperty(DURABILITY).evaluateAttributeExpressions(flowFile).getValue();
+        Boolean returnChanges = 
context.getProperty(RETURN_CHANGES).evaluateAttributeExpressions(flowFile).asBoolean();
+
+        if ( StringUtils.isEmpty(id) ) {
+            getLogger().error(DOCUMENT_ID_EMPTY_MESSAGE);
+            flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, 
DOCUMENT_ID_EMPTY_MESSAGE);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        try {
+            long startTimeMillis = System.currentTimeMillis();
+            Map<String,Object> result = deleteDocument(id, durablity, 
returnChanges);
+            final long endTimeMillis = System.currentTimeMillis();
+
+            getLogger().debug("Json document {} deleted Result: {}", new 
Object[] {id, result});
+
+            flowFile = populateAttributes(session, flowFile, result);
+
+            Long deletedCount = 
((Long)result.get(RESULT_DELETED_KEY)).longValue();
+
+            if ( deletedCount == 0L ) {
+                getLogger().debug("Deleted count should be 1 but was " + 
deletedCount + " for document with id '" + id + "'");
+
+                flowFile = populateAttributes(session, flowFile, result);
+
+                flowFile = session.putAttribute(flowFile, 
RETHINKDB_ERROR_MESSAGE, "Deleted count should be 1 but was " + deletedCount + 
" for document with id '" + id + "'");
+                session.transfer(flowFile, REL_NOT_FOUND);
+                return;
+            }
+
+            if ( returnChanges ) {
+                String json = 
gson.toJson(((List)result.get(RESULT_CHANGES_KEY)).get(0));
+
+                byte [] documentBytes = json.getBytes(charset);
+
+                ByteArrayInputStream bais = new 
ByteArrayInputStream(documentBytes);
+                session.importFrom(bais, flowFile);
+
+                session.getProvenanceReporter().modifyContent(flowFile,
+                        new 
StringBuilder("rethinkdb://").append(databaseName).append("/").append(tableName).append("/").append(id).toString(),
+                        (endTimeMillis - startTimeMillis));
+            }
+
+            session.transfer(flowFile, REL_SUCCESS);
+
+
+        } catch (Exception exception) {
+            getLogger().error("Failed to delete document from RethinkDB due to 
error {}",
+                    new Object[]{exception.getLocalizedMessage()}, exception);
+            flowFile = session.putAttribute(flowFile, RETHINKDB_ERROR_MESSAGE, 
exception.getMessage());
+            session.transfer(flowFile, REL_FAILURE);
+            context.yield();
+        }
+    }
+
+    private FlowFile populateAttributes(final ProcessSession session, FlowFile 
flowFile,
+            Map<String, Object> result) {
+        Map<String,String> resultAttributes = new HashMap<>();
+        resultAttributes.put(RETHINKDB_DELETE_RESULT_ERROR_KEY, 
String.valueOf(result.get(RESULT_ERROR_KEY)));
+        resultAttributes.put(RETHINKDB_DELETE_RESULT_DELETED_KEY, 
String.valueOf(result.get(RESULT_DELETED_KEY)));
+        resultAttributes.put(RETHINKDB_DELETE_RESULT_INSERTED_KEY, 
String.valueOf(result.get(RESULT_INSERTED_KEY)));
+        resultAttributes.put(RETHINKDB_DELETE_RESULT_REPLACED_KEY, 
String.valueOf(result.get(RESULT_REPLACED_KEY)));
+        resultAttributes.put(RETHINKDB_DELETE_RESULT_SKIPPED_KEY, 
String.valueOf(result.get(RESULT_SKIPPED_KEY)));
+        resultAttributes.put(RETHINKDB_DELETE_RESULT_UNCHANGED_KEY, 
String.valueOf(result.get(RESULT_UNCHANGED_KEY)));
+        flowFile = session.putAllAttributes(flowFile, resultAttributes);
+        return flowFile;
+    }
+
+    protected Map<String,Object> deleteDocument(String id, String durablity, 
Boolean returnChanges) {
+        return 
getRdbTable().get(id).delete().optArg(DURABILITY_OPTION_KEY,durablity).optArg(RETURN_CHANGES_OPTION_KEY,
 returnChanges).run(rethinkDbConnection);
+    }
+
+    @OnStopped
+    public void close() {
+        super.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java
index ac31396..aaf1046 100644
--- 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/GetRethinkDB.java
@@ -28,13 +28,12 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.AttributeExpression.ResultType;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
 
 import com.google.gson.Gson;
 
@@ -54,10 +53,9 @@ import java.util.Set;
 @WritesAttributes({
     @WritesAttribute(attribute = GetRethinkDB.RETHINKDB_ERROR_MESSAGE, 
description = "RethinkDB error message"),
     })
-@SeeAlso({PutRethinkDB.class})
+@SeeAlso({PutRethinkDB.class,DeleteRethinkDB.class})
 public class GetRethinkDB extends AbstractRethinkDBProcessor {
 
-    public static final String DOCUMENT_ID_EMPTY_MESSAGE = "Document Id cannot 
be empty";
     public static AllowableValue READ_MODE_SINGLE = new 
AllowableValue("single", "Single", "Read values from memory from primary 
replica (Default)");
     public static AllowableValue READ_MODE_MAJORITY = new 
AllowableValue("majority", "Majority", "Read values committed to disk on 
majority of replicas");
     public static AllowableValue READ_MODE_OUTDATED = new 
AllowableValue("outdated", "Outdated", "Read values from memory from an 
arbitrary replica ");
@@ -72,16 +70,6 @@ public class GetRethinkDB extends AbstractRethinkDBProcessor 
{
             .expressionLanguageSupported(true)
             .build();
 
-    public static final PropertyDescriptor RETHINKDB_DOCUMENT_ID = new 
PropertyDescriptor.Builder()
-            .displayName("Document Identifier")
-            .name("rethinkdb-document-identifier")
-            .description("A FlowFile attribute, or attribute expression used " 
+
-                "for determining RethinkDB key for the Flow File content")
-            .required(true)
-            
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING,
 true))
-            .expressionLanguageSupported(true)
-            .build();
-
     protected String READ_MODE_KEY = "read_mode";
 
     private static final Set<Relationship> relationships;
@@ -122,6 +110,7 @@ public class GetRethinkDB extends 
AbstractRethinkDBProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
+        maxDocumentsSize = 
context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue();
         super.onScheduled(context);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java
index df2bf3c..ac57957 100644
--- 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/java/org/apache/nifi/processors/rethinkdb/PutRethinkDB.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -62,16 +63,13 @@ import java.util.Set;
     @WritesAttribute(attribute = 
PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY, description = "First 
error while inserting documents"),
     @WritesAttribute(attribute = 
PutRethinkDB.RETHINKDB_INSERT_RESULT_WARNINGS_KEY, description = "Warning 
message in case of large number of ids being returned on insertion")
     })
-@SeeAlso({GetRethinkDB.class})
+@SeeAlso({GetRethinkDB.class,DeleteRethinkDB.class})
 public class PutRethinkDB extends AbstractRethinkDBProcessor {
 
     public static AllowableValue CONFLICT_STRATEGY_UPDATE = new 
AllowableValue("update", "Update", "Update the document having same id with new 
values");
     public static AllowableValue CONFLICT_STRATEGY_REPLACE = new 
AllowableValue("replace", "Replace", "Replace the document with having same id 
new document");
     public static AllowableValue CONFLICT_STRATEGY_ERROR = new 
AllowableValue("error", "Error", "Return error if the document with same id 
exists");
 
-    public static AllowableValue DURABILITY_SOFT = new AllowableValue("soft", 
"Soft", "Don't save document on disk before ack");
-    public static AllowableValue DURABILITY_HARD = new AllowableValue("hard", 
"Hard", "Save document on disk before ack");
-
     protected static final PropertyDescriptor CONFLICT_STRATEGY = new 
PropertyDescriptor.Builder()
             .name("rethinkdb-conflict-strategy")
             .displayName("Conflict strategy")
@@ -82,19 +80,6 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor 
{
             .expressionLanguageSupported(true)
             .build();
 
-    protected static final PropertyDescriptor DURABILITY = new 
PropertyDescriptor.Builder()
-            .name("rethinkdb-durability")
-            .displayName("Durablity of documents")
-            .description("Durability of documents being inserted")
-            .required(true)
-            .defaultValue("hard")
-            .allowableValues(DURABILITY_HARD, DURABILITY_SOFT)
-            .expressionLanguageSupported(true)
-            .build();
-
-    protected String CONFLICT_OPTION_KEY = "conflict";
-    protected String DURABILITY_OPTION_KEY = "durability";
-
     private static final Set<Relationship> relationships;
     private static final List<PropertyDescriptor> propertyDescriptors;
 
@@ -109,6 +94,8 @@ public class PutRethinkDB extends AbstractRethinkDBProcessor 
{
     public static final String RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY = 
"rethinkdb.insert.first_error";
     public static final String RETHINKDB_INSERT_RESULT_WARNINGS_KEY = 
"rethinkdb.insert.warnings";
 
+    public final String CONFLICT_OPTION_KEY = "conflict";
+
     static {
         final Set<Relationship> tempRelationships = new HashSet<>();
         tempRelationships.add(REL_SUCCESS);
@@ -141,6 +128,7 @@ public class PutRethinkDB extends 
AbstractRethinkDBProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
+        maxDocumentsSize = 
context.getProperty(MAX_DOCUMENTS_SIZE).asDataSize(DataUnit.B).longValue();
         super.onScheduled(context);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 2875277..0db3a56 100644
--- 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,3 +14,4 @@
 # limitations under the License.
 org.apache.nifi.processors.rethinkdb.PutRethinkDB
 org.apache.nifi.processors.rethinkdb.GetRethinkDB
+org.apache.nifi.processors.rethinkdb.DeleteRethinkDB

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITAbstractRethinkDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITAbstractRethinkDBTest.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITAbstractRethinkDBTest.java
new file mode 100644
index 0000000..7131834
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITAbstractRethinkDBTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.rethinkdb;
+
+import org.apache.nifi.util.TestRunner;
+
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.net.Connection;
+
+/**
+ * Abstract base class for RethinkDB integration tests
+ */
+public class ITAbstractRethinkDBTest {
+    protected TestRunner runner;
+    protected Connection connection;
+    protected String dbName = "test";
+    protected String dbHost = "localhost";
+    protected String dbPort = "28015";
+    protected String user = "admin";
+    protected String password = "admin";
+    protected String table = "test";
+
+    public void setUp() throws Exception {
+        runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName);
+        runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost);
+        runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort);
+        runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user);
+        runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password);
+        runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table);
+        runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
+
+        connection = RethinkDB.r.connection().user(user, 
password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
+    }
+
+    public void tearDown() throws Exception {
+        runner = null;
+        connection.close();
+        connection = null;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITDeleteRethinkDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITDeleteRethinkDBTest.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITDeleteRethinkDBTest.java
new file mode 100644
index 0000000..47869d3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITDeleteRethinkDBTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.rethinkdb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import com.rethinkdb.RethinkDB;
+
+import net.minidev.json.JSONObject;
+
+/**
+ * Integration test for deleting documents from RethinkDB. Before running these tests, ensure that
+ * RethinkDB is running on localhost on the default port, with a database named <code>test</code>
+ * containing a table named <code>test</code>, and a user <code>admin</code> with password
+ * <code>admin</code>; otherwise, adjust the connection attributes in the test accordingly.
+ */
+@Ignore("Comment this out for running tests against a real instance of 
RethinkDB")
+public class ITDeleteRethinkDBTest extends ITAbstractRethinkDBTest {
+
+    /**
+     * Creates a runner for {@link DeleteRethinkDB}, applies the shared settings through
+     * {@code super.setUp()}, and empties the test table so each test starts with zero documents.
+     *
+     * @throws Exception if the connection cannot be established
+     */
+    @Before
+    public void setUp() throws Exception {
+        runner = TestRunners.newTestRunner(DeleteRethinkDB.class);
+        // super.setUp() already opens the shared connection; opening another one here would
+        // overwrite the field and leave the first connection unclosed.
+        super.setUp();
+        runner.setProperty(DeleteRethinkDB.DURABILITY, "soft");
+        runner.setProperty(DeleteRethinkDB.RETURN_CHANGES, "true");
+
+        RethinkDB.r.db(dbName).table(table).delete().run(connection);
+        long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+        assertEquals("Count should be same", 0L, count);
+    }
+
+    /** Delegates cleanup of the runner and the RethinkDB connection to the base class. */
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    /**
+     * Inserts a document, deletes it through the processor with the id resolved from the
+     * {@code rethinkdb.id} flow file attribute, and verifies the emitted change record, the
+     * result-count attributes, and that the document no longer exists in the table.
+     */
+    @Test
+    public void testDeleteDocumentById() {
+        JSONObject message = new JSONObject();
+        message.put("id", "1");
+        message.put("value", "one");
+        RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
+
+        long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+        assertEquals("Count should be same", 1L, count);
+
+        runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+
+        Map<String, String> props = new HashMap<>();
+        props.put("rethinkdb.id","1");
+
+        runner.enqueue(new byte [] {},props);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
+
+        String changeMessage = ("{\"old_val\":" + message + "}");
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
+        assertEquals("Flow file count should be same", 1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertEquals("Error should be null", null, flowFile.getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+        assertEquals("Content should be same size", changeMessage.length(), flowFile.getSize());
+        flowFile.assertContentEquals(changeMessage);
+        // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+        assertEquals("1", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY));
+
+        Map<String,Object> document = RethinkDB.r.db(dbName).table(table).get("1").run(connection);
+        assertEquals("Document should be null", null, document);
+    }
+
+    /**
+     * Deletes a document with {@code RETURN_CHANGES} set to false and verifies that the flow
+     * file has no content, the result-count attributes report one deletion, and the document
+     * is gone from the table.
+     */
+    @Test
+    public void testDeleteDocumentByIdNoChanges() {
+        JSONObject message = new JSONObject();
+        message.put("id", "11");
+        message.put("value", "one");
+        RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
+
+        long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+        assertEquals("Count should be same", 1L, count);
+
+        runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+        runner.setProperty(DeleteRethinkDB.RETURN_CHANGES, "false");
+
+        Map<String, String> props = new HashMap<>();
+        props.put("rethinkdb.id","11");
+
+        runner.enqueue(new byte [] {},props);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
+        assertEquals("Flow file count should be same", 1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertEquals("Error should be null", null, flowFile.getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+        // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+        assertEquals("1", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY));
+        assertEquals(0, flowFile.getSize());
+
+        // Look up the id that was actually inserted and deleted ("11"); checking any other id
+        // would pass even if the delete had not happened.
+        Map<String,Object> document = RethinkDB.r.db(dbName).table(table).get("11").run(connection);
+        assertEquals("Document should be null", null, document);
+    }
+
+    /**
+     * Attempts to delete an id that is not in the table and verifies that the flow file is
+     * routed to the not-found relationship with an error message and a skipped count of one,
+     * and that the previously inserted document is still present.
+     */
+    @Test
+    public void testDeleteDocumentByIdNotFound() {
+        JSONObject message = new JSONObject();
+        message.put("id", "1");
+        message.put("value", "one");
+        RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
+
+        long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+        assertEquals("Count should be same", 1L, count);
+
+        runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+
+        Map<String, String> props = new HashMap<>();
+        // A timestamp is used as an id that does not match the single inserted document ("1").
+        props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis()));
+
+        runner.enqueue(new byte [] {},props);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1);
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND);
+        assertEquals("Flow file count should be same", 1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertNotNull("Error should not be null", flowFile.getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+        assertEquals("Content should be same size", 0, flowFile.getSize());
+        // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY));
+        assertEquals("1", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY));
+
+        Map<String,Object> document = RethinkDB.r.db(dbName).table(table).get("1").run(connection);
+        assertNotNull("Document should not be null", document);
+        assertEquals("id should be same", "1", document.get("id"));
+    }
+
+    /**
+     * Deletes a document whose id is configured as a literal property value (no expression
+     * language) and verifies the emitted change record, the result-count attributes, and that
+     * the document no longer exists in the table.
+     */
+    @Test
+    public void testDeleteDocumentByHardCodedId() {
+        JSONObject message = new JSONObject();
+        message.put("id", "2");
+        message.put("value", "two");
+        RethinkDB.r.db(dbName).table(table).insert(message).run(connection);
+
+        long count = RethinkDB.r.db(dbName).table(table).count().run(connection);
+        assertEquals("Count should be same", 1L, count);
+
+        runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "2");
+
+        Map<String, String> props = new HashMap<>();
+
+        runner.enqueue(new byte [] {},props);
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
+
+        String changeMessage = ("{\"old_val\":" + message + "}");
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
+        assertEquals("Flow file count should be same", 1, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        assertEquals("Error should be null", null, flowFile.getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+        assertEquals("Content should be same size", changeMessage.length(), flowFile.getSize());
+        flowFile.assertContentEquals(changeMessage);
+
+        // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
+        assertEquals("1", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_DELETED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_ERROR_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_INSERTED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_REPLACED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_SKIPPED_KEY));
+        assertEquals("0", flowFile.getAttribute(DeleteRethinkDB.RETHINKDB_DELETE_RESULT_UNCHANGED_KEY));
+
+        Map<String,Object> document = RethinkDB.r.db(dbName).table(table).get("2").run(connection);
+        assertEquals("Document should be null", null, document);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java
index f8e14b0..3888700 100644
--- 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITGetRethinkDBTest.java
@@ -24,14 +24,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import com.rethinkdb.RethinkDB;
-import com.rethinkdb.net.Connection;
 
 import net.minidev.json.JSONObject;
 
@@ -42,29 +40,13 @@ import net.minidev.json.JSONObject;
  * test accordingly.
  */
 @Ignore("Comment this out for running tests against a real instance of 
RethinkDB")
-public class ITGetRethinkDBTest {
-
-    private TestRunner runner;
-    private Connection connection;
-    private String dbName = "test";
-    private String dbHost = "localhost";
-    private String dbPort = "28015";
-    private String user = "admin";
-    private String password = "admin";
-    private String table = "test";
+public class ITGetRethinkDBTest extends ITAbstractRethinkDBTest {
 
     @Before
     public void setUp() throws Exception {
         runner = TestRunners.newTestRunner(GetRethinkDB.class);
-        runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, dbName);
-        runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, dbHost);
-        runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, dbPort);
-        runner.setProperty(AbstractRethinkDBProcessor.USERNAME, user);
-        runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, password);
-        runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, table);
-        runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
-
-        connection = RethinkDB.r.connection().user(user, 
password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
+        super.setUp();
+        runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 
KB");
         RethinkDB.r.db(dbName).table(table).delete().run(connection);
         long count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
         assertEquals("Count should be same", 0L, count);
@@ -72,9 +54,7 @@ public class ITGetRethinkDBTest {
 
     @After
     public void tearDown() throws Exception {
-        runner = null;
-        connection.close();
-        connection = null;
+        super.tearDown();
     }
 
     @Test
@@ -88,6 +68,7 @@ public class ITGetRethinkDBTest {
         assertEquals("Count should be same", 1L, count);
 
         runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, 
"${rethinkdb.id}");
+        runner.assertValid();
 
         Map<String, String> props = new HashMap<>();
         props.put("rethinkdb.id","1");
@@ -114,6 +95,7 @@ public class ITGetRethinkDBTest {
         assertEquals("Count should be same", 1L, count);
 
         runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, 
"${rethinkdb.id}");
+        runner.assertValid();
 
         Map<String, String> props = new HashMap<>();
         props.put("rethinkdb.id", String.valueOf(System.currentTimeMillis()));
@@ -139,6 +121,7 @@ public class ITGetRethinkDBTest {
         assertEquals("Count should be same", 1, count);
 
         runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2");
+        runner.assertValid();
 
         Map<String, String> props = new HashMap<>();
 
@@ -165,6 +148,7 @@ public class ITGetRethinkDBTest {
 
         runner.setProperty(GetRethinkDB.RETHINKDB_DOCUMENT_ID, "2");
         runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, 
"2B");
+        runner.assertValid();
 
         Map<String, String> props = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java
index 5260696..052d384 100644
--- 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java
@@ -20,7 +20,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import java.util.List;
 import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.json.simple.JSONArray;
 import org.junit.After;
@@ -29,7 +28,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import com.rethinkdb.RethinkDB;
-import com.rethinkdb.net.Connection;
 
 import net.minidev.json.JSONObject;
 
@@ -40,43 +38,25 @@ import net.minidev.json.JSONObject;
  * test accordingly.
  */
 @Ignore("Comment this out for running tests against a real instance of 
RethinkDB")
-public class ITPutRethinkDBTest {
-    private TestRunner runner;
-    private Connection connection;
-    private String dbName = "test";
-    private String dbHost = "localhost";
-    private String dbPort = "28015";
-    private String user = "admin";
-    private String password = "admin";
-    private String table = "test";
+public class ITPutRethinkDBTest extends ITAbstractRethinkDBTest {
 
     @Before
     public void setUp() throws Exception {
         runner = TestRunners.newTestRunner(PutRethinkDB.class);
-        runner.setProperty(PutRethinkDB.DB_NAME, dbName);
-        runner.setProperty(PutRethinkDB.DB_HOST, dbHost);
-        runner.setProperty(PutRethinkDB.DB_PORT, dbPort);
-        runner.setProperty(PutRethinkDB.USERNAME, user);
-        runner.setProperty(PutRethinkDB.PASSWORD, password);
-        runner.setProperty(PutRethinkDB.TABLE_NAME, table);
-        runner.setProperty(PutRethinkDB.CHARSET, "UTF-8");
+        super.setUp();
+        runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "1 
KB");
         runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, 
PutRethinkDB.CONFLICT_STRATEGY_UPDATE);
         runner.setProperty(PutRethinkDB.DURABILITY, 
PutRethinkDB.DURABILITY_HARD);
-        runner.setProperty(PutRethinkDB.MAX_DOCUMENTS_SIZE, "1 KB");
-        runner.assertValid();
-
-        connection = RethinkDB.r.connection().user(user, 
password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
     }
 
     @After
     public void tearDown() throws Exception {
-        runner = null;
-        connection.close();
-        connection = null;
+        super.tearDown();
     }
 
     @Test
     public void testValidSingleMessage() {
+        runner.assertValid();
         RethinkDB.r.db(dbName).table(table).delete().run(connection);
         long count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
         assertEquals("Count should be same", 0L, count);
@@ -104,6 +84,7 @@ public class ITPutRethinkDBTest {
 
     @Test
     public void testValidSingleMessageTwiceConflictUpdate() {
+        runner.assertValid();
         RethinkDB.r.db(dbName).table(table).delete().run(connection);
         long count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
         assertEquals("Count should be same", 0L, count);
@@ -144,6 +125,7 @@ public class ITPutRethinkDBTest {
     @Test
     public void testValidSingleMessageTwiceConflictError() {
         runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, 
PutRethinkDB.CONFLICT_STRATEGY_ERROR);
+        runner.assertValid();
         RethinkDB.r.db(dbName).table(table).delete().run(connection);
         long count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
         assertEquals("Count should be same", 0L, count);
@@ -183,6 +165,7 @@ public class ITPutRethinkDBTest {
 
     @Test
     public void testValidArrayMessage() {
+        runner.assertValid();
         RethinkDB.r.db(dbName).table(table).delete().run(connection);
         long count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
         assertEquals("Count should be same", 0L, count);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0bb14115/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestDeleteRethinkDB.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestDeleteRethinkDB.java
 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestDeleteRethinkDB.java
new file mode 100644
index 0000000..4c06f48
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/TestDeleteRethinkDB.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.rethinkdb;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.rethinkdb.net.Connection;
+
+public class TestDeleteRethinkDB {
+    private static final String DOCUMENT_ID = "id1";
+    private TestRunner runner;
+    private AbstractRethinkDBProcessor mockDeleteRethinkDB;
+    private Map<String,Object> document = new HashMap<>();
+
+    /**
+     * Prepares a canned delete result and a {@link DeleteRethinkDB} whose connection and delete
+     * calls are stubbed out, then configures a runner with a valid set of properties.
+     */
+    @Before
+    public void setUp() throws Exception {
+        // Canned result returned by the stubbed deleteDocument: one document deleted, no errors.
+        document.put(DeleteRethinkDB.RESULT_CHANGES_KEY, Lists.asList(
+                "[{new_val=null, old_val={id=1, value=one}}]", new String[] {}));
+        document.put(DeleteRethinkDB.RESULT_DELETED_KEY, 1L);
+        document.put(DeleteRethinkDB.RESULT_ERROR_KEY, 0L);
+        document.put(DeleteRethinkDB.RESULT_FIRST_ERROR_KEY, "");
+        document.put(DeleteRethinkDB.RESULT_INSERTED_KEY, 0L);
+        document.put(DeleteRethinkDB.RESULT_REPLACED_KEY, 0L);
+        document.put(DeleteRethinkDB.RESULT_SKIPPED_KEY, 0L);
+        document.put(DeleteRethinkDB.RESULT_UNCHANGED_KEY, 0L);
+
+        mockDeleteRethinkDB = new DeleteRethinkDB() {
+            @Override
+            protected Map<String, Object> deleteDocument(String id, String durability, Boolean returnChanges) {
+                return document;
+            }
+
+            @Override
+            protected Connection makeConnection() {
+                // No server is contacted in this unit test.
+                return null;
+            }
+        };
+
+        runner = TestRunners.newTestRunner(mockDeleteRethinkDB);
+        runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1");
+        runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234");
+        runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test");
+        runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1");
+        runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1");
+        runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1");
+        runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
+        runner.setProperty(AbstractRethinkDBProcessor.DURABILITY, "soft");
+        runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+        runner.assertValid();
+    }
+
+    /** Drops the runner reference after each test. */
+    @After
+    public void tearDown() throws Exception {
+        runner = null;
+    }
+
+    /** Confirms that the configuration applied in setUp is valid. */
+    @Test
+    public void testDefaultValid() {
+        runner.assertValid();
+    }
+
+    /** An empty DB host value must make the configuration invalid. */
+    @Test
+    public void testBlankHost() {
+        runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "");
+        runner.assertNotValid();
+    }
+
+    /** An empty DB port value must make the configuration invalid. */
+    @Test
+    public void testEmptyPort() {
+        runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "");
+        runner.assertNotValid();
+    }
+
+    /** An empty database name must make the configuration invalid. */
+    @Test
+    public void testEmptyDBName() {
+        runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "");
+        runner.assertNotValid();
+    }
+
+    /** An empty username must make the configuration invalid. */
+    @Test
+    public void testEmptyUsername() {
+        runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "");
+        runner.assertNotValid();
+    }
+
+    /**
+     * Sets the password to "p1" and asserts that the configuration is valid.
+     * NOTE(review): despite the method name, no empty password is exercised here; the value
+     * equals the one already set in setUp. Confirm whether an empty password should be tested
+     * and which validity outcome is expected.
+     */
+    @Test
+    public void testEmptyPassword() {
+        runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1");
+        runner.assertValid();
+    }
+
+    /** A charset of "UTF-8" must be accepted as valid. */
+    @Test
+    public void testCharsetUTF8() {
+        runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
+        runner.assertValid();
+    }
+
+    /** An empty charset value must make the configuration invalid. */
+    @Test
+    public void testCharsetBlank() {
+        runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "");
+        runner.assertNotValid();
+    }
+    /** A max documents size of "0" must make the configuration invalid. */
+    @Test
+    public void testZeroMaxDocumentSize() {
+        runner.setProperty(AbstractRethinkDBProcessor.MAX_DOCUMENTS_SIZE, "0");
+        runner.assertNotValid();
+    }
+
+    /** An empty durability value must make the configuration invalid. */
+    @Test
+    public void testBlankDurability() {
+        runner.setProperty(DeleteRethinkDB.DURABILITY, "");
+        runner.assertNotValid();
+    }
+
+    /**
+     * When the stubbed delete result reports zero deleted documents, the flow file must be
+     * routed to the not-found relationship and carry the deleted-count error message.
+     */
+    @Test
+    public void testNotFound() {
+        runner.assertValid();
+        // Simulate a delete that matched nothing.
+        document.put(DeleteRethinkDB.RESULT_DELETED_KEY, 0L);
+
+        HashMap<String,String> attributes = new HashMap<>();
+        attributes.put("rethinkdb.id", DOCUMENT_ID);
+        runner.enqueue(new byte[]{}, attributes);
+        runner.run(1,true,true);
+
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_NOT_FOUND, 1);
+        MockFlowFile notFound = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_NOT_FOUND).get(0);
+        assertNotNull(notFound.getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+        notFound.assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,
+                "Deleted count should be 1 but was 0 for document with id 'id1'");
+    }
+
+    /**
+     * With the document id taken from {@code ${rethinkdb.id}} and no such attribute on the flow
+     * file, the flow file must be routed to failure with the empty-document-id message.
+     */
+    @Test
+    public void testBlankId() {
+        runner.assertValid();
+        runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+
+        // No attributes at all, so the id expression has nothing to resolve against.
+        Map<String,String> attributes = new HashMap<>();
+        runner.enqueue(new byte[]{},attributes);
+        runner.run(1,true,true);
+
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
+        MockFlowFile failed = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE).get(0);
+        assertNotNull(failed.getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+        failed.assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,AbstractRethinkDBProcessor.DOCUMENT_ID_EMPTY_MESSAGE);
+    }
+
+    /**
+     * With the {@code rethinkdb.id} attribute explicitly mapped to {@code null}, the flow file
+     * must be routed to failure with the empty-document-id message.
+     */
+    @Test
+    public void testNullId() {
+        runner.assertValid();
+        runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, "${rethinkdb.id}");
+
+        Map<String,String> attributes = new HashMap<>();
+        attributes.put("rethinkdb.id", null);
+        runner.enqueue(new byte[]{},attributes);
+        runner.run(1,true,true);
+
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
+        MockFlowFile failed = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE).get(0);
+        assertNotNull(failed.getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+        failed.assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,AbstractRethinkDBProcessor.DOCUMENT_ID_EMPTY_MESSAGE);
+    }
+
+    /**
+     * With the stubbed result reporting one deleted document, the flow file must be routed to
+     * success, its content must equal the JSON form of the first change entry, and no error
+     * message attribute may be set.
+     */
+    @Test
+    public void testValidSingleDelete() {
+        runner.assertValid();
+
+        HashMap<String,String> props = new HashMap<>();
+        props.put("rethinkdb.id", DOCUMENT_ID);
+
+        runner.enqueue(new byte[]{}, props);
+
+        runner.run(1,true,true);
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_SUCCESS, 1);
+        Gson gson = new Gson();
+
+        // Wildcard cast instead of a raw List: the element type is not needed to serialize it.
+        List<?> changes = (List<?>) document.get(DeleteRethinkDB.RESULT_CHANGES_KEY);
+        String json = gson.toJson(changes.get(0));
+
+        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_SUCCESS);
+        flowFiles.get(0).assertContentEquals(json);
+        assertNull(flowFiles.get(0).getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+
+    }
+
+    /**
+     * Replaces the processor with one whose delete call throws, and verifies that the flow file
+     * is routed to failure with the exception message recorded in the error attribute.
+     */
+    @Test
+    public void testGetThrowsException() {
+        mockDeleteRethinkDB = new DeleteRethinkDB() {
+            @Override
+            protected Map<String,Object> deleteDocument(String id, String durability, Boolean returnChanges) {
+                throw new RuntimeException("testException");
+            }
+
+            @Override
+            protected Connection makeConnection() {
+                // No server is contacted in this unit test.
+                return null;
+            }
+        };
+
+        // A fresh runner is needed because the one built in setUp wraps the non-throwing stub.
+        runner = TestRunners.newTestRunner(mockDeleteRethinkDB);
+        runner.setProperty(AbstractRethinkDBProcessor.DB_HOST, "host1");
+        runner.setProperty(AbstractRethinkDBProcessor.DB_PORT, "1234");
+        runner.setProperty(AbstractRethinkDBProcessor.DB_NAME, "test");
+        runner.setProperty(AbstractRethinkDBProcessor.TABLE_NAME, "t1");
+        runner.setProperty(AbstractRethinkDBProcessor.USERNAME, "u1");
+        runner.setProperty(AbstractRethinkDBProcessor.PASSWORD, "p1");
+        runner.setProperty(AbstractRethinkDBProcessor.CHARSET, "UTF-8");
+        runner.setProperty(AbstractRethinkDBProcessor.RETHINKDB_DOCUMENT_ID, DOCUMENT_ID);
+        runner.assertValid();
+
+        HashMap<String,String> attributes = new HashMap<>();
+        attributes.put("rethinkdb.id", DOCUMENT_ID);
+        runner.enqueue(new byte[]{}, attributes);
+        runner.run(1,true,true);
+
+        runner.assertAllFlowFilesTransferred(AbstractRethinkDBProcessor.REL_FAILURE, 1);
+        MockFlowFile failed = runner.getFlowFilesForRelationship(AbstractRethinkDBProcessor.REL_FAILURE).get(0);
+        assertNotNull(failed.getAttribute(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE));
+        failed.assertAttributeEquals(AbstractRethinkDBProcessor.RETHINKDB_ERROR_MESSAGE,"testException");
+    }
+}
\ No newline at end of file

Reply via email to