Repository: cassandra
Updated Branches:
  refs/heads/trunk 6da5fb56c -> 2aeed037e


Support light-weight transactions in cassandra-stress

patch by Jaydeepkumar Chovatia; reviewed by Dinesh Joshi and jasobrown for 
CASSANDRA-13529


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2aeed037
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2aeed037
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2aeed037

Branch: refs/heads/trunk
Commit: 2aeed037e0f105e72366e15afa012257e910a25d
Parents: 6da5fb5
Author: Jaydeepkumar Chovatia <chovatia.jayd...@gmail.com>
Authored: Fri May 12 17:23:44 2017 -0700
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Wed Jun 13 05:46:20 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 doc/source/tools/cassandra_stress.rst           |  18 ++
 doc/source/tools/stress-lwt-example.yaml        |  70 ++++++
 .../cql3/conditions/ColumnCondition.java        |   7 +-
 .../cql3/statements/ModificationStatement.java  |  11 +
 .../apache/cassandra/stress/StressProfile.java  |  46 +++-
 .../stress/generate/PartitionGenerator.java     |  10 +
 .../stress/operations/PartitionOperation.java   |  13 +-
 .../stress/operations/userdefined/CASQuery.java | 227 +++++++++++++++++++
 .../operations/userdefined/SchemaQuery.java     |  11 +-
 .../operations/userdefined/SchemaStatement.java |   6 +
 11 files changed, 403 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6819711..629df0c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
  * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
  * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)
  * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/doc/source/tools/cassandra_stress.rst
----------------------------------------------------------------------
diff --git a/doc/source/tools/cassandra_stress.rst 
b/doc/source/tools/cassandra_stress.rst
index 322a981..bcac54e 100644
--- a/doc/source/tools/cassandra_stress.rst
+++ b/doc/source/tools/cassandra_stress.rst
@@ -220,6 +220,24 @@ Running a user mode test with multiple yaml files::
 This will run operations as specified in both the example.yaml and 
example2.yaml files. example.yaml and example2.yaml can reference the same table
  although care must be taken that the table definition is identical (data 
generation specs can be different).
 
+Lightweight transaction support
++++++++++++++++++++++++++++++++
+
+cassandra-stress supports lightweight transactions. In this it will first read 
current data from Cassandra and then uses read value(s)
+to fulfill lightweight transaction condition(s).
+
+Lightweight transaction update query::
+
+    queries:
+      regularupdate:
+          cql: update blogposts set author = ? where domain = ? and 
published_date = ?
+          fields: samerow
+      updatewithlwt:
+          cql: update blogposts set author = ? where domain = ? and 
published_date = ? IF body = ? AND url = ?
+          fields: samerow
+
+The full example can be found here :download:`yaml <./stress-lwt-example.yaml>`
+
 Graphing
 ^^^^^^^^
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/doc/source/tools/stress-lwt-example.yaml
----------------------------------------------------------------------
diff --git a/doc/source/tools/stress-lwt-example.yaml 
b/doc/source/tools/stress-lwt-example.yaml
new file mode 100644
index 0000000..fc5db08
--- /dev/null
+++ b/doc/source/tools/stress-lwt-example.yaml
@@ -0,0 +1,70 @@
+# Keyspace Name
+keyspace: stresscql
+
+# The CQL for creating a keyspace (optional if it already exists)
+# Would almost always be network topology unless running something locall
+keyspace_definition: |
+  CREATE KEYSPACE stresscql WITH replication = {'class': 'SimpleStrategy', 
'replication_factor': 1};
+
+# Table name
+table: blogposts
+
+# The CQL for creating a table you wish to stress (optional if it already 
exists)
+table_definition: |
+  CREATE TABLE blogposts (
+        domain text,
+        published_date timeuuid,
+        url text,
+        author text,
+        title text,
+        body text,
+        PRIMARY KEY(domain, published_date)
+  ) WITH CLUSTERING ORDER BY (published_date DESC) 
+    AND compaction = { 'class':'LeveledCompactionStrategy' } 
+    AND comment='A table to hold blog posts'
+
+### Column Distribution Specifications ###
+ 
+columnspec:
+  - name: domain
+    size: gaussian(5..100)       #domain names are relatively short
+    population: uniform(1..10M)  #10M possible domains to pick from
+
+  - name: published_date
+    cluster: fixed(1000)         #under each domain we will have max 1000 posts
+
+  - name: url
+    size: uniform(30..300)       
+
+  - name: title                  #titles shouldn't go beyond 200 chars
+    size: gaussian(10..200)
+
+  - name: author
+    size: uniform(5..20)         #author names should be short
+
+  - name: body
+    size: gaussian(100..5000)    #the body of the blog post can be long
+   
+### Batch Ratio Distribution Specifications ###
+
+insert:
+  partitions: fixed(1)            # Our partition key is the domain so only 
insert one per batch
+
+  select:    fixed(1)/1000        # We have 1000 posts per domain so 1/1000 
will allow 1 post per batch
+
+  batchtype: UNLOGGED             # Unlogged batches
+
+
+#
+# A list of queries you wish to run against the schema
+#
+queries:
+   singlepost:
+      cql: select * from blogposts where domain = ? LIMIT 1
+      fields: samerow
+   regularupdate:
+      cql: update blogposts set author = ? where domain = ? and published_date 
= ?
+      fields: samerow
+   updatewithlwt:
+      cql: update blogposts set author = ? where domain = ? and published_date 
= ? IF body = ? AND url = ?
+      fields: samerow

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java 
b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
index c9d7fe8..aa5c10d 100644
--- a/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
+++ b/src/java/org/apache/cassandra/cql3/conditions/ColumnCondition.java
@@ -849,7 +849,12 @@ public abstract class ColumnCondition
                 throw invalidRequest("Slice conditions ( %s ) are not 
supported on durations", operator);
             }
         }
