This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9848eb4  NIFI-7007: Add update functionality to the PutCassandraRecord 
processor.
9848eb4 is described below

commit 9848eb4409fe672e7d321ea3ac3351e8a5972e14
Author: Wouter de Vries <[email protected]>
AuthorDate: Fri Jan 10 09:09:38 2020 +0100

    NIFI-7007: Add update functionality to the PutCassandraRecord processor.
    
    NIFI-7007: Add additional unit tests that hit non-happy path
    
    NIFI-7007: Use AllowableValue instead of string
    
    NIFI-7007: Add the use of attributes for the update method, statement type 
and batch statement type
    
    NIFI-7007: Add additional tests, mainly for the use of attributes
    
    NIFI-7007: add some ReadsAttribute properties to the PutCassandraRecord 
processor
    
    NIFI-7007: additional update keys validation logic
    
    NIFI-7007: fix imports
    
    NIFI-7007: Convert fieldValue to long in separate method
    
    NIFI-7007: Add new style of tests checking actual CQL output
    
    NIFI-7007: add license to new test file
    
    NIFI-7007: add customValidate to check for certain incompatible property 
combinations
    
    NIFI-7007: remove check on updateMethod and replace Set.of with java 8 
compatible replacmenet
    
    NIFI-7007: Add test for failure with empty update method via attributes
    
    NIFI-7007: remove unused variable
    
    NIFI-7007: Fix customValidate that incorrectly invalidated a valid config
    
    Fix Checkstyle
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #3977
---
 .../processors/cassandra/PutCassandraRecord.java   | 259 ++++++++++++++-
 .../cassandra/PutCassandraRecordTest.java          | 362 +++++++++++++++++++++
 .../cassandra/PutCassandraRecordUpdateTest.java    | 293 +++++++++++++++++
 3 files changed, 900 insertions(+), 14 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
index 8acc36a..37fefa8 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
@@ -19,14 +19,23 @@ package org.apache.nifi.processors.cassandra;
 import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.querybuilder.Assignment;
 import com.datastax.driver.core.querybuilder.Insert;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Update;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -34,8 +43,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -45,18 +54,63 @@ import org.apache.nifi.util.StopWatch;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
 
 @Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @CapabilityDescription("This is a record aware processor that reads the 
content of the incoming FlowFile as individual records using the " +
         "configured 'Record Reader' and writes them to Apache Cassandra using 
native protocol version 3 or higher.")
+@ReadsAttributes({
+        @ReadsAttribute(attribute = "cql.statement.type", description = "If 
'Use cql.statement.type Attribute' is selected for the Statement " +
+                "Type property, the value of the cql.statement.type Attribute 
will be used to determine which type of statement (UPDATE, INSERT) " +
+                "will be generated and executed"),
+        @ReadsAttribute(attribute = "cql.update.method", description = "If 
'Use cql.update.method Attribute' is selected for the Update " +
+                "Method property, the value of the cql.update.method Attribute 
will be used to determine which operation (Set, Increment, Decrement) " +
+                "will be used to generate and execute the Update statement. 
Ignored if the Statement Type property is not set to UPDATE"),
+        @ReadsAttribute(attribute = "cql.batch.statement.type", description = 
"If 'Use cql.batch.statement.type Attribute' is selected for the Batch " +
+                "Statement Type property, the value of the 
cql.batch.statement.type Attribute will be used to determine which type of 
batch statement " +
+                "(LOGGED, UNLOGGED, COUNTER) will be generated and executed")
+})
 public class PutCassandraRecord extends AbstractCassandraProcessor {
+    static final AllowableValue UPDATE_TYPE = new AllowableValue("UPDATE", 
"UPDATE",
+            "Use an UPDATE statement.");
+    static final AllowableValue INSERT_TYPE = new AllowableValue("INSERT", 
"INSERT",
+            "Use an INSERT statement.");
+    static final AllowableValue STATEMENT_TYPE_USE_ATTR_TYPE = new 
AllowableValue("USE_ATTR", "Use cql.statement.type Attribute",
+            "The value of the cql.statement.type Attribute will be used to 
determine which type of statement (UPDATE, INSERT) " +
+                    "will be generated and executed");
+    static final String STATEMENT_TYPE_ATTRIBUTE = "cql.statement.type";
+
+    static final AllowableValue INCR_TYPE = new AllowableValue("INCREMENT", 
"Increment",
+            "Use an increment operation (+=) for the Update statement.");
+    static final AllowableValue SET_TYPE = new AllowableValue("SET", "Set",
+            "Use a set operation (=) for the Update statement.");
+    static final AllowableValue DECR_TYPE = new AllowableValue("DECREMENT", 
"Decrement",
+            "Use a decrement operation (-=) for the Update statement.");
+    static final AllowableValue UPDATE_METHOD_USE_ATTR_TYPE = new 
AllowableValue("USE_ATTR", "Use cql.update.method Attribute",
+            "The value of the cql.update.method Attribute will be used to 
determine which operation (Set, Increment, Decrement) " +
+                    "will be used to generate and execute the Update 
statement.");
+    static final String UPDATE_METHOD_ATTRIBUTE = "cql.update.method";
+
+    static final AllowableValue LOGGED_TYPE = new AllowableValue("LOGGED", 
"LOGGED",
+            "Use a LOGGED batch statement");
+    static final AllowableValue UNLOGGED_TYPE = new AllowableValue("UNLOGGED", 
"UNLOGGED",
+            "Use an UNLOGGED batch statement");
+    static final AllowableValue COUNTER_TYPE = new AllowableValue("COUNTER", 
"COUNTER",
+            "Use a COUNTER batch statement");
+    static final AllowableValue BATCH_STATEMENT_TYPE_USE_ATTR_TYPE = new 
AllowableValue("USE_ATTR", "Use cql.batch.statement.type Attribute",
+            "The value of the cql.batch.statement.type Attribute will be used 
to determine which type of batch statement (LOGGED, UNLOGGED or COUNTER) " +
+                    "will be used to generate and execute the Update 
statement.");
+    static final String BATCH_STATEMENT_TYPE_ATTRIBUTE = 
"cql.batch.statement.type";
 
     static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
             .name("put-cassandra-record-reader")
@@ -67,6 +121,36 @@ public class PutCassandraRecord extends 
AbstractCassandraProcessor {
             .required(true)
             .build();
 
+    static final PropertyDescriptor STATEMENT_TYPE = new 
PropertyDescriptor.Builder()
+            .name("put-cassandra-record-statement-type")
+            .displayName("Statement Type")
+            .description("Specifies the type of CQL Statement to generate.")
+            .required(true)
+            .defaultValue(INSERT_TYPE.getValue())
+            .allowableValues(UPDATE_TYPE, INSERT_TYPE, 
STATEMENT_TYPE_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_METHOD = new 
PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-method")
+            .displayName("Update Method")
+            .description("Specifies the method to use to SET the values. This 
property is used if the Statement Type is " +
+                    "UPDATE and ignored otherwise.")
+            .required(false)
+            .defaultValue(SET_TYPE.getValue())
+            .allowableValues(INCR_TYPE, DECR_TYPE, SET_TYPE, 
UPDATE_METHOD_USE_ATTR_TYPE)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEYS = new 
PropertyDescriptor.Builder()
+            .name("put-cassandra-record-update-keys")
+            .displayName("Update Keys")
+            .description("A comma-separated list of column names that uniquely 
identifies a row in the database for UPDATE statements. "
+                    + "If the Statement Type is UPDATE and this property is 
not set, the conversion to CQL will fail. "
+                    + "This property is ignored if the Statement Type is not 
UPDATE.")
+            .addValidator(StandardValidators.createListValidator(true, false, 
StandardValidators.NON_EMPTY_VALIDATOR))
+            .required(false)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
     static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
             .name("put-cassandra-record-table")
             .displayName("Table name")
@@ -90,8 +174,8 @@ public class PutCassandraRecord extends 
AbstractCassandraProcessor {
             .name("put-cassandra-record-batch-statement-type")
             .displayName("Batch Statement Type")
             .description("Specifies the type of 'Batch Statement' to be used.")
-            .allowableValues(BatchStatement.Type.values())
-            .defaultValue(BatchStatement.Type.LOGGED.toString())
+            .allowableValues(LOGGED_TYPE, UNLOGGED_TYPE, COUNTER_TYPE, 
BATCH_STATEMENT_TYPE_USE_ATTR_TYPE)
+            .defaultValue(LOGGED_TYPE.getValue())
             .required(false)
             .build();
 
@@ -102,7 +186,7 @@ public class PutCassandraRecord extends 
AbstractCassandraProcessor {
             .build();
 
     private final static List<PropertyDescriptor> propertyDescriptors = 
Collections.unmodifiableList(Arrays.asList(
-            CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, TABLE, 
CLIENT_AUTH, USERNAME, PASSWORD,
+            CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, TABLE, 
STATEMENT_TYPE, UPDATE_KEYS, UPDATE_METHOD, CLIENT_AUTH, USERNAME, PASSWORD,
             RECORD_READER_FACTORY, BATCH_SIZE, CONSISTENCY_LEVEL, 
BATCH_STATEMENT_TYPE, PROP_SSL_CONTEXT_SERVICE));
 
     private final static Set<Relationship> relationships = 
Collections.unmodifiableSet(
@@ -129,8 +213,33 @@ public class PutCassandraRecord extends 
AbstractCassandraProcessor {
         final String cassandraTable = 
context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
         final RecordReaderFactory recordParserFactory = 
context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
         final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-        final String batchStatementType = 
context.getProperty(BATCH_STATEMENT_TYPE).getValue();
         final String serialConsistencyLevel = 
context.getProperty(CONSISTENCY_LEVEL).getValue();
+        final String updateKeys = 
context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue();
+
+        // Get the statement type from the attribute if necessary
+        final String statementTypeProperty = 
context.getProperty(STATEMENT_TYPE).getValue();
+        String statementType = statementTypeProperty;
+        if 
(STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equals(statementTypeProperty)) {
+            statementType = 
inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
+        }
+
+        // Get the update method from the attribute if necessary
+        final String updateMethodProperty = 
context.getProperty(UPDATE_METHOD).getValue();
+        String updateMethod = updateMethodProperty;
+        if 
(UPDATE_METHOD_USE_ATTR_TYPE.getValue().equals(updateMethodProperty)) {
+            updateMethod = inputFlowFile.getAttribute(UPDATE_METHOD_ATTRIBUTE);
+        }
+
+
+        // Get the batch statement type from the attribute if necessary
+        final String batchStatementTypeProperty = 
context.getProperty(BATCH_STATEMENT_TYPE).getValue();
+        String batchStatementType = batchStatementTypeProperty;
+        if 
(BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equals(batchStatementTypeProperty))
 {
+            batchStatementType = 
inputFlowFile.getAttribute(BATCH_STATEMENT_TYPE_ATTRIBUTE).toUpperCase();
+        }
+        if (StringUtils.isEmpty(batchStatementType)) {
+            throw new IllegalArgumentException(format("Batch Statement Type is 
not specified, FlowFile %s", inputFlowFile));
+        }
 
         final BatchStatement batchStatement;
         final Session connectionSession = cassandraSession.get();
@@ -142,6 +251,24 @@ public class PutCassandraRecord extends 
AbstractCassandraProcessor {
         try (final InputStream inputStream = session.read(inputFlowFile);
              final RecordReader reader = 
recordParserFactory.createRecordReader(inputFlowFile, inputStream, 
getLogger())){
 
+            // throw an exception if statement type is not set
+            if (StringUtils.isEmpty(statementType)) {
+                throw new IllegalArgumentException(format("Statement Type is 
not specified, FlowFile %s", inputFlowFile));
+            }
+
+            // throw an exception if the statement type is set to update and 
updateKeys is empty
+            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType) && 
StringUtils.isEmpty(updateKeys)) {
+                    throw new IllegalArgumentException(format("Update Keys are 
not specified, FlowFile %s", inputFlowFile));
+            }
+
+            // throw an exception if the Update Method is Increment or 
Decrement and the batch statement type is not UNLOGGED or COUNTER
+            if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod) || 
DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                if 
(!(UNLOGGED_TYPE.getValue().equalsIgnoreCase(batchStatementType) || 
COUNTER_TYPE.getValue().equalsIgnoreCase(batchStatementType))) {
+                    throw new 
IllegalArgumentException(format("Increment/Decrement Update Method can only be 
used with COUNTER " +
+                            "or UNLOGGED Batch Statement Type, FlowFile %s", 
inputFlowFile));
+                }
+            }
+
             final RecordSchema schema = reader.getSchema();
             Record record;
 
@@ -151,18 +278,16 @@ public class PutCassandraRecord extends 
AbstractCassandraProcessor {
             while((record = reader.nextRecord()) != null) {
                 Map<String, Object> recordContentMap = (Map<String, Object>) 
DataTypeUtils
                         .convertRecordFieldtoObject(record, 
RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
-                Insert insertQuery;
 
-                if (cassandraTable.contains(".")) {
-                    String keyspaceAndTable[] = cassandraTable.split("\\.");
-                    insertQuery = QueryBuilder.insertInto(keyspaceAndTable[0], 
keyspaceAndTable[1]);
+                Statement query;
+                if (INSERT_TYPE.getValue().equalsIgnoreCase(statementType)) {
+                    query = generateInsert(cassandraTable, schema, 
recordContentMap);
+                } else if 
(UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+                    query = generateUpdate(cassandraTable, schema, updateKeys, 
updateMethod, recordContentMap);
                 } else {
-                    insertQuery = QueryBuilder.insertInto(cassandraTable);
+                    throw new IllegalArgumentException(format("Statement Type 
%s is not valid, FlowFile %s", statementType, inputFlowFile));
                 }
-                for (String fieldName : schema.getFieldNames()) {
-                    insertQuery.value(fieldName, 
recordContentMap.get(fieldName));
-                }
-                batchStatement.add(insertQuery);
+                batchStatement.add(query);
 
                 if (recordsAdded.incrementAndGet() == batchSize) {
                     connectionSession.execute(batchStatement);
@@ -193,6 +318,112 @@ public class PutCassandraRecord extends 
AbstractCassandraProcessor {
 
     }
 
+    protected Statement generateUpdate(String cassandraTable, RecordSchema 
schema, String updateKeys, String updateMethod, Map<String, Object> 
recordContentMap) {
+        Update updateQuery;
+
+        // Split up the update key names separated by a comma, should not be 
empty
+        final Set<String> updateKeyNames;
+        updateKeyNames = Arrays.stream(updateKeys.split(","))
+                .map(String::trim)
+                .filter(StringUtils::isNotEmpty)
+                .collect(Collectors.toSet());
+        if (updateKeyNames.isEmpty()) {
+            throw new IllegalArgumentException("No Update Keys were 
specified");
+        }
+
+        // Verify if all update keys are present in the record
+        for (String updateKey : updateKeyNames) {
+            if (!schema.getFieldNames().contains(updateKey)) {
+                throw new IllegalArgumentException("Update key '" + updateKey 
+ "' is not present in the record schema");
+            }
+        }
+
+        // Prepare keyspace/table names
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            updateQuery = QueryBuilder.update(keyspaceAndTable[0], 
keyspaceAndTable[1]);
+        } else {
+            updateQuery = QueryBuilder.update(cassandraTable);
+        }
+
+        // Loop through the field names, setting those that are not in the 
update key set, and using those
+        // in the update key set as conditions.
+        for (String fieldName : schema.getFieldNames()) {
+            Object fieldValue = recordContentMap.get(fieldName);
+
+            if (updateKeyNames.contains(fieldName)) {
+                updateQuery.where(QueryBuilder.eq(fieldName, fieldValue));
+            } else {
+                Assignment assignment;
+                if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.set(fieldName, fieldValue);
+                } else if 
(INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.incr(fieldName, 
convertFieldObjectToLong(fieldName, fieldValue));
+                } else if 
(DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                    assignment = QueryBuilder.decr(fieldName, 
convertFieldObjectToLong(fieldName, fieldValue));
+                } else {
+                    throw new IllegalArgumentException("Update Method '" + 
updateMethod + "' is not valid.");
+                }
+                updateQuery.with(assignment);
+            }
+        }
+        return updateQuery;
+    }
+
+    private Long convertFieldObjectToLong(String name, Object value) {
+        if (!(value instanceof Number)) {
+            throw new IllegalArgumentException("Field '" + name + "' is not of 
type Number");
+        }
+        return ((Number) value).longValue();
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        Set<ValidationResult> results = (Set<ValidationResult>) 
super.customValidate(validationContext);
+
+        String statementType = 
validationContext.getProperty(STATEMENT_TYPE).getValue();
+
+        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
+            // Check that update keys are set
+            String updateKeys = 
validationContext.getProperty(UPDATE_KEYS).getValue();
+            if (StringUtils.isEmpty(updateKeys)) {
+                results.add(new ValidationResult.Builder().subject("Update 
statement configuration").valid(false).explanation(
+                        "if the Statement Type is set to Update, then the 
Update Keys must be specified as well").build());
+            }
+
+            // Check that if the update method is set to increment or 
decrement that the batch statement type is set to
+            // unlogged or counter (or USE_ATTR_TYPE, which we cannot check at 
this point).
+            String updateMethod = 
validationContext.getProperty(UPDATE_METHOD).getValue();
+            String batchStatementType = 
validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue();
+            if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)
+                    || DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
+                if 
(!(COUNTER_TYPE.getValue().equalsIgnoreCase(batchStatementType)
+                        || 
UNLOGGED_TYPE.getValue().equalsIgnoreCase(batchStatementType)
+                        || 
BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equalsIgnoreCase(batchStatementType)))
 {
+                    results.add(new ValidationResult.Builder().subject("Update 
method configuration").valid(false).explanation(
+                            "if the Update Method is set to Increment or 
Decrement, then the Batch Statement Type must be set " +
+                                    "to either COUNTER or UNLOGGED").build());
+                }
+            }
+        }
+
+        return results;
+    }
+
+    private Statement generateInsert(String cassandraTable, RecordSchema 
schema, Map<String, Object> recordContentMap) {
+        Insert insertQuery;
+        if (cassandraTable.contains(".")) {
+            String[] keyspaceAndTable = cassandraTable.split("\\.");
+            insertQuery = QueryBuilder.insertInto(keyspaceAndTable[0], 
keyspaceAndTable[1]);
+        } else {
+            insertQuery = QueryBuilder.insertInto(cassandraTable);
+        }
+        for (String fieldName : schema.getFieldNames()) {
+            insertQuery.value(fieldName, recordContentMap.get(fieldName));
+        }
+        return insertQuery;
+    }
+
     @OnUnscheduled
     public void stop(ProcessContext context) {
         super.stop(context);
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
index 34a6973..f83dc20 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
@@ -35,7 +35,9 @@ import org.junit.Test;
 
 import javax.net.ssl.SSLContext;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -121,6 +123,366 @@ public class PutCassandraRecordTest {
     }
 
     @Test
+    public void testSimpleUpdate() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.SET_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.INT);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 2L);
+        recordReader.addRecord("Sally Doe", 47, 0);
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        
testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testUpdateInvalidFieldType() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48,"1");
+        recordReader.addRecord("Jane Doe", 47, "1");
+        recordReader.addRecord("Sally Doe", 47, "1");
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateEmptyUpdateKeys() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testUpdateNullUpdateKeys() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.SET_TYPE);
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testUpdateZeroLengthUpdateKeys() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, ",");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "1");
+        recordReader.addRecord("Jane Doe", 47, "1");
+        recordReader.addRecord("Sally Doe", 47, "1");
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateSetLoggedBatch() throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.SET_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.LOGGED_TYPE);
+
+        testRunner.assertValid();
+    }
+
+    @Test
+    public void testUpdateCounterWrongBatchStatementType() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.LOGGED_TYPE);
+
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testUpdateWithUpdateMethodAndKeyAttributes() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, 
"${cql.update.keys}");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.update.method", "Increment");
+        attributes.put("cql.update.keys", "name,age");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testInsertWithStatementAttribute() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.statement.type", "Insert");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testInsertWithStatementAttributeInvalid() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.statement.type", "invalid-type");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testInsertWithBatchStatementAttribute() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.INSERT_TYPE);
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.batch.statement.type", "counter");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testInsertWithBatchStatementAttributeInvalid() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.INSERT_TYPE);
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.batch.statement.type", "invalid-type");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateWithAttributesInvalidUpdateMethod() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, 
"${cql.update.keys}");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.INT);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.update.method", "invalid-method");
+        attributes.put("cql.update.keys", "name,age");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateWithAttributesIncompatibleBatchStatementType() 
throws InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.INCR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.BATCH_STATEMENT_TYPE_USE_ATTR_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.INT);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("cql.batch.statement.type", "LOGGED");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateWithAttributesEmptyUpdateKeysAttribute() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, 
"${cql.update.keys}");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("cql.update.method", "Increment");
+        attributes.put("cql.update.keys", "");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
+    public void testUpdateWithAttributesEmptyUpdateMethodAttribute() throws 
InitializationException {
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutCassandraRecord.STATEMENT_TYPE, 
PutCassandraRecord.UPDATE_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_METHOD, 
PutCassandraRecord.UPDATE_METHOD_USE_ATTR_TYPE);
+        testRunner.setProperty(PutCassandraRecord.UPDATE_KEYS, "name,age");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, 
PutCassandraRecord.COUNTER_TYPE);
+
+        testRunner.assertValid();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("goals", RecordFieldType.LONG);
+
+        recordReader.addRecord("John Doe", 48, 1L);
+        recordReader.addRecord("Jane Doe", 47, 1L);
+        recordReader.addRecord("Sally Doe", 47, 1L);
+
+        HashMap<String, String> attributes = new HashMap<>();
+        attributes.put("cql.update.method", "");
+        testRunner.enqueue("", attributes);
+        testRunner.run();
+
+        testRunner.assertTransferCount(PutCassandraRecord.REL_FAILURE, 1);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_SUCCESS, 0);
+        testRunner.assertTransferCount(PutCassandraRecord.REL_RETRY, 0);
+    }
+
+    @Test
     public void testEL() throws InitializationException {
         testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, 
"${contact.points}");
         testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}");
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java
new file mode 100644
index 0000000..78bebf0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordUpdateTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Statement;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+public class PutCassandraRecordUpdateTest {
+    private PutCassandraRecord testSubject;
+
+    @Mock
+    private RecordSchema schema;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
+        testSubject = new PutCassandraRecord();
+    }
+
+    @Test
+    public void testGenerateUpdateWithEmptyKeyList() {
+        Stream.of("", ",", ",,,").forEach(updateKeys -> testGenerateUpdate(
+                "keyspace.table",
+                updateKeys,
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                new IllegalArgumentException("No Update Keys were specified")
+        ));
+    }
+
+    @Test
+    public void testGenerateUpdateWithMissingKey() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField,missingKeyField",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                new IllegalArgumentException("Update key 'missingKeyField' is 
not present in the record schema")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateWithInvalidUpdateMethod() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                "invalidUpdateMethod",
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                new IllegalArgumentException("Update Method 
'invalidUpdateMethod' is not valid.")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementString() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "15")
+                ),
+                new IllegalArgumentException("Field 'stringField' is not of 
type Number")
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateSimpleTableName() {
+        testGenerateUpdate(
+                "table",
+                "keyField1",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE table SET stringField='newStringValue' WHERE 
keyField1=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateKeyspacedTableName() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField1",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE keyspace.table SET stringField='newStringValue' WHERE 
keyField1=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateMultipleKeys() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField1,keyField2,keyField3",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField1", 1),
+                        new Tuple<>("keyField2", "key2"),
+                        new Tuple<>("keyField3", 123L),
+                        new Tuple<>("stringField", "newStringValue")
+                ),
+                "UPDATE keyspace.table SET stringField='newStringValue' WHERE 
keyField1=1 AND keyField2='key2' AND keyField3=123;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementLong() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                "UPDATE keyspace.table SET longField=longField+15 WHERE 
keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateDecrementLong() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.DECR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("longField", 15L)
+                ),
+                "UPDATE keyspace.table SET longField=longField-15 WHERE 
keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementInteger() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15)
+                ),
+                "UPDATE keyspace.table SET integerField=integerField+15 WHERE 
keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementFloat() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("floatField", 15.05F)
+                ),
+                "UPDATE keyspace.table SET floatField=floatField+15 WHERE 
keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementDouble() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("doubleField", 15.05D)
+                ),
+                "UPDATE keyspace.table SET doubleField=doubleField+15 WHERE 
keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateSetMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.SET_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("stringField", "newStringValue"),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET 
stringField='newStringValue',integerField=15,longField=67 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateIncrementMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.INCR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET 
integerField=integerField+15,longField=longField+67 WHERE keyField=1;"
+        );
+    }
+
+    @Test
+    public void testGenerateUpdateDecrementMultipleValues() {
+        testGenerateUpdate(
+                "keyspace.table",
+                "keyField",
+                PutCassandraRecord.DECR_TYPE.getValue(),
+                Arrays.asList(
+                        new Tuple<>("keyField", 1),
+                        new Tuple<>("integerField", 15),
+                        new Tuple<>("longField", 67L)
+                ),
+                "UPDATE keyspace.table SET 
integerField=integerField-15,longField=longField-67 WHERE keyField=1;"
+        );
+    }
+
+    private void testGenerateUpdate(String table, String updateKeys, String 
updateMethod, List<Tuple<String, Object>> records, String expected) {
+        Map<String, Object> recordContentMap = records.stream()
+                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+
+        List<String> fieldNames = 
records.stream().map(Tuple::getKey).collect(Collectors.toList());
+
+        when(schema.getFieldNames()).thenReturn(fieldNames);
+        Statement actual = testSubject.generateUpdate(table, schema, 
updateKeys, updateMethod, recordContentMap);
+
+        assertEquals(expected, actual.toString());
+    }
+
+    private <E extends Exception> void testGenerateUpdate(String table, String 
updateKeys, String updateMethod, List<Tuple<String, Object>> records, E 
expected) {
+        Map<String, Object> recordContentMap = records.stream()
+                .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+
+        List<String> fieldNames = 
records.stream().map(Tuple::getKey).collect(Collectors.toList());
+
+        when(schema.getFieldNames()).thenReturn(fieldNames);
+        try {
+            testSubject.generateUpdate("keyspace.table", schema, updateKeys, 
updateMethod, recordContentMap);
+            fail();
+        } catch (Exception e) {
+            assertEquals(expected.getClass(), e.getClass());
+            assertEquals(expected.getMessage(), e.getMessage());
+        }
+    }
+}

Reply via email to