This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 6f27e4828c Accord metrics are isolated which cause existing 
coordination metrics to be empty, should also populate there as well
6f27e4828c is described below

commit 6f27e4828c8b68143d9c2af4bdbd94923f75347d
Author: David Capwell <[email protected]>
AuthorDate: Mon Oct 21 19:59:06 2024 -0700

    Accord metrics are isolated which cause existing coordination metrics to be 
empty, should also populate there as well
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for 
CASSANDRA-20017
---
 .../cql3/statements/TransactionStatement.java      | 117 +++++++++------------
 .../cassandra/service/accord/AccordService.java    |  34 +++++-
 .../metrics/CoordinatorReadLatencyMetricTest.java  |  79 ++++++++++++--
 .../apache/cassandra/harry/model/SelectHelper.java |   2 +-
 .../apache/cassandra/cql3/ast/FunctionCall.java    |  10 ++
 .../unit/org/apache/cassandra/cql3/ast/Select.java |  22 +++-
 .../unit/org/apache/cassandra/cql3/ast/Symbol.java |   6 ++
 test/unit/org/apache/cassandra/cql3/ast/Txn.java   |  27 +++--
 8 files changed, 212 insertions(+), 85 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index e948d5dcaa..992d3af0f0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -37,8 +37,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import accord.api.Key;
 import accord.primitives.Keys;
@@ -95,8 +93,6 @@ import static 
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_p
 
 public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement, CQLStatement.ReturningCQLStatement
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(TransactionStatement.class);
-
     public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
     public static final String INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE = 
"SELECT must specify either all partition key elements. Partition key elements 
must be always specified with equality operators; %s %s";
     public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "SELECT 
must specify either all primary key elements or all partition key elements and 
LIMIT 1. In both cases partition key elements must be always specified with 
equality operators; %s %s";
@@ -380,82 +376,73 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
     {
         checkTrue(DatabaseDescriptor.getAccordTransactionsEnabled(), 
TRANSACTIONS_DISABLED_MESSAGE);
 
-        try
-        {
-            // check again since now we have query options; note that 
statements are quaranted to be single partition reads at this point
-            for (NamedSelect assignment : assignments)
-                checkFalse(isSelectingMultipleClusterings(assignment.select, 
options), INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "LET assignment", 
assignment.select.source);
+        // check again since now we have query options; note that statements 
are quaranted to be single partition reads at this point
+        for (NamedSelect assignment : assignments)
+            checkFalse(isSelectingMultipleClusterings(assignment.select, 
options), INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "LET assignment", 
assignment.select.source);
 
-            Txn txn = createTxn(state.getClientState(), options);
+        Txn txn = createTxn(state.getClientState(), options);
 
-            TxnResult txnResult = AccordService.instance().coordinate(txn, 
options.getConsistency(), requestTime);
-            if (txnResult.kind() == retry_new_protocol)
-                throw new InvalidRequestException(UNSUPPORTED_MIGRATION);
-            TxnData data = (TxnData)txnResult;
+        TxnResult txnResult = AccordService.instance().coordinate(txn, 
options.getConsistency(), requestTime);
+        if (txnResult.kind() == retry_new_protocol)
+            throw new InvalidRequestException(UNSUPPORTED_MIGRATION);
+        TxnData data = (TxnData)txnResult;
 
-            if (returningSelect != null)
+        if (returningSelect != null)
+        {
+            @SuppressWarnings("unchecked")
+            SinglePartitionReadQuery.Group<SinglePartitionReadCommand> 
selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) 
returningSelect.select.getQuery(options, 0);
+            Selection.Selectors selectors = 
returningSelect.select.getSelection().newSelectors(options);
+            ResultSetBuilder result = new ResultSetBuilder(resultMetadata, 
selectors, false);
+            if (selectQuery.queries.size() == 1)
+            {
+                FilteredPartition partition = 
data.get(TxnDataName.returning());
+                boolean reversed = selectQuery.queries.get(0).isReversed();
+                if (partition != null)
+                    
returningSelect.select.processPartition(partition.rowIterator(reversed), 
options, result, FBUtilities.nowInSeconds());
+            }
+            else
             {
-                @SuppressWarnings("unchecked")
-                SinglePartitionReadQuery.Group<SinglePartitionReadCommand> 
selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>) 
returningSelect.select.getQuery(options, 0);
-                Selection.Selectors selectors = 
returningSelect.select.getSelection().newSelectors(options);
-                ResultSetBuilder result = new ResultSetBuilder(resultMetadata, 
selectors, false);
-                if (selectQuery.queries.size() == 1)
+                long nowInSec = FBUtilities.nowInSeconds();
+                for (int i = 0; i < selectQuery.queries.size(); i++)
                 {
-                    FilteredPartition partition = 
data.get(TxnDataName.returning());
-                    boolean reversed = selectQuery.queries.get(0).isReversed();
+                    FilteredPartition partition = 
data.get(TxnDataName.returning(i));
+                    boolean reversed = selectQuery.queries.get(i).isReversed();
                     if (partition != null)
-                        
returningSelect.select.processPartition(partition.rowIterator(reversed), 
options, result, FBUtilities.nowInSeconds());
-                }
-                else
-                {
-                    long nowInSec = FBUtilities.nowInSeconds();
-                    for (int i = 0; i < selectQuery.queries.size(); i++)
-                    {
-                        FilteredPartition partition = 
data.get(TxnDataName.returning(i));
-                        boolean reversed = 
selectQuery.queries.get(i).isReversed();
-                        if (partition != null)
-                            
returningSelect.select.processPartition(partition.rowIterator(reversed), 
options, result, nowInSec);
-                    }
+                        
returningSelect.select.processPartition(partition.rowIterator(reversed), 
options, result, nowInSec);
                 }
-                return new ResultMessage.Rows(result.build());
             }
+            return new ResultMessage.Rows(result.build());
+        }
 
-            if (returningReferences != null)
-            {
-                List<AbstractType<?>> resultType = new 
ArrayList<>(returningReferences.size());
-                List<ColumnMetadata> columns = new 
ArrayList<>(returningReferences.size());
-
-                for (RowDataReference reference : returningReferences)
-                {
-                    ColumnMetadata forMetadata = reference.toResultMetadata();
-                    resultType.add(forMetadata.type);
-                    columns.add(reference.column());
-                }
+        if (returningReferences != null)
+        {
+            List<AbstractType<?>> resultType = new 
ArrayList<>(returningReferences.size());
+            List<ColumnMetadata> columns = new 
ArrayList<>(returningReferences.size());
 
-                ResultSetBuilder result = new ResultSetBuilder(resultMetadata, 
Selection.noopSelector(), false);
-                result.newRow(options.getProtocolVersion(), null, null, 
columns);
+            for (RowDataReference reference : returningReferences)
+            {
+                ColumnMetadata forMetadata = reference.toResultMetadata();
+                resultType.add(forMetadata.type);
+                columns.add(reference.column());
+            }
 
-                for (int i = 0; i < returningReferences.size(); i++)
-                {
-                    RowDataReference reference = returningReferences.get(i);
-                    TxnReference txnReference = 
reference.toTxnReference(options);
-                    ByteBuffer buffer = txnReference.toByteBuffer(data, 
resultType.get(i));
-                    result.add(buffer);
-                }
+            ResultSetBuilder result = new ResultSetBuilder(resultMetadata, 
Selection.noopSelector(), false);
+            result.newRow(options.getProtocolVersion(), null, null, columns);
 
-                return new ResultMessage.Rows(result.build());
+            for (int i = 0; i < returningReferences.size(); i++)
+            {
+                RowDataReference reference = returningReferences.get(i);
+                TxnReference txnReference = reference.toTxnReference(options);
+                ByteBuffer buffer = txnReference.toByteBuffer(data, 
resultType.get(i));
+                result.add(buffer);
             }
 
-            // In the case of a write-only transaction, just return and empty 
result.
-            // TODO: This could be modified to return an indication of whether 
a condition (if present) succeeds.
-            return new ResultMessage.Void();
-        }
-        catch (Throwable t)
-        {
-            //TODO remove before merge to trunk
-           logger.error("Unexpected error with transaction: {}", t.toString());
-           throw t;
+            return new ResultMessage.Rows(result.build());
         }
+
+        // In the case of a write-only transaction, just return and empty 
result.
+        // TODO: This could be modified to return an indication of whether a 
condition (if present) succeeds.
+        return new ResultMessage.Void();
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index e73209da02..7c6806e203 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -128,6 +128,8 @@ import org.apache.cassandra.journal.Params;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.AccordClientRequestMetrics;
 import org.apache.cassandra.metrics.TCMMetrics;
+import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageDelivery;
@@ -874,12 +876,24 @@ public class AccordService implements IAccordService, 
Shutdownable
     public @Nonnull AsyncTxnResult coordinateAsync(Txn txn, ConsistencyLevel 
consistencyLevel, Dispatcher.RequestTime requestTime)
     {
         TxnId txnId = node.nextTxnId(txn.kind(), txn.keys().domain());
-        AccordClientRequestMetrics metrics = txn.isWrite() ? 
accordWriteMetrics : accordReadMetrics;
+        ClientRequestMetrics sharedMetrics;
+        AccordClientRequestMetrics metrics;
+        if (txn.isWrite())
+        {
+            sharedMetrics = ClientRequestsMetricsHolder.writeMetrics;
+            metrics = accordWriteMetrics;
+        }
+        else
+        {
+            sharedMetrics = ClientRequestsMetricsHolder.readMetrics;
+            metrics = accordReadMetrics;
+        }
         metrics.keySize.update(txn.keys().size());
         AsyncResult<Result> asyncResult = node.coordinate(txnId, txn);
         AsyncTxnResult asyncTxnResult = new AsyncTxnResult(txnId);
         asyncResult.addCallback((success, failure) -> {
             long durationNanos = nanoTime() - requestTime.startedAtNanos();
+            sharedMetrics.addNano(durationNanos);
             metrics.addNano(durationNanos);
             Throwable cause = failure != null ? 
Throwables.getRootCause(failure) : null;
             if (success != null)
@@ -902,6 +916,7 @@ public class AccordService implements IAccordService, 
Shutdownable
             }
             if (cause instanceof Preempted || cause instanceof Invalidated)
             {
+                sharedMetrics.timeouts.mark();
                 metrics.preempted.mark();
                 //TODO need to improve
                 // Coordinator "could" query the accord state to see whats 
going on but that doesn't exist yet.
@@ -909,6 +924,7 @@ public class AccordService implements IAccordService, 
Shutdownable
                 asyncTxnResult.tryFailure(newPreempted(txnId, txn.isWrite(), 
consistencyLevel));
                 return;
             }
+            sharedMetrics.failures.mark();
             if (cause instanceof TopologyMismatch)
             {
                 metrics.topologyMismatches.mark();
@@ -924,7 +940,18 @@ public class AccordService implements IAccordService, 
Shutdownable
     @Override
     public TxnResult getTxnResult(AsyncTxnResult asyncTxnResult, boolean 
isWrite, @Nullable ConsistencyLevel consistencyLevel, Dispatcher.RequestTime 
requestTime)
     {
-        AccordClientRequestMetrics metrics = isWrite ? accordWriteMetrics : 
accordReadMetrics;
+        ClientRequestMetrics sharedMetrics;
+        AccordClientRequestMetrics metrics;
+        if (isWrite)
+        {
+            sharedMetrics = ClientRequestsMetricsHolder.writeMetrics;
+            metrics = accordWriteMetrics;
+        }
+        else
+        {
+            sharedMetrics = ClientRequestsMetricsHolder.readMetrics;
+            metrics = accordReadMetrics;
+        }
         try
         {
             long deadlineNanos = 
requestTime.computeDeadline(DatabaseDescriptor.getTransactionTimeout(NANOSECONDS));
@@ -939,6 +966,7 @@ public class AccordService implements IAccordService, 
Shutdownable
             {
                 // Mark here instead of in coordinate async since this is 
where the request timeout actually occurs
                 metrics.timeouts.mark();
+                sharedMetrics.timeouts.mark();
                 cause.addSuppressed(e);
                 throw (RequestTimeoutException) cause;
             }
@@ -950,11 +978,13 @@ public class AccordService implements IAccordService, 
Shutdownable
         catch (InterruptedException e)
         {
             metrics.failures.mark();
+            sharedMetrics.failures.mark();
             throw new UncheckedInterruptedException(e);
         }
         catch (TimeoutException e)
         {
             metrics.timeouts.mark();
+            sharedMetrics.timeouts.mark();
             throw newTimeout(asyncTxnResult.txnId, isWrite, consistencyLevel);
         }
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
index ab3de57cb6..34bca20729 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
@@ -18,23 +18,68 @@
 
 package org.apache.cassandra.distributed.test.metrics;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import org.junit.Test;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.cql3.ast.Select;
+import org.apache.cassandra.cql3.ast.Txn;
+import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
+import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.service.paxos.Paxos;
+import org.assertj.core.api.Assertions;
 
+import static org.apache.cassandra.cql3.ast.Where.Inequalities.EQUAL;
+import static org.apache.cassandra.cql3.ast.Where.Inequalities.LESS_THAN;
 import static org.junit.Assert.assertTrue;
 
 public class CoordinatorReadLatencyMetricTest extends TestBaseImpl
 {
+    @Test
+    public void singleRowTest() throws IOException
+    {
+        try (Cluster cluster = init(builder().withNodes(1).start()))
+        {
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck 
int, v int, PRIMARY KEY (pk, ck))"));
+            for (int i = 0; i < 100; i++)
+                cluster.coordinator(1).execute(withKeyspace("insert into 
%s.tbl (pk, ck ,v) values (0, ?, 1)"), ConsistencyLevel.ALL, i);
+
+            var select = Select.builder()
+                               //TODO (now, correctness, coverage): count(v) 
breaks accord as we get mutliple rows rather than the count of rows...
+//                               .withSelection(FunctionCall.count("v"))
+                               .withTable(KEYSPACE, "tbl")
+                               .withWhere("pk", EQUAL, 0)
+                               .withWhere("ck", LESS_THAN, 42)
+                               .withLimit(1)
+                               .build();
+
+            verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster, 
select.toCQL(), ConsistencyLevel.QUORUM));
+            cluster.get(1).runOnInstance(() -> 
Paxos.setPaxosVariant(Config.PaxosVariant.v1));
+            verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster, 
select.toCQL(), ConsistencyLevel.SERIAL));
+            cluster.get(1).runOnInstance(() -> 
Paxos.setPaxosVariant(Config.PaxosVariant.v2));
+            verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster, 
select.toCQL(), ConsistencyLevel.SERIAL));
+
+            cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl WITH " + 
TransactionalMode.full.asCqlParam()));
+            verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster, 
Txn.wrap(select).toCQL(), ConsistencyLevel.QUORUM));
+
+            var let = Txn.builder()
+                         .addLet("a", select)
+                         .addReturnReferences("a.v")
+                         .build();
+            verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster, 
let.toCQL(), ConsistencyLevel.QUORUM));
+        }
+    }
+
     @Test
     public void internalPagingWithAggregateTest() throws Throwable
     {
@@ -91,16 +136,26 @@ public class CoordinatorReadLatencyMetricTest extends 
TestBaseImpl
         }
     }
 
-    private void verifyLatencyMetricsWhenPaging(Cluster cluster,
-                                                int pagesize,
-                                                int expectedQueries,
-                                                String query,
-                                                ConsistencyLevel 
consistencyLevel)
+    private static void verifyLatencyMetricsWhenPaging(Cluster cluster,
+                                                       int pagesize,
+                                                       int expectedQueries,
+                                                       String query,
+                                                       ConsistencyLevel 
consistencyLevel)
+    {
+        verifyLatencyMetrics(cluster, expectedQueries, () -> 
cluster.coordinator(1).executeWithPaging(query, consistencyLevel, pagesize));
+    }
+
+    private static void verifyLatencyMetrics(Cluster cluster, String query, 
ConsistencyLevel consistencyLevel)
+    {
+        verifyLatencyMetrics(cluster, 1, () -> 
cluster.coordinator(1).execute(query, consistencyLevel));
+    }
+
+    private static void verifyLatencyMetrics(Cluster cluster, int 
expectedQueries, Runnable query)
     {
         long countBefore = cluster.get(1).callOnInstance(() -> 
ClientRequestsMetricsHolder.readMetrics.latency.getCount());
         long totalLatencyBefore = cluster.get(1).callOnInstance(() -> 
ClientRequestsMetricsHolder.readMetrics.totalLatency.getCount());
         long startTime = System.nanoTime();
-        cluster.coordinator(1).executeWithPaging(query, consistencyLevel, 
pagesize);
+        query.run();
         long elapsedTime = System.nanoTime() - startTime;
         long countAfter = cluster.get(1).callOnInstance(() -> 
ClientRequestsMetricsHolder.readMetrics.latency.getCount());
         long totalLatencyAfter = cluster.get(1).callOnInstance(() -> 
ClientRequestsMetricsHolder.readMetrics.totalLatency.getCount());
@@ -113,4 +168,16 @@ public class CoordinatorReadLatencyMetricTest extends 
TestBaseImpl
                    totalLatencyRecorded <= elapsedTime);
     }
 
+    private static void verifyTableLatency(Cluster cluster, int 
expectedQueries, Runnable query)
+    {
+        IInvokableInstance inst = cluster.get(1);
+        LongSupplier tableMetric = () -> inst.callOnInstance(() -> 
Keyspace.open("distributed_test_keyspace").getColumnFamilyStore("tbl").getMetrics().readLatency.latency.getCount());
+
+        long tableBefore = tableMetric.getAsLong();
+        query.run();
+        long tableAfter = tableMetric.getAsLong();
+
+        Assertions.assertThat(tableAfter - 
tableBefore).isEqualTo(expectedQueries);
+    }
+
 }
diff --git a/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java 
b/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
index e053ce7683..6b200e2dd9 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
@@ -140,7 +140,7 @@ public class SelectHelper
     private static SchemaSpec.AddRelationCallback 
appendToBuilderCallback(Select.Builder builder)
     {
         return (spec, kind, value) ->
-               builder.withWhere(toInequalities(kind), Reference.of(new 
Symbol(spec.name, spec.type.asServerType())), new Bind(value, 
spec.type.asServerType()));
+               builder.withWhere(Reference.of(new Symbol(spec.name, 
spec.type.asServerType())), toInequalities(kind), new Bind(value, 
spec.type.asServerType()));
     }
 
     private static Where.Inequalities toInequalities(Relation.RelationKind 
kind)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/FunctionCall.java 
b/test/unit/org/apache/cassandra/cql3/ast/FunctionCall.java
index 0d325b5123..c136c6341d 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/FunctionCall.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/FunctionCall.java
@@ -79,4 +79,14 @@ public class FunctionCall implements Expression
     {
         return new FunctionCall("count(*)", Collections.emptyList(), 
LongType.instance);
     }
+
+    public static FunctionCall count(String symbol)
+    {
+        return count(Symbol.unknownType(symbol));
+    }
+
+    public static FunctionCall count(Symbol symbol)
+    {
+        return new FunctionCall("count", 
Collections.singletonList(Reference.of(symbol)), LongType.instance);
+    }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Select.java 
b/test/unit/org/apache/cassandra/cql3/ast/Select.java
index bfe39c425d..9ac541ee74 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Select.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Select.java
@@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.schema.TableMetadata;
 
@@ -89,6 +90,11 @@ FROM [keyspace_name.] table_name
         }
     }
 
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
     public Select withAllowFiltering()
     {
         return new Select(selections, source, where, orderBy, limit, true);
@@ -309,12 +315,22 @@ FROM [keyspace_name.] table_name
             return this;
         }
 
-        public Builder withWhere(Where.Inequalities kind, ReferenceExpression 
ref, Expression expression)
+        public Builder withWhere(ReferenceExpression ref, Where.Inequalities 
kind, Expression expression)
         {
             where.where(kind, ref, expression);
             return this;
         }
 
+        public Builder withWhere(String name, Where.Inequalities kind, int 
value)
+        {
+            return withWhere(kind, name, value, Int32Type.instance);
+        }
+
+        public <T> Builder withWhere(Where.Inequalities kind, String name, T 
value, AbstractType<T> type)
+        {
+            return withWhere(Reference.of(new Symbol(name, type)), kind, new 
Literal(value, type));
+        }
+
         public Builder withIn(ReferenceExpression symbol, Expression... 
expressions)
         {
             where.in(symbol, expressions);
@@ -333,12 +349,12 @@ FROM [keyspace_name.] table_name
         public Builder withColumnEquals(String column, ByteBuffer value)
         {
             BytesType type = BytesType.instance;
-            return withWhere(Where.Inequalities.EQUAL, Reference.of(new 
Symbol(column, type)), new Bind(value, type));
+            return withWhere(Reference.of(new Symbol(column, type)), 
Where.Inequalities.EQUAL, new Bind(value, type));
         }
 
         public Builder withColumnEquals(Symbol column, Expression value)
         {
-            return withWhere(Where.Inequalities.EQUAL, Reference.of(column), 
value);
+            return withWhere(Reference.of(column), Where.Inequalities.EQUAL, 
value);
         }
 
         public Builder withOrderByColumn(String name, AbstractType<?> type, 
OrderBy.Ordering ordering)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Symbol.java 
b/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
index dc4c0aab3c..a3574599f1 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
@@ -25,6 +25,7 @@ import java.util.regex.Pattern;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.ReservedKeywords;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.schema.ColumnMetadata;
 
 public class Symbol implements ReferenceExpression, Comparable<Symbol>
@@ -48,6 +49,11 @@ public class Symbol implements ReferenceExpression, 
Comparable<Symbol>
         return new Symbol(metadata.name.toString(), metadata.type.unwrap());
     }
 
+    public static Symbol unknownType(String name)
+    {
+        return new Symbol(name, BytesType.instance);
+    }
+
     @Override
     public void toCQL(StringBuilder sb, int indent)
     {
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Txn.java 
b/test/unit/org/apache/cassandra/cql3/ast/Txn.java
index 03c0588860..3ce90f5ce8 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Txn.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Txn.java
@@ -67,6 +67,11 @@ public class Txn implements Statement
         return new Txn(Collections.emptyList(), Optional.empty(), 
Optional.empty(), Collections.singletonList(mutation));
     }
 
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
     @Override
     public void toCQL(StringBuilder sb, int indent)
     {
@@ -171,12 +176,12 @@ public class Txn implements Statement
             return !output.isPresent() && !ifBlock.isPresent() && 
mutations.isEmpty();
         }
 
-        public void addLet(String name, Select.Builder select)
+        public Builder addLet(String name, Select.Builder select)
         {
-            addLet(name, select.build());
+            return addLet(name, select.build());
         }
 
-        public void addLet(String name, Select select)
+        public Builder addLet(String name, Select select)
         {
             if (lets.containsKey(name))
                 throw new IllegalArgumentException("Let name " + name + " 
already exists");
@@ -185,6 +190,7 @@ public class Txn implements Statement
             Reference ref = Reference.of(new Symbol.UnquotedSymbol(name, 
toNamedTuple(select)));
             for (Expression e : select.selections)
                 addAllowedReference(ref.add(e));
+            return this;
         }
 
         private AbstractType<?> toNamedTuple(Select select)
@@ -200,23 +206,26 @@ public class Txn implements Statement
             return new UserType(null, null, fieldNames, fieldTypes, false);
         }
 
-        private void addAllowedReference(Reference ref)
+        private Builder addAllowedReference(Reference ref)
         {
             allowedReferences.add(ref);
             recursiveReferences(allowedReferences, ref);
+            return this;
         }
 
-        public void addReturn(Select select)
+        public Builder addReturn(Select select)
         {
             output = Optional.of(select);
+            return this;
         }
 
-        public void addReturnReferences(String... names)
+        public Builder addReturnReferences(String... names)
         {
             Select.Builder builder = new Select.Builder();
             for (String name : names)
                 builder.withSelection(ref(name));
             addReturn(builder.build());
+            return this;
         }
 
         private Reference ref(String name)
@@ -229,14 +238,16 @@ public class Txn implements Statement
             return builder.build();
         }
 
-        public void addIf(If block)
+        public Builder addIf(If block)
         {
             ifBlock = Optional.of(block);
+            return this;
         }
 
-        public void addUpdate(Mutation mutation)
+        public Builder addUpdate(Mutation mutation)
         {
             this.mutations.add(Objects.requireNonNull(mutation));
+            return this;
         }
 
         public Txn build()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to