-        
+
+        public Term.Raw getValue()
+        {
+            return value;
+        }
+
         @Override
         public String toString()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index e02fd41..65e1e2d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -936,5 +937,15 @@ public abstract class ModificationStatement implements 
CQLStatement
         {
             return rawId.prepare(metadata);
         }
+
+        public List<Pair<ColumnMetadata.Raw, ColumnCondition.Raw>> 
getConditions()
+        {
+            ImmutableList.Builder<Pair<ColumnMetadata.Raw, 
ColumnCondition.Raw>> builder = 
ImmutableList.builderWithExpectedSize(conditions.size());
+
+            for (Pair<Raw, ColumnCondition.Raw> condition : conditions)
+                builder.add(Pair.create(condition.left, condition.right));
+
+            return builder.build();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java 
b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
index 2338873..cda9c58 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressProfile.java
@@ -42,14 +42,17 @@ import org.apache.cassandra.cql3.CQLFragmentParser;
 import org.apache.cassandra.cql3.CqlParser;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
 import org.apache.cassandra.exceptions.RequestValidationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.stress.generate.*;
 import org.apache.cassandra.stress.generate.values.*;
-import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery;
+import org.apache.cassandra.stress.operations.userdefined.CASQuery;
 import org.apache.cassandra.stress.operations.userdefined.SchemaInsert;
 import org.apache.cassandra.stress.operations.userdefined.SchemaQuery;
+import org.apache.cassandra.stress.operations.userdefined.SchemaStatement;
+import org.apache.cassandra.stress.operations.userdefined.TokenRangeQuery;
 import 
org.apache.cassandra.stress.operations.userdefined.ValidatingSchemaQuery;
 import org.apache.cassandra.stress.report.Timer;
 import org.apache.cassandra.stress.settings.*;
@@ -87,7 +90,7 @@ public class StressProfile implements Serializable
     transient volatile PreparedStatement insertStatement;
     transient volatile List<ValidatingSchemaQuery.Factory> validationFactories;
 
-    transient volatile Map<String, SchemaQuery.ArgSelect> argSelects;
+    transient volatile Map<String, SchemaStatement.ArgSelect> argSelects;
     transient volatile Map<String, PreparedStatement> queryStatements;
 
     private static final Pattern lowercaseAlphanumeric = 
Pattern.compile("[a-z0-9_]+");
@@ -367,13 +370,13 @@ public class StressProfile implements Serializable
                     JavaDriverClient jclient = 
settings.getJavaDriverClient(keyspaceName);
 
                     Map<String, PreparedStatement> stmts = new HashMap<>();
-                    Map<String, SchemaQuery.ArgSelect> args = new HashMap<>();
+                    Map<String, SchemaStatement.ArgSelect> args = new 
HashMap<>();
                     for (Map.Entry<String, StressYaml.QueryDef> e : 
queries.entrySet())
                     {
                         stmts.put(e.getKey().toLowerCase(), 
jclient.prepare(e.getValue().cql));
                         args.put(e.getKey().toLowerCase(), e.getValue().fields 
== null
-                                                                 ? 
SchemaQuery.ArgSelect.MULTIROW
-                                                                 : 
SchemaQuery.ArgSelect.valueOf(e.getValue().fields.toUpperCase()));
+                                ? SchemaStatement.ArgSelect.MULTIROW
+                                : 
SchemaStatement.ArgSelect.valueOf(e.getValue().fields.toUpperCase()));
                     }
                     queryStatements = stmts;
                     argSelects = args;
@@ -381,9 +384,42 @@ public class StressProfile implements Serializable
             }
         }
 
+        if (dynamicConditionExists(queryStatements.get(name)))
+            return new CASQuery(timer, settings, generator, seeds, 
queryStatements.get(name), settings.command.consistencyLevel, 
argSelects.get(name), tableName);
+
         return new SchemaQuery(timer, settings, generator, seeds, 
queryStatements.get(name), settings.command.consistencyLevel, 
argSelects.get(name));
     }
 
+    static boolean dynamicConditionExists(PreparedStatement statement) throws 
IllegalArgumentException
+    {
+        if (statement == null)
+            return false;
+
+        if (!statement.getQueryString().toUpperCase().startsWith("UPDATE"))
+            return false;
+
+        ModificationStatement.Parsed modificationStatement;
+        try
+        {
+            modificationStatement = 
CQLFragmentParser.parseAnyUnhandled(CqlParser::updateStatement,
+                                                                        
statement.getQueryString());
+        }
+        catch (RecognitionException e)
+        {
+            throw new IllegalArgumentException("could not parse update query:" 
+ statement.getQueryString(), e);
+        }
+
+        /*
+         * here we differentiate between static vs dynamic conditions:
+         *  - static condition example: if col1 = NULL
+         *  - dynamic condition example: if col1 = ?
+         *  for static condition we don't have to replace value, no extra work 
involved.
+         *  for dynamic condition we have to read existing db value and then
+         *  use current db values during the update.
+         */
+        return 
modificationStatement.getConditions().stream().anyMatch(condition -> 
condition.right.getValue().getText().equals("?"));
+    }
+
     public Operation getBulkReadQueries(String name, Timer timer, 
StressSettings settings, TokenRangeIterator tokenRangeIterator, boolean 
isWarmup)
     {
         StressYaml.TokenRangeQueryDef def = tokenRangeQueries.get(name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
index 1230065..882b8b4 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/generate/PartitionGenerator.java
@@ -83,6 +83,16 @@ public class PartitionGenerator
         return !(index < 0 || index < clusteringComponents.size());
     }
 
+    public List<Generator> getPartitionKey()
+    {
+        return Collections.unmodifiableList(partitionKey);
+    }
+
+    public List<Generator> getClusteringComponents()
+    {
+        return Collections.unmodifiableList(clusteringComponents);
+    }
+
     public int indexOf(String name)
     {
         Integer i = indexMap.get(name);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
index bad0a94..55c6872 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/PartitionOperation.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.stress.operations;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.cassandra.stress.Operation;
@@ -75,6 +76,16 @@ public abstract class PartitionOperation extends Operation
         this.spec = spec;
     }
 
+    public DataSpec getDataSpecification()
+    {
+        return spec;
+    }
+
+    public List<PartitionIterator> getPartitions()
+    {
+        return Collections.unmodifiableList(partitions);
+    }
+
     public int ready(WorkManager permits)
     {
         int partitionCount = (int) spec.partitionCount.next();
@@ -86,7 +97,7 @@ public abstract class PartitionOperation extends Operation
 
         int i = 0;
         boolean success = true;
-        for (; i < partitionCount && success ; i++)
+        for (; i < partitionCount && success; i++)
         {
             if (i >= partitionCache.size())
                 
partitionCache.add(PartitionIterator.get(spec.partitionGenerator, 
spec.seedManager));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/CASQuery.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/CASQuery.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/CASQuery.java
new file mode 100644
index 0000000..e7d0fe3
--- /dev/null
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/CASQuery.java
@@ -0,0 +1,227 @@
+package org.apache.cassandra.stress.operations.userdefined;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.LocalDate;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import org.antlr.runtime.RecognitionException;
+import org.apache.cassandra.cql3.CQLFragmentParser;
+import org.apache.cassandra.cql3.CqlParser;
+import org.apache.cassandra.cql3.conditions.ColumnCondition;
+import org.apache.cassandra.cql3.statements.ModificationStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.stress.generate.DistributionFixed;
+import org.apache.cassandra.stress.generate.PartitionGenerator;
+import org.apache.cassandra.stress.generate.Row;
+import org.apache.cassandra.stress.generate.SeedManager;
+import org.apache.cassandra.stress.generate.values.Generator;
+import org.apache.cassandra.stress.report.Timer;
+import org.apache.cassandra.stress.settings.StressSettings;
+import org.apache.cassandra.stress.util.JavaDriverClient;
+import org.apache.cassandra.utils.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class CASQuery extends SchemaStatement
+{
+    private final ImmutableList<Integer> keysIndex;
+    private final ImmutableMap<Integer, Integer> casConditionArgFreqMap;
+    private final String readQuery;
+
+    private PreparedStatement casReadConditionStatement;
+
+    public CASQuery(Timer timer, StressSettings settings, PartitionGenerator 
generator, SeedManager seedManager, PreparedStatement statement, 
ConsistencyLevel cl, ArgSelect argSelect, final String tableName)
+    {
+        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == 
SchemaStatement.ArgSelect.MULTIROW ? statement.getVariables().size() : 1), 
statement,
+              
statement.getVariables().asList().stream().map(ColumnDefinitions.Definition::getName).collect(Collectors.toList()),
 cl);
+
+        if (argSelect != SchemaStatement.ArgSelect.SAMEROW)
+            throw new IllegalArgumentException("CAS is supported only for type 
'samerow'");
+
+        ModificationStatement.Parsed modificationStatement;
+        try
+        {
+            modificationStatement = 
CQLFragmentParser.parseAnyUnhandled(CqlParser::updateStatement,
+                    statement.getQueryString());
+        }
+        catch (RecognitionException e)
+        {
+            throw new IllegalArgumentException("could not parse update query:" 
+ statement.getQueryString(), e);
+        }
+
+        final List<Pair<ColumnMetadata.Raw, ColumnCondition.Raw>> 
casConditionList = modificationStatement.getConditions();
+        List<Integer> casConditionIndex = new ArrayList<>();
+
+        boolean first = true;
+        StringBuilder casReadConditionQuery = new StringBuilder();
+        casReadConditionQuery.append("SELECT ");
+        for (final Pair<ColumnMetadata.Raw, ColumnCondition.Raw> condition : 
casConditionList)
+        {
+            if (!condition.right.getValue().getText().equals("?"))
+            {
+                //condition uses static value, ignore it
+                continue;
+            }
+            if (!first)
+            {
+                casReadConditionQuery.append(", ");
+            }
+            casReadConditionQuery.append(condition.left.rawText());
+            
casConditionIndex.add(getDataSpecification().partitionGenerator.indexOf(condition.left.rawText()));
+            first = false;
+        }
+        casReadConditionQuery.append(" FROM ").append(tableName).append(" 
WHERE ");
+
+        first = true;
+        ImmutableList.Builder<Integer> keysBuilder = ImmutableList.builder();
+        for (final Generator key : 
getDataSpecification().partitionGenerator.getPartitionKey())
+        {
+            if (!first)
+            {
+                casReadConditionQuery.append(" AND ");
+            }
+            casReadConditionQuery.append(key.name).append(" = ? ");
+            
keysBuilder.add(getDataSpecification().partitionGenerator.indexOf(key.name));
+            first = false;
+        }
+        for (final Generator clusteringKey : 
getDataSpecification().partitionGenerator.getClusteringComponents())
+        {
+            casReadConditionQuery.append(" AND 
").append(clusteringKey.name).append(" = ? ");
+            
keysBuilder.add(getDataSpecification().partitionGenerator.indexOf(clusteringKey.name));
+        }
+        keysIndex = keysBuilder.build();
+        readQuery = casReadConditionQuery.toString();
+
+        ImmutableMap.Builder<Integer, Integer> builder = 
ImmutableMap.builderWithExpectedSize(casConditionIndex.size());
+        for (final Integer oneConditionIndex : casConditionIndex)
+        {
+            builder.put(oneConditionIndex, 
Math.toIntExact(Arrays.stream(argumentIndex).filter((x) -> x == 
oneConditionIndex).count()));
+        }
+        casConditionArgFreqMap = builder.build();
+    }
+
+    private class JavaDriverRun extends Runner
+    {
+        final JavaDriverClient client;
+
+        private JavaDriverRun(JavaDriverClient client)
+        {
+            this.client = client;
+            casReadConditionStatement = client.prepare(readQuery);
+        }
+
+        public boolean run()
+        {
+            ResultSet rs = client.getSession().execute(bind(client));
+            rowCount = rs.all().size();
+            partitionCount = Math.min(1, rowCount);
+            return true;
+        }
+    }
+
+    @Override
+    public void run(JavaDriverClient client) throws IOException
+    {
+        timeWithRetry(new JavaDriverRun(client));
+    }
+
+    private BoundStatement bind(JavaDriverClient client)
+    {
+        final Object keys[] = new Object[keysIndex.size()];
+        final Row row = getPartitions().get(0).next();
+
+        for (int i = 0; i < keysIndex.size(); i++)
+        {
+            keys[i] = row.get(keysIndex.get(i));
+        }
+
+        //get current db values for all the coluns which are part of dynamic 
conditions
+        ResultSet rs = 
client.getSession().execute(casReadConditionStatement.bind(keys));
+        final Object casDbValues[] = new Object[casConditionArgFreqMap.size()];
+
+        final com.datastax.driver.core.Row casDbValue = rs.one();
+        if (casDbValue != null)
+        {
+            for (int i = 0; i < casConditionArgFreqMap.size(); i++)
+            {
+                casDbValues[i] = casDbValue.getObject(i);
+            }
+        }
+        //now bind db values for dynamic conditions in actual CAS update 
operation
+        return prepare(row, casDbValues);
+    }
+
+    private BoundStatement prepare(final Row row, final Object[] casDbValues)
+    {
+        final Map<Integer, Integer> localMapping = new 
HashMap<>(casConditionArgFreqMap);
+        int conditionIndexTracker = 0;
+        for (int i = 0; i < argumentIndex.length; i++)
+        {
+            boolean replace = false;
+            Integer count = localMapping.get(argumentIndex[i]);
+            if (count != null)
+            {
+                count--;
+                localMapping.put(argumentIndex[i], count);
+                if (count == 0)
+                {
+                    replace = true;
+                }
+            }
+
+            if (replace)
+            {
+                bindBuffer[i] = casDbValues[conditionIndexTracker++];
+            }
+            else
+            {
+                Object value = row.get(argumentIndex[i]);
+                if (definitions.getType(i).getName() == 
DataType.date().getName())
+                {
+                    // the java driver only accepts 
com.datastax.driver.core.LocalDate for CQL type "DATE"
+                    value = LocalDate.fromDaysSinceEpoch((Integer) value);
+                }
+
+                bindBuffer[i] = value;
+            }
+
+            if (bindBuffer[i] == null && 
!getDataSpecification().partitionGenerator.permitNulls(argumentIndex[i]))
+            {
+                throw new IllegalStateException();
+            }
+        }
+        return statement.bind(bindBuffer);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
index f0b332c..cba9ce4 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaQuery.java
@@ -22,9 +22,6 @@ package org.apache.cassandra.stress.operations.userdefined;
 
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Random;
 import java.util.stream.Collectors;
 
@@ -39,19 +36,13 @@ import org.apache.cassandra.stress.util.JavaDriverClient;
 
 public class SchemaQuery extends SchemaStatement
 {
-    public static enum ArgSelect
-    {
-        MULTIROW, SAMEROW;
-        //TODO: FIRSTROW, LASTROW
-    }
-
     final ArgSelect argSelect;
     final Object[][] randomBuffer;
     final Random random = new Random();
 
     public SchemaQuery(Timer timer, StressSettings settings, 
PartitionGenerator generator, SeedManager seedManager, PreparedStatement 
statement, ConsistencyLevel cl, ArgSelect argSelect)
     {
-        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == 
ArgSelect.MULTIROW ? statement.getVariables().size() : 1), statement,
+        super(timer, settings, new DataSpec(generator, seedManager, new 
DistributionFixed(1), settings.insert.rowPopulationRatio.get(), argSelect == 
SchemaStatement.ArgSelect.MULTIROW ? statement.getVariables().size() : 1), 
statement,
               statement.getVariables().asList().stream().map(d -> 
d.getName()).collect(Collectors.toList()), cl);
         this.argSelect = argSelect;
         randomBuffer = new Object[argumentIndex.length][argumentIndex.length];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2aeed037/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
index 6bd3fd5..334e6c5 100644
--- 
a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
+++ 
b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/SchemaStatement.java
@@ -38,6 +38,12 @@ import org.apache.cassandra.stress.util.JavaDriverClient;
 
 public abstract class SchemaStatement extends PartitionOperation
 {
+    public enum ArgSelect
+    {
+        MULTIROW, SAMEROW;
+        //TODO: FIRSTROW, LASTROW
+    }
+
     final PreparedStatement statement;
     final ConsistencyLevel cl;
     final int[] argumentIndex;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to