This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 6f27e4828c Accord metrics are isolated, which causes existing
coordination metrics to be empty; they should be populated there as well
6f27e4828c is described below
commit 6f27e4828c8b68143d9c2af4bdbd94923f75347d
Author: David Capwell <[email protected]>
AuthorDate: Mon Oct 21 19:59:06 2024 -0700
Accord metrics are isolated, which causes existing coordination metrics to be
empty; they should be populated there as well
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-20017
---
.../cql3/statements/TransactionStatement.java | 117 +++++++++------------
.../cassandra/service/accord/AccordService.java | 34 +++++-
.../metrics/CoordinatorReadLatencyMetricTest.java | 79 ++++++++++++--
.../apache/cassandra/harry/model/SelectHelper.java | 2 +-
.../apache/cassandra/cql3/ast/FunctionCall.java | 10 ++
.../unit/org/apache/cassandra/cql3/ast/Select.java | 22 +++-
.../unit/org/apache/cassandra/cql3/ast/Symbol.java | 6 ++
test/unit/org/apache/cassandra/cql3/ast/Txn.java | 27 +++--
8 files changed, 212 insertions(+), 85 deletions(-)
diff --git
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index e948d5dcaa..992d3af0f0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -37,8 +37,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import accord.api.Key;
import accord.primitives.Keys;
@@ -95,8 +93,6 @@ import static
org.apache.cassandra.service.accord.txn.TxnResult.Kind.retry_new_p
public class TransactionStatement implements
CQLStatement.CompositeCQLStatement, CQLStatement.ReturningCQLStatement
{
- private static final Logger logger =
LoggerFactory.getLogger(TransactionStatement.class);
-
public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s'
has already been used by a LET assignment.";
public static final String INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE =
"SELECT must specify either all partition key elements. Partition key elements
must be always specified with equality operators; %s %s";
public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "SELECT
must specify either all primary key elements or all partition key elements and
LIMIT 1. In both cases partition key elements must be always specified with
equality operators; %s %s";
@@ -380,82 +376,73 @@ public class TransactionStatement implements
CQLStatement.CompositeCQLStatement,
{
checkTrue(DatabaseDescriptor.getAccordTransactionsEnabled(),
TRANSACTIONS_DISABLED_MESSAGE);
- try
- {
- // check again since now we have query options; note that
statements are quaranted to be single partition reads at this point
- for (NamedSelect assignment : assignments)
- checkFalse(isSelectingMultipleClusterings(assignment.select,
options), INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "LET assignment",
assignment.select.source);
+ // check again since now we have query options; note that statements
are quaranted to be single partition reads at this point
+ for (NamedSelect assignment : assignments)
+ checkFalse(isSelectingMultipleClusterings(assignment.select,
options), INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "LET assignment",
assignment.select.source);
- Txn txn = createTxn(state.getClientState(), options);
+ Txn txn = createTxn(state.getClientState(), options);
- TxnResult txnResult = AccordService.instance().coordinate(txn,
options.getConsistency(), requestTime);
- if (txnResult.kind() == retry_new_protocol)
- throw new InvalidRequestException(UNSUPPORTED_MIGRATION);
- TxnData data = (TxnData)txnResult;
+ TxnResult txnResult = AccordService.instance().coordinate(txn,
options.getConsistency(), requestTime);
+ if (txnResult.kind() == retry_new_protocol)
+ throw new InvalidRequestException(UNSUPPORTED_MIGRATION);
+ TxnData data = (TxnData)txnResult;
- if (returningSelect != null)
+ if (returningSelect != null)
+ {
+ @SuppressWarnings("unchecked")
+ SinglePartitionReadQuery.Group<SinglePartitionReadCommand>
selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>)
returningSelect.select.getQuery(options, 0);
+ Selection.Selectors selectors =
returningSelect.select.getSelection().newSelectors(options);
+ ResultSetBuilder result = new ResultSetBuilder(resultMetadata,
selectors, false);
+ if (selectQuery.queries.size() == 1)
+ {
+ FilteredPartition partition =
data.get(TxnDataName.returning());
+ boolean reversed = selectQuery.queries.get(0).isReversed();
+ if (partition != null)
+
returningSelect.select.processPartition(partition.rowIterator(reversed),
options, result, FBUtilities.nowInSeconds());
+ }
+ else
{
- @SuppressWarnings("unchecked")
- SinglePartitionReadQuery.Group<SinglePartitionReadCommand>
selectQuery = (SinglePartitionReadQuery.Group<SinglePartitionReadCommand>)
returningSelect.select.getQuery(options, 0);
- Selection.Selectors selectors =
returningSelect.select.getSelection().newSelectors(options);
- ResultSetBuilder result = new ResultSetBuilder(resultMetadata,
selectors, false);
- if (selectQuery.queries.size() == 1)
+ long nowInSec = FBUtilities.nowInSeconds();
+ for (int i = 0; i < selectQuery.queries.size(); i++)
{
- FilteredPartition partition =
data.get(TxnDataName.returning());
- boolean reversed = selectQuery.queries.get(0).isReversed();
+ FilteredPartition partition =
data.get(TxnDataName.returning(i));
+ boolean reversed = selectQuery.queries.get(i).isReversed();
if (partition != null)
-
returningSelect.select.processPartition(partition.rowIterator(reversed),
options, result, FBUtilities.nowInSeconds());
- }
- else
- {
- long nowInSec = FBUtilities.nowInSeconds();
- for (int i = 0; i < selectQuery.queries.size(); i++)
- {
- FilteredPartition partition =
data.get(TxnDataName.returning(i));
- boolean reversed =
selectQuery.queries.get(i).isReversed();
- if (partition != null)
-
returningSelect.select.processPartition(partition.rowIterator(reversed),
options, result, nowInSec);
- }
+
returningSelect.select.processPartition(partition.rowIterator(reversed),
options, result, nowInSec);
}
- return new ResultMessage.Rows(result.build());
}
+ return new ResultMessage.Rows(result.build());
+ }
- if (returningReferences != null)
- {
- List<AbstractType<?>> resultType = new
ArrayList<>(returningReferences.size());
- List<ColumnMetadata> columns = new
ArrayList<>(returningReferences.size());
-
- for (RowDataReference reference : returningReferences)
- {
- ColumnMetadata forMetadata = reference.toResultMetadata();
- resultType.add(forMetadata.type);
- columns.add(reference.column());
- }
+ if (returningReferences != null)
+ {
+ List<AbstractType<?>> resultType = new
ArrayList<>(returningReferences.size());
+ List<ColumnMetadata> columns = new
ArrayList<>(returningReferences.size());
- ResultSetBuilder result = new ResultSetBuilder(resultMetadata,
Selection.noopSelector(), false);
- result.newRow(options.getProtocolVersion(), null, null,
columns);
+ for (RowDataReference reference : returningReferences)
+ {
+ ColumnMetadata forMetadata = reference.toResultMetadata();
+ resultType.add(forMetadata.type);
+ columns.add(reference.column());
+ }
- for (int i = 0; i < returningReferences.size(); i++)
- {
- RowDataReference reference = returningReferences.get(i);
- TxnReference txnReference =
reference.toTxnReference(options);
- ByteBuffer buffer = txnReference.toByteBuffer(data,
resultType.get(i));
- result.add(buffer);
- }
+ ResultSetBuilder result = new ResultSetBuilder(resultMetadata,
Selection.noopSelector(), false);
+ result.newRow(options.getProtocolVersion(), null, null, columns);
- return new ResultMessage.Rows(result.build());
+ for (int i = 0; i < returningReferences.size(); i++)
+ {
+ RowDataReference reference = returningReferences.get(i);
+ TxnReference txnReference = reference.toTxnReference(options);
+ ByteBuffer buffer = txnReference.toByteBuffer(data,
resultType.get(i));
+ result.add(buffer);
}
- // In the case of a write-only transaction, just return and empty
result.
- // TODO: This could be modified to return an indication of whether
a condition (if present) succeeds.
- return new ResultMessage.Void();
- }
- catch (Throwable t)
- {
- //TODO remove before merge to trunk
- logger.error("Unexpected error with transaction: {}", t.toString());
- throw t;
+ return new ResultMessage.Rows(result.build());
}
+
+ // In the case of a write-only transaction, just return and empty
result.
+ // TODO: This could be modified to return an indication of whether a
condition (if present) succeeds.
+ return new ResultMessage.Void();
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index e73209da02..7c6806e203 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -128,6 +128,8 @@ import org.apache.cassandra.journal.Params;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.metrics.AccordClientRequestMetrics;
import org.apache.cassandra.metrics.TCMMetrics;
+import org.apache.cassandra.metrics.ClientRequestMetrics;
+import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
@@ -874,12 +876,24 @@ public class AccordService implements IAccordService,
Shutdownable
public @Nonnull AsyncTxnResult coordinateAsync(Txn txn, ConsistencyLevel
consistencyLevel, Dispatcher.RequestTime requestTime)
{
TxnId txnId = node.nextTxnId(txn.kind(), txn.keys().domain());
- AccordClientRequestMetrics metrics = txn.isWrite() ?
accordWriteMetrics : accordReadMetrics;
+ ClientRequestMetrics sharedMetrics;
+ AccordClientRequestMetrics metrics;
+ if (txn.isWrite())
+ {
+ sharedMetrics = ClientRequestsMetricsHolder.writeMetrics;
+ metrics = accordWriteMetrics;
+ }
+ else
+ {
+ sharedMetrics = ClientRequestsMetricsHolder.readMetrics;
+ metrics = accordReadMetrics;
+ }
metrics.keySize.update(txn.keys().size());
AsyncResult<Result> asyncResult = node.coordinate(txnId, txn);
AsyncTxnResult asyncTxnResult = new AsyncTxnResult(txnId);
asyncResult.addCallback((success, failure) -> {
long durationNanos = nanoTime() - requestTime.startedAtNanos();
+ sharedMetrics.addNano(durationNanos);
metrics.addNano(durationNanos);
Throwable cause = failure != null ?
Throwables.getRootCause(failure) : null;
if (success != null)
@@ -902,6 +916,7 @@ public class AccordService implements IAccordService,
Shutdownable
}
if (cause instanceof Preempted || cause instanceof Invalidated)
{
+ sharedMetrics.timeouts.mark();
metrics.preempted.mark();
//TODO need to improve
// Coordinator "could" query the accord state to see whats
going on but that doesn't exist yet.
@@ -909,6 +924,7 @@ public class AccordService implements IAccordService,
Shutdownable
asyncTxnResult.tryFailure(newPreempted(txnId, txn.isWrite(),
consistencyLevel));
return;
}
+ sharedMetrics.failures.mark();
if (cause instanceof TopologyMismatch)
{
metrics.topologyMismatches.mark();
@@ -924,7 +940,18 @@ public class AccordService implements IAccordService,
Shutdownable
@Override
public TxnResult getTxnResult(AsyncTxnResult asyncTxnResult, boolean
isWrite, @Nullable ConsistencyLevel consistencyLevel, Dispatcher.RequestTime
requestTime)
{
- AccordClientRequestMetrics metrics = isWrite ? accordWriteMetrics :
accordReadMetrics;
+ ClientRequestMetrics sharedMetrics;
+ AccordClientRequestMetrics metrics;
+ if (isWrite)
+ {
+ sharedMetrics = ClientRequestsMetricsHolder.writeMetrics;
+ metrics = accordWriteMetrics;
+ }
+ else
+ {
+ sharedMetrics = ClientRequestsMetricsHolder.readMetrics;
+ metrics = accordReadMetrics;
+ }
try
{
long deadlineNanos =
requestTime.computeDeadline(DatabaseDescriptor.getTransactionTimeout(NANOSECONDS));
@@ -939,6 +966,7 @@ public class AccordService implements IAccordService,
Shutdownable
{
// Mark here instead of in coordinate async since this is
where the request timeout actually occurs
metrics.timeouts.mark();
+ sharedMetrics.timeouts.mark();
cause.addSuppressed(e);
throw (RequestTimeoutException) cause;
}
@@ -950,11 +978,13 @@ public class AccordService implements IAccordService,
Shutdownable
catch (InterruptedException e)
{
metrics.failures.mark();
+ sharedMetrics.failures.mark();
throw new UncheckedInterruptedException(e);
}
catch (TimeoutException e)
{
metrics.timeouts.mark();
+ sharedMetrics.timeouts.mark();
throw newTimeout(asyncTxnResult.txnId, isWrite, consistencyLevel);
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
b/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
index ab3de57cb6..34bca20729 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/metrics/CoordinatorReadLatencyMetricTest.java
@@ -18,23 +18,68 @@
package org.apache.cassandra.distributed.test.metrics;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;
import org.apache.cassandra.config.Config;
+import org.apache.cassandra.cql3.ast.Select;
+import org.apache.cassandra.cql3.ast.Txn;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.metrics.ClientRequestsMetricsHolder;
+import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.service.paxos.Paxos;
+import org.assertj.core.api.Assertions;
+import static org.apache.cassandra.cql3.ast.Where.Inequalities.EQUAL;
+import static org.apache.cassandra.cql3.ast.Where.Inequalities.LESS_THAN;
import static org.junit.Assert.assertTrue;
public class CoordinatorReadLatencyMetricTest extends TestBaseImpl
{
+ @Test
+ public void singleRowTest() throws IOException
+ {
+ try (Cluster cluster = init(builder().withNodes(1).start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck
int, v int, PRIMARY KEY (pk, ck))"));
+ for (int i = 0; i < 100; i++)
+ cluster.coordinator(1).execute(withKeyspace("insert into
%s.tbl (pk, ck ,v) values (0, ?, 1)"), ConsistencyLevel.ALL, i);
+
+ var select = Select.builder()
+ //TODO (now, correctness, coverage): count(v)
breaks accord as we get mutliple rows rather than the count of rows...
+// .withSelection(FunctionCall.count("v"))
+ .withTable(KEYSPACE, "tbl")
+ .withWhere("pk", EQUAL, 0)
+ .withWhere("ck", LESS_THAN, 42)
+ .withLimit(1)
+ .build();
+
+ verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster,
select.toCQL(), ConsistencyLevel.QUORUM));
+ cluster.get(1).runOnInstance(() ->
Paxos.setPaxosVariant(Config.PaxosVariant.v1));
+ verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster,
select.toCQL(), ConsistencyLevel.SERIAL));
+ cluster.get(1).runOnInstance(() ->
Paxos.setPaxosVariant(Config.PaxosVariant.v2));
+ verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster,
select.toCQL(), ConsistencyLevel.SERIAL));
+
+ cluster.schemaChange(withKeyspace("ALTER TABLE %s.tbl WITH " +
TransactionalMode.full.asCqlParam()));
+ verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster,
Txn.wrap(select).toCQL(), ConsistencyLevel.QUORUM));
+
+ var let = Txn.builder()
+ .addLet("a", select)
+ .addReturnReferences("a.v")
+ .build();
+ verifyTableLatency(cluster, 1, () -> verifyLatencyMetrics(cluster,
let.toCQL(), ConsistencyLevel.QUORUM));
+ }
+ }
+
@Test
public void internalPagingWithAggregateTest() throws Throwable
{
@@ -91,16 +136,26 @@ public class CoordinatorReadLatencyMetricTest extends
TestBaseImpl
}
}
- private void verifyLatencyMetricsWhenPaging(Cluster cluster,
- int pagesize,
- int expectedQueries,
- String query,
- ConsistencyLevel
consistencyLevel)
+ private static void verifyLatencyMetricsWhenPaging(Cluster cluster,
+ int pagesize,
+ int expectedQueries,
+ String query,
+ ConsistencyLevel
consistencyLevel)
+ {
+ verifyLatencyMetrics(cluster, expectedQueries, () ->
cluster.coordinator(1).executeWithPaging(query, consistencyLevel, pagesize));
+ }
+
+ private static void verifyLatencyMetrics(Cluster cluster, String query,
ConsistencyLevel consistencyLevel)
+ {
+ verifyLatencyMetrics(cluster, 1, () ->
cluster.coordinator(1).execute(query, consistencyLevel));
+ }
+
+ private static void verifyLatencyMetrics(Cluster cluster, int
expectedQueries, Runnable query)
{
long countBefore = cluster.get(1).callOnInstance(() ->
ClientRequestsMetricsHolder.readMetrics.latency.getCount());
long totalLatencyBefore = cluster.get(1).callOnInstance(() ->
ClientRequestsMetricsHolder.readMetrics.totalLatency.getCount());
long startTime = System.nanoTime();
- cluster.coordinator(1).executeWithPaging(query, consistencyLevel,
pagesize);
+ query.run();
long elapsedTime = System.nanoTime() - startTime;
long countAfter = cluster.get(1).callOnInstance(() ->
ClientRequestsMetricsHolder.readMetrics.latency.getCount());
long totalLatencyAfter = cluster.get(1).callOnInstance(() ->
ClientRequestsMetricsHolder.readMetrics.totalLatency.getCount());
@@ -113,4 +168,16 @@ public class CoordinatorReadLatencyMetricTest extends
TestBaseImpl
totalLatencyRecorded <= elapsedTime);
}
+ private static void verifyTableLatency(Cluster cluster, int
expectedQueries, Runnable query)
+ {
+ IInvokableInstance inst = cluster.get(1);
+ LongSupplier tableMetric = () -> inst.callOnInstance(() ->
Keyspace.open("distributed_test_keyspace").getColumnFamilyStore("tbl").getMetrics().readLatency.latency.getCount());
+
+ long tableBefore = tableMetric.getAsLong();
+ query.run();
+ long tableAfter = tableMetric.getAsLong();
+
+ Assertions.assertThat(tableAfter -
tableBefore).isEqualTo(expectedQueries);
+ }
+
}
diff --git a/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
b/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
index e053ce7683..6b200e2dd9 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
@@ -140,7 +140,7 @@ public class SelectHelper
private static SchemaSpec.AddRelationCallback
appendToBuilderCallback(Select.Builder builder)
{
return (spec, kind, value) ->
- builder.withWhere(toInequalities(kind), Reference.of(new
Symbol(spec.name, spec.type.asServerType())), new Bind(value,
spec.type.asServerType()));
+ builder.withWhere(Reference.of(new Symbol(spec.name,
spec.type.asServerType())), toInequalities(kind), new Bind(value,
spec.type.asServerType()));
}
private static Where.Inequalities toInequalities(Relation.RelationKind
kind)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/FunctionCall.java
b/test/unit/org/apache/cassandra/cql3/ast/FunctionCall.java
index 0d325b5123..c136c6341d 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/FunctionCall.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/FunctionCall.java
@@ -79,4 +79,14 @@ public class FunctionCall implements Expression
{
return new FunctionCall("count(*)", Collections.emptyList(),
LongType.instance);
}
+
+ public static FunctionCall count(String symbol)
+ {
+ return count(Symbol.unknownType(symbol));
+ }
+
+ public static FunctionCall count(Symbol symbol)
+ {
+ return new FunctionCall("count",
Collections.singletonList(Reference.of(symbol)), LongType.instance);
+ }
}
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Select.java
b/test/unit/org/apache/cassandra/cql3/ast/Select.java
index bfe39c425d..9ac541ee74 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Select.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Select.java
@@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.schema.TableMetadata;
@@ -89,6 +90,11 @@ FROM [keyspace_name.] table_name
}
}
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
public Select withAllowFiltering()
{
return new Select(selections, source, where, orderBy, limit, true);
@@ -309,12 +315,22 @@ FROM [keyspace_name.] table_name
return this;
}
- public Builder withWhere(Where.Inequalities kind, ReferenceExpression
ref, Expression expression)
+ public Builder withWhere(ReferenceExpression ref, Where.Inequalities
kind, Expression expression)
{
where.where(kind, ref, expression);
return this;
}
+ public Builder withWhere(String name, Where.Inequalities kind, int
value)
+ {
+ return withWhere(kind, name, value, Int32Type.instance);
+ }
+
+ public <T> Builder withWhere(Where.Inequalities kind, String name, T
value, AbstractType<T> type)
+ {
+ return withWhere(Reference.of(new Symbol(name, type)), kind, new
Literal(value, type));
+ }
+
public Builder withIn(ReferenceExpression symbol, Expression...
expressions)
{
where.in(symbol, expressions);
@@ -333,12 +349,12 @@ FROM [keyspace_name.] table_name
public Builder withColumnEquals(String column, ByteBuffer value)
{
BytesType type = BytesType.instance;
- return withWhere(Where.Inequalities.EQUAL, Reference.of(new
Symbol(column, type)), new Bind(value, type));
+ return withWhere(Reference.of(new Symbol(column, type)),
Where.Inequalities.EQUAL, new Bind(value, type));
}
public Builder withColumnEquals(Symbol column, Expression value)
{
- return withWhere(Where.Inequalities.EQUAL, Reference.of(column),
value);
+ return withWhere(Reference.of(column), Where.Inequalities.EQUAL,
value);
}
public Builder withOrderByColumn(String name, AbstractType<?> type,
OrderBy.Ordering ordering)
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
b/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
index dc4c0aab3c..a3574599f1 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Symbol.java
@@ -25,6 +25,7 @@ import java.util.regex.Pattern;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.ReservedKeywords;
import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.schema.ColumnMetadata;
public class Symbol implements ReferenceExpression, Comparable<Symbol>
@@ -48,6 +49,11 @@ public class Symbol implements ReferenceExpression,
Comparable<Symbol>
return new Symbol(metadata.name.toString(), metadata.type.unwrap());
}
+ public static Symbol unknownType(String name)
+ {
+ return new Symbol(name, BytesType.instance);
+ }
+
@Override
public void toCQL(StringBuilder sb, int indent)
{
diff --git a/test/unit/org/apache/cassandra/cql3/ast/Txn.java
b/test/unit/org/apache/cassandra/cql3/ast/Txn.java
index 03c0588860..3ce90f5ce8 100644
--- a/test/unit/org/apache/cassandra/cql3/ast/Txn.java
+++ b/test/unit/org/apache/cassandra/cql3/ast/Txn.java
@@ -67,6 +67,11 @@ public class Txn implements Statement
return new Txn(Collections.emptyList(), Optional.empty(),
Optional.empty(), Collections.singletonList(mutation));
}
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
@Override
public void toCQL(StringBuilder sb, int indent)
{
@@ -171,12 +176,12 @@ public class Txn implements Statement
return !output.isPresent() && !ifBlock.isPresent() &&
mutations.isEmpty();
}
- public void addLet(String name, Select.Builder select)
+ public Builder addLet(String name, Select.Builder select)
{
- addLet(name, select.build());
+ return addLet(name, select.build());
}
- public void addLet(String name, Select select)
+ public Builder addLet(String name, Select select)
{
if (lets.containsKey(name))
throw new IllegalArgumentException("Let name " + name + "
already exists");
@@ -185,6 +190,7 @@ public class Txn implements Statement
Reference ref = Reference.of(new Symbol.UnquotedSymbol(name,
toNamedTuple(select)));
for (Expression e : select.selections)
addAllowedReference(ref.add(e));
+ return this;
}
private AbstractType<?> toNamedTuple(Select select)
@@ -200,23 +206,26 @@ public class Txn implements Statement
return new UserType(null, null, fieldNames, fieldTypes, false);
}
- private void addAllowedReference(Reference ref)
+ private Builder addAllowedReference(Reference ref)
{
allowedReferences.add(ref);
recursiveReferences(allowedReferences, ref);
+ return this;
}
- public void addReturn(Select select)
+ public Builder addReturn(Select select)
{
output = Optional.of(select);
+ return this;
}
- public void addReturnReferences(String... names)
+ public Builder addReturnReferences(String... names)
{
Select.Builder builder = new Select.Builder();
for (String name : names)
builder.withSelection(ref(name));
addReturn(builder.build());
+ return this;
}
private Reference ref(String name)
@@ -229,14 +238,16 @@ public class Txn implements Statement
return builder.build();
}
- public void addIf(If block)
+ public Builder addIf(If block)
{
ifBlock = Optional.of(block);
+ return this;
}
- public void addUpdate(Mutation mutation)
+ public Builder addUpdate(Mutation mutation)
{
this.mutations.add(Objects.requireNonNull(mutation));
+ return this;
}
public Txn build()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]