This is an automated email from the ASF dual-hosted git repository.
konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new eed4fbc8f7 Reduce heap memory allocations in different places along
the hot write path Avoid iterator allocations if possible Handle typical cases
(such as a single row, single table writes) more efficiently Add fast paths for
typical scenarios (like absence of views and triggers) Memorize things which
can be computed once
eed4fbc8f7 is described below
commit eed4fbc8f7789424d670c42a60f12150c48cd492
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Tue Mar 4 22:33:14 2025 +0300
Reduce heap memory allocations in different places along the hot write path
Avoid iterator allocations if possible
Handle typical cases (such as a single row, single table writes) more
efficiently
Add fast paths for typical scenarios (like absence of views and triggers)
Memorize things which can be computed once
Patch by Dmitry Konstantinov; reviewed by Chris Lohfink, Michael Semb
Wever, Vladimir Sitnikov for CASSANDRA-20167
---
CHANGES.txt | 1 +
.../org/apache/cassandra/concurrent/SEPWorker.java | 4 +-
src/java/org/apache/cassandra/cql3/Attributes.java | 4 +-
.../apache/cassandra/cql3/BatchQueryOptions.java | 6 +-
.../org/apache/cassandra/cql3/CQLStatement.java | 6 +-
.../org/apache/cassandra/cql3/QueryOptions.java | 139 +++++++++++----------
.../apache/cassandra/cql3/UpdateParameters.java | 4 +-
.../cassandra/cql3/VariableSpecifications.java | 15 +++
.../cassandra/cql3/statements/BatchStatement.java | 11 +-
.../cql3/statements/DescribeStatement.java | 4 +-
.../cql3/statements/ModificationStatement.java | 47 +++++--
.../cassandra/cql3/statements/SelectStatement.java | 5 +-
...ingleTableSinglePartitionUpdatesCollector.java} | 61 ++++-----
.../statements/SingleTableUpdatesCollector.java | 34 +++--
src/java/org/apache/cassandra/db/Keyspace.java | 9 +-
src/java/org/apache/cassandra/db/Mutation.java | 11 +-
.../cassandra/db/RegularAndStaticColumns.java | 16 ++-
.../apache/cassandra/db/filter/ColumnFilter.java | 9 ++
.../cassandra/db/partitions/PartitionUpdate.java | 55 ++++++--
.../org/apache/cassandra/db/rows/BTreeRow.java | 19 +++
.../apache/cassandra/db/rows/EncodingStats.java | 7 ++
src/java/org/apache/cassandra/db/view/View.java | 2 +
.../org/apache/cassandra/db/view/ViewManager.java | 17 ++-
.../cassandra/index/SecondaryIndexManager.java | 2 +
.../io/sstable/format/SortedTableWriter.java | 16 ++-
.../apache/cassandra/io/util/DataOutputBuffer.java | 12 ++
.../apache/cassandra/locator/ReplicaLayout.java | 2 +
.../service/AbstractWriteResponseHandler.java | 21 +++-
.../org/apache/cassandra/service/ClientWarn.java | 10 +-
.../org/apache/cassandra/service/StorageProxy.java | 29 +++--
.../cassandra/transport/CQLMessageHandler.java | 9 +-
.../org/apache/cassandra/transport/Envelope.java | 77 +++++++-----
.../org/apache/cassandra/transport/Flusher.java | 37 ++++--
.../org/apache/cassandra/transport/Message.java | 22 ++--
.../cassandra/cql3/QueryOptionsFlagsTest.java | 70 +++++++++++
.../cassandra/transport/CQLConnectionTest.java | 2 +-
.../transport/EnvelopeHeaderFlagsTest.java | 62 +++++++++
37 files changed, 616 insertions(+), 241 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9b84f503c4..f790960a21 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Reduce memory allocations in miscellaneous places along the hot write path
(CASSANDRA-20167)
* Provide keystore_password_file and truststore_password_file options to read
credentials from a file (CASSANDRA-13428)
* Unregistering a node should also remove it from tokenMap if it is there and
recalculate the placements (CASSANDRA-20346)
* Fix PartitionUpdate.isEmpty deserialization issue to avoid potential
EOFException (CASSANDRA-20345)
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index 93c01fa7a3..add2ef8b06 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -39,6 +39,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work>
implements Runnabl
private static final boolean SET_THREAD_NAME =
SET_SEP_THREAD_NAME.getBoolean();
final Long workerId;
+ final String workerIdThreadSuffix;
final Thread thread;
final SharedExecutorPool pool;
@@ -55,6 +56,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work>
implements Runnabl
{
this.pool = pool;
this.workerId = workerId;
+ this.workerIdThreadSuffix = '-' + workerId.toString();
thread = new FastThreadLocalThread(threadGroup, this,
threadGroup.getName() + "-Worker-" + workerId);
thread.setDaemon(true);
set(initialState);
@@ -122,7 +124,7 @@ final class SEPWorker extends
AtomicReference<SEPWorker.Work> implements Runnabl
if (assigned == null)
continue;
if (SET_THREAD_NAME)
- Thread.currentThread().setName(assigned.name + '-' +
workerId);
+ Thread.currentThread().setName(assigned.name +
workerIdThreadSuffix);
task = assigned.tasks.poll();
currentTask.lazySet(task);
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java
b/src/java/org/apache/cassandra/cql3/Attributes.java
index 492c23672a..493193b400 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -47,12 +47,14 @@ public class Attributes
*/
public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in
seconds
+ private static final Attributes NONE = new Attributes(null, null);
+
private final Term timestamp;
private final Term timeToLive;
public static Attributes none()
{
- return new Attributes(null, null);
+ return NONE;
}
private Attributes(Term timestamp, Term timeToLive)
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index 63be061e5e..dc38c4d0ce 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import com.google.common.collect.ImmutableList;
+
import org.apache.cassandra.utils.MD5Digest;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -54,7 +56,7 @@ public abstract class BatchQueryOptions
public abstract QueryOptions forStatement(int i);
- public void prepareStatement(int i, List<ColumnSpecification> boundNames)
+ public void prepareStatement(int i, ImmutableList<ColumnSpecification>
boundNames)
{
forStatement(i).prepare(boundNames);
}
@@ -128,7 +130,7 @@ public abstract class BatchQueryOptions
}
@Override
- public void prepareStatement(int i, List<ColumnSpecification>
boundNames)
+ public void prepareStatement(int i, ImmutableList<ColumnSpecification>
boundNames)
{
if (isPreparedStatement(i))
{
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java
b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index badf9c3428..349e79b30f 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.cql3;
import java.util.Collections;
import java.util.List;
+import com.google.common.collect.ImmutableList;
+
import org.apache.cassandra.audit.AuditLogContext;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.service.ClientState;
@@ -32,9 +34,9 @@ public interface CQLStatement
/**
* Returns all bind variables for the statement
*/
- default List<ColumnSpecification> getBindVariables()
+ default ImmutableList<ColumnSpecification> getBindVariables()
{
- return Collections.emptyList();
+ return ImmutableList.of();
}
/**
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java
b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index d3093cbb8e..c4e6e33b25 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -110,7 +110,7 @@ public abstract class QueryOptions
version);
}
- public static QueryOptions addColumnSpecifications(QueryOptions options,
List<ColumnSpecification> columnSpecs)
+ public static QueryOptions addColumnSpecifications(QueryOptions options,
ImmutableList<ColumnSpecification> columnSpecs)
{
return new OptionsWithColumnSpecifications(options, columnSpecs);
}
@@ -476,10 +476,10 @@ public abstract class QueryOptions
{
private final ImmutableList<ColumnSpecification> columnSpecs;
- OptionsWithColumnSpecifications(QueryOptions wrapped,
List<ColumnSpecification> columnSpecs)
+ OptionsWithColumnSpecifications(QueryOptions wrapped,
ImmutableList<ColumnSpecification> columnSpecs)
{
super(wrapped);
- this.columnSpecs = ImmutableList.copyOf(columnSpecs);
+ this.columnSpecs = columnSpecs;
}
@Override
@@ -568,9 +568,9 @@ public abstract class QueryOptions
}
}
- private static class Codec implements CBCodec<QueryOptions>
+ static class Codec implements CBCodec<QueryOptions>
{
- private enum Flag
+ enum Flag
{
// The order of that enum matters!!
VALUES,
@@ -583,40 +583,53 @@ public abstract class QueryOptions
KEYSPACE,
NOW_IN_SECONDS;
- private static final Flag[] ALL_VALUES = values();
+ private final int mask;
- public static EnumSet<Flag> deserialize(int flags)
+ Flag()
{
- EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
- for (int n = 0; n < ALL_VALUES.length; n++)
- {
- if ((flags & (1 << n)) != 0)
- set.add(ALL_VALUES[n]);
- }
- return set;
+ this.mask = 1 << this.ordinal();
+ }
+
+ public static int none()
+ {
+ return 0;
+ }
+
+ public static boolean isEmpty(int flags)
+ {
+ return flags == 0;
+ }
+
+ public static int add(int flags, Flag flagToAdd)
+ {
+ flags |= flagToAdd.mask;
+ return flags;
+ }
+
+ public static int remove(int flags, Flag flagToRemove)
+ {
+ flags &= ~ flagToRemove.mask;
+ return flags;
}
- public static int serialize(EnumSet<Flag> flags)
+ public static boolean contains(long flags, Flag flag)
{
- int i = 0;
- for (Flag flag : flags)
- i |= 1 << flag.ordinal();
- return i;
+ return (flags & flag.mask) != 0;
}
}
public QueryOptions decode(ByteBuf body, ProtocolVersion version)
{
ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
- EnumSet<Flag> flags =
Flag.deserialize(version.isGreaterOrEqualTo(ProtocolVersion.V5)
- ?
(int)body.readUnsignedInt()
- :
(int)body.readUnsignedByte());
+ int flags = version.isGreaterOrEqualTo(ProtocolVersion.V5)
+ ? (int)body.readUnsignedInt()
+ : (int)body.readUnsignedByte();
List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
List<String> names = null;
- if (flags.contains(Flag.VALUES))
+ if (Flag.contains(flags, Flag.VALUES))
{
- if (flags.contains(Flag.NAMES_FOR_VALUES))
+ if (Flag.contains(flags, Flag.NAMES_FOR_VALUES))
{
Pair<List<String>, List<ByteBuffer>> namesAndValues =
CBUtil.readNameAndValueList(body, version);
names = namesAndValues.left;
@@ -628,27 +641,27 @@ public abstract class QueryOptions
}
}
- boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
- flags.remove(Flag.VALUES);
- flags.remove(Flag.SKIP_METADATA);
+ boolean skipMetadata = Flag.contains(flags, Flag.SKIP_METADATA);
+ flags = Flag.remove(flags, Flag.VALUES);
+ flags = Flag.remove(flags, Flag.SKIP_METADATA);
SpecificOptions options = SpecificOptions.DEFAULT;
- if (!flags.isEmpty())
+ if (!Flag.isEmpty(flags))
{
- int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt()
: -1;
- PagingState pagingState = flags.contains(Flag.PAGING_STATE) ?
PagingState.deserialize(CBUtil.readValueNoCopy(body), version) : null;
- ConsistencyLevel serialConsistency =
flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) :
ConsistencyLevel.SERIAL;
+ int pageSize = Flag.contains(flags, Flag.PAGE_SIZE) ?
body.readInt() : -1;
+ PagingState pagingState = Flag.contains(flags,
Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValueNoCopy(body),
version) : null;
+ ConsistencyLevel serialConsistency = Flag.contains(flags,
Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) :
ConsistencyLevel.SERIAL;
long timestamp = Long.MIN_VALUE;
- if (flags.contains(Flag.TIMESTAMP))
+ if (Flag.contains(flags, Flag.TIMESTAMP))
{
long ts = body.readLong();
if (ts == Long.MIN_VALUE)
throw new ProtocolException(String.format("Out of
bound timestamp, must be in [%d, %d] (got %d)", Long.MIN_VALUE + 1,
Long.MAX_VALUE, ts));
timestamp = ts;
}
- String keyspace = flags.contains(Flag.KEYSPACE) ?
CBUtil.readString(body) : null;
- long nowInSeconds = flags.contains(Flag.NOW_IN_SECONDS) ?
CassandraUInt.toLong(body.readInt())
- :
UNSET_NOWINSEC;
+ String keyspace = Flag.contains(flags, Flag.KEYSPACE) ?
CBUtil.readString(body) : null;
+ long nowInSeconds = Flag.contains(flags, Flag.NOW_IN_SECONDS)
? CassandraUInt.toLong(body.readInt())
+
: UNSET_NOWINSEC;
options = new SpecificOptions(pageSize, pagingState,
serialConsistency, timestamp, keyspace, nowInSeconds);
}
@@ -660,25 +673,25 @@ public abstract class QueryOptions
{
CBUtil.writeConsistencyLevel(options.getConsistency(), dest);
- EnumSet<Flag> flags = gatherFlags(options, version);
+ int flags = gatherFlags(options, version);
if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
- dest.writeInt(Flag.serialize(flags));
+ dest.writeInt(flags);
else
- dest.writeByte((byte)Flag.serialize(flags));
+ dest.writeByte((byte) flags);
- if (flags.contains(Flag.VALUES))
+ if (Flag.contains(flags, Flag.VALUES))
CBUtil.writeValueList(options.getValues(), dest);
- if (flags.contains(Flag.PAGE_SIZE))
+ if (Flag.contains(flags, Flag.PAGE_SIZE))
dest.writeInt(options.getPageSize());
- if (flags.contains(Flag.PAGING_STATE))
+ if (Flag.contains(flags, Flag.PAGING_STATE))
CBUtil.writeValue(options.getPagingState().serialize(version),
dest);
- if (flags.contains(Flag.SERIAL_CONSISTENCY))
+ if (Flag.contains(flags, Flag.SERIAL_CONSISTENCY))
CBUtil.writeConsistencyLevel(options.getSerialConsistency(),
dest);
- if (flags.contains(Flag.TIMESTAMP))
+ if (Flag.contains(flags, Flag.TIMESTAMP))
dest.writeLong(options.getSpecificOptions().timestamp);
- if (flags.contains(Flag.KEYSPACE))
+ if (Flag.contains(flags, Flag.KEYSPACE))
CBUtil.writeAsciiString(options.getSpecificOptions().keyspace,
dest);
- if (flags.contains(Flag.NOW_IN_SECONDS))
+ if (Flag.contains(flags, Flag.NOW_IN_SECONDS))
dest.writeInt(CassandraUInt.fromLong(options.getSpecificOptions().nowInSeconds));
// Note that we don't really have to bother with NAMES_FOR_VALUES
server side,
@@ -692,49 +705,49 @@ public abstract class QueryOptions
size += CBUtil.sizeOfConsistencyLevel(options.getConsistency());
- EnumSet<Flag> flags = gatherFlags(options, version);
+ int flags = gatherFlags(options, version);
size += (version.isGreaterOrEqualTo(ProtocolVersion.V5) ? 4 : 1);
- if (flags.contains(Flag.VALUES))
+ if (Flag.contains(flags, Flag.VALUES))
size += CBUtil.sizeOfValueList(options.getValues());
- if (flags.contains(Flag.PAGE_SIZE))
+ if (Flag.contains(flags, Flag.PAGE_SIZE))
size += 4;
- if (flags.contains(Flag.PAGING_STATE))
+ if (Flag.contains(flags, Flag.PAGING_STATE))
size +=
CBUtil.sizeOfValue(options.getPagingState().serializedSize(version));
- if (flags.contains(Flag.SERIAL_CONSISTENCY))
+ if (Flag.contains(flags, Flag.SERIAL_CONSISTENCY))
size +=
CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
- if (flags.contains(Flag.TIMESTAMP))
+ if (Flag.contains(flags, Flag.TIMESTAMP))
size += 8;
- if (flags.contains(Flag.KEYSPACE))
+ if (Flag.contains(flags, Flag.KEYSPACE))
size +=
CBUtil.sizeOfAsciiString(options.getSpecificOptions().keyspace);
- if (flags.contains(Flag.NOW_IN_SECONDS))
+ if (Flag.contains(flags, Flag.NOW_IN_SECONDS))
size += 4;
return size;
}
- private EnumSet<Flag> gatherFlags(QueryOptions options,
ProtocolVersion version)
+ private int gatherFlags(QueryOptions options, ProtocolVersion version)
{
- EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
+ int flags = Flag.none();
if (options.getValues().size() > 0)
- flags.add(Flag.VALUES);
+ flags = Flag.add(flags, Flag.VALUES);
if (options.skipMetadata())
- flags.add(Flag.SKIP_METADATA);
+ flags = Flag.add(flags, Flag.SKIP_METADATA);
if (options.getPageSize() >= 0)
- flags.add(Flag.PAGE_SIZE);
+ flags = Flag.add(flags, Flag.PAGE_SIZE);
if (options.getPagingState() != null)
- flags.add(Flag.PAGING_STATE);
+ flags = Flag.add(flags, Flag.PAGING_STATE);
if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
- flags.add(Flag.SERIAL_CONSISTENCY);
+ flags = Flag.add(flags, Flag.SERIAL_CONSISTENCY);
if (options.getSpecificOptions().timestamp != Long.MIN_VALUE)
- flags.add(Flag.TIMESTAMP);
+ flags = Flag.add(flags, Flag.TIMESTAMP);
if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
{
if (options.getSpecificOptions().keyspace != null)
- flags.add(Flag.KEYSPACE);
+ flags = Flag.add(flags, Flag.KEYSPACE);
if (options.getSpecificOptions().nowInSeconds !=
UNSET_NOWINSEC)
- flags.add(Flag.NOW_IN_SECONDS);
+ flags = Flag.add(flags, Flag.NOW_IN_SECONDS);
}
return flags;
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 51df331b8d..d13d0b49a7 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -103,13 +103,13 @@ public class UpdateParameters
if (clustering == Clustering.STATIC_CLUSTERING)
{
if (staticBuilder == null)
- staticBuilder = BTreeRow.unsortedBuilder();
+ staticBuilder = BTreeRow.pooledUnsortedBuilder();
builder = staticBuilder;
}
else
{
if (regularBuilder == null)
- regularBuilder = BTreeRow.unsortedBuilder();
+ regularBuilder = BTreeRow.pooledUnsortedBuilder();
builder = regularBuilder;
}
diff --git a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
index e58290eba9..504859cac4 100644
--- a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
+++ b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
@@ -21,6 +21,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import com.google.common.collect.ImmutableList;
+
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
@@ -28,6 +30,7 @@ public class VariableSpecifications
{
private final List<ColumnIdentifier> variableNames;
private final List<ColumnSpecification> specs;
+ private volatile ImmutableList<ColumnSpecification> immutableSpecs;
private final ColumnMetadata[] targetColumns;
public VariableSpecifications(List<ColumnIdentifier> variableNames)
@@ -56,6 +59,17 @@ public class VariableSpecifications
return specs;
}
+ public ImmutableList<ColumnSpecification> getImmutableBindVariables()
+ {
+ ImmutableList<ColumnSpecification> result = immutableSpecs;
+ if (result == null) // strong synchronization is not needed, it is ok
if sometimes we create several immutable lists
+ {
+ result = ImmutableList.copyOf(specs);
+ immutableSpecs = result;
+ }
+ return result;
+ }
+
/**
* Returns an array with the same length as the number of partition key
columns for the table corresponding
* to table. Each short in the array represents the bind index of the
marker that holds the value for that
@@ -87,6 +101,7 @@ public class VariableSpecifications
public void add(int bindIndex, ColumnSpecification spec)
{
+ assert immutableSpecs == null : "bind variable specs cannot be
modified once we started to use them";
if (spec instanceof ColumnMetadata)
targetColumns[bindIndex] = (ColumnMetadata) spec;
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index e5104376ce..bfec675464 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -137,9 +138,9 @@ public class BatchStatement implements CQLStatement
}
@Override
- public List<ColumnSpecification> getBindVariables()
+ public ImmutableList<ColumnSpecification> getBindVariables()
{
- return bindVariables.getBindVariables();
+ return bindVariables.getImmutableBindVariables();
}
@Override
@@ -419,8 +420,10 @@ public class BatchStatement implements CQLStatement
throw new InvalidRequestException("Invalid empty serial
consistency level");
ClientState clientState = queryState.getClientState();
-
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(),
options.getSerialConsistency()),
- clientState);
+ if (Guardrails.writeConsistencyLevels.enabled(clientState)) // to
avoid EnumSet allocation
+
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(),
+
options.getSerialConsistency()),
+ clientState);
for (int i = 0; i < statements.size(); i++ )
statements.get(i).validateDiskUsage(options.forStatement(i),
clientState);
diff --git
a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
index 360d34b293..64b7862d6d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
@@ -107,9 +107,9 @@ public abstract class DescribeStatement<T> extends
CQLStatement.Raw implements C
return this;
}
- public final List<ColumnSpecification> getBindVariables()
+ public final ImmutableList<ColumnSpecification> getBindVariables()
{
- return Collections.emptyList();
+ return ImmutableList.of();
}
@Override
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 7698c0b59c..ce9da9a538 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,6 +104,8 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
private final RegularAndStaticColumns requiresRead;
+ private final List<Function> functions;
+
public ModificationStatement(StatementType type,
VariableSpecifications bindVariables,
TableMetadata metadata,
@@ -179,12 +182,13 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
this.conditionColumns = conditionColumnsBuilder.build();
this.requiresRead = requiresReadBuilder.build();
+ this.functions = findAllFunctions();
}
@Override
- public List<ColumnSpecification> getBindVariables()
+ public ImmutableList<ColumnSpecification> getBindVariables()
{
- return bindVariables.getBindVariables();
+ return bindVariables.getImmutableBindVariables();
}
@Override
@@ -195,9 +199,19 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
@Override
public Iterable<Function> getFunctions()
+ {
+ return functions;
+ }
+
+ private List<Function> findAllFunctions()
{
List<Function> functions = new ArrayList<>();
addFunctionsTo(functions);
+ if (functions.isEmpty())
+ {
+ functions = Collections.emptyList(); // to avoid a new Iterator
object creation during each authorization
+ }
+
return functions;
}
@@ -518,8 +532,9 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
if (options.getConsistency() == null)
throw new InvalidRequestException("Invalid empty consistency
level");
-
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(),
options.getSerialConsistency()),
- queryState.getClientState());
+ if
(Guardrails.writeConsistencyLevels.enabled(queryState.getClientState())) // to
avoid EnumSet allocation
+
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(),
options.getSerialConsistency()),
+
queryState.getClientState());
return hasConditions()
? executeWithCondition(queryState, options, requestTime)
@@ -768,10 +783,18 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
Dispatcher.RequestTime
requestTime)
{
List<ByteBuffer> keys = buildPartitionKeyNames(options, state);
- HashMultiset<ByteBuffer> perPartitionKeyCounts =
HashMultiset.create(keys);
- SingleTableUpdatesCollector collector = new
SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
- addUpdates(collector, keys, state, options, local, timestamp,
nowInSeconds, requestTime);
- return collector.toMutations(state);
+ if(keys.size() == 1)
+ {
+ SingleTableSinglePartitionUpdatesCollector collector = new
SingleTableSinglePartitionUpdatesCollector(metadata, updatedColumns);
+ addUpdates(collector, keys, state, options, local, timestamp,
nowInSeconds, requestTime);
+ return collector.toMutations(state);
+ } else
+ {
+ HashMultiset<ByteBuffer> perPartitionKeyCounts =
HashMultiset.create(keys);
+ SingleTableUpdatesCollector collector = new
SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
+ addUpdates(collector, keys, state, options, local, timestamp,
nowInSeconds, requestTime);
+ return collector.toMutations(state);
+ }
}
final void addUpdates(UpdatesCollector collector,
@@ -807,8 +830,12 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
PartitionUpdate.Builder updateBuilder =
collector.getPartitionUpdateBuilder(metadata(), dk, options.getConsistency());
- for (Slice slice : slices)
- addUpdateForKey(updateBuilder, slice, params);
+ if (slices == Slices.ALL) // to avoid Slices iterator
allocation for a common case
+ addUpdateForKey(updateBuilder, Slice.ALL, params);
+ else
+ for (Slice slice : slices)
+ addUpdateForKey(updateBuilder, slice, params);
+
}
}
else
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1762b776a7..209ba88f52 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -27,6 +27,7 @@ import javax.annotation.concurrent.ThreadSafe;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -176,9 +177,9 @@ public class SelectStatement implements
CQLStatement.SingleKeyspaceCqlStatement
}
@Override
- public List<ColumnSpecification> getBindVariables()
+ public ImmutableList<ColumnSpecification> getBindVariables()
{
- return bindVariables.getBindVariables();
+ return bindVariables.getImmutableBindVariables();
}
@Override
diff --git
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
b/src/java/org/apache/cassandra/cql3/statements/SingleTableSinglePartitionUpdatesCollector.java
similarity index 60%
copy from
src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
copy to
src/java/org/apache/cassandra/cql3/statements/SingleTableSinglePartitionUpdatesCollector.java
index 5ff299eb88..c650ef0370 100644
---
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++
b/src/java/org/apache/cassandra/cql3/statements/SingleTableSinglePartitionUpdatesCollector.java
@@ -17,13 +17,8 @@
*/
package org.apache.cassandra.cql3.statements;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Maps;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
@@ -41,7 +36,7 @@ import org.apache.cassandra.service.ClientState;
/**
* Utility class to collect updates.
*/
-final class SingleTableUpdatesCollector implements UpdatesCollector
+final class SingleTableSinglePartitionUpdatesCollector implements
UpdatesCollector
{
/**
* the table to be updated
@@ -52,67 +47,59 @@ final class SingleTableUpdatesCollector implements
UpdatesCollector
* the columns to update
*/
private final RegularAndStaticColumns updatedColumns;
-
- /**
- * The number of updated rows per key.
- */
- private final HashMultiset<ByteBuffer> perPartitionKeyCounts;
-
/**
* the partition update builders per key
*/
- private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders;
+ private PartitionUpdate.Builder builder;
/**
* if it is a counter table, we will set this
*/
private ConsistencyLevel counterConsistencyLevel = null;
- SingleTableUpdatesCollector(TableMetadata metadata,
RegularAndStaticColumns updatedColumns, HashMultiset<ByteBuffer>
perPartitionKeyCounts)
+ SingleTableSinglePartitionUpdatesCollector(TableMetadata metadata,
RegularAndStaticColumns updatedColumns)
{
this.metadata = metadata;
this.updatedColumns = updatedColumns;
- this.perPartitionKeyCounts = perPartitionKeyCounts;
- this.puBuilders =
Maps.newHashMapWithExpectedSize(perPartitionKeyCounts.size());
}
public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata
metadata, DecoratedKey dk, ConsistencyLevel consistency)
{
if (metadata.isCounter())
counterConsistencyLevel = consistency;
- PartitionUpdate.Builder builder = puBuilders.get(dk.getKey());
if (builder == null)
{
- builder = new PartitionUpdate.Builder(metadata, dk,
updatedColumns, perPartitionKeyCounts.count(dk.getKey()));
- puBuilders.put(dk.getKey(), builder);
+ builder = new PartitionUpdate.Builder(metadata, dk,
updatedColumns, 1);
}
return builder;
}
/**
* Returns a collection containing all the mutations.
- * @return a collection containing all the mutations.
*/
@Override
public List<IMutation> toMutations(ClientState state)
{
- List<IMutation> ms = new ArrayList<>(puBuilders.size());
- for (PartitionUpdate.Builder builder : puBuilders.values())
- {
- IMutation mutation;
+ // it is possible that a modification statement does not create any
mutations
+ // for example: DELETE FROM some_table WHERE part_key = 1 AND
clust_key < 3 AND clust_key > 5
+ if (builder == null)
+ return Collections.emptyList();
+ return Collections.singletonList(createMutation(state, builder));
+ }
- if (metadata.isVirtual())
- mutation = new VirtualMutation(builder.build());
- else if (metadata.isCounter())
- mutation = new CounterMutation(new Mutation(builder.build()),
counterConsistencyLevel);
- else
- mutation = new Mutation(builder.build());
+ private IMutation createMutation(ClientState state,
PartitionUpdate.Builder builder)
+ {
+ IMutation mutation;
- mutation.validateIndexedColumns(state);
- mutation.validateSize(MessagingService.current_version,
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
- ms.add(mutation);
- }
+ if (metadata.isVirtual())
+ mutation = new VirtualMutation(builder.build());
+ else if (metadata.isCounter())
+ mutation = new CounterMutation(new Mutation(builder.build()),
counterConsistencyLevel);
+ else
+ mutation = new Mutation(builder.build());
- return ms;
+ mutation.validateIndexedColumns(state);
+ mutation.validateSize(MessagingService.current_version,
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
+ return mutation;
}
-}
+}
\ No newline at end of file
diff --git
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 5ff299eb88..2da6b89180 100644
---
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -96,23 +97,34 @@ final class SingleTableUpdatesCollector implements
UpdatesCollector
@Override
public List<IMutation> toMutations(ClientState state)
{
+ if (puBuilders.size() == 1)
+ {
+ PartitionUpdate.Builder builder =
puBuilders.values().iterator().next();
+ return Collections.singletonList(createMutation(state, builder));
+ }
List<IMutation> ms = new ArrayList<>(puBuilders.size());
for (PartitionUpdate.Builder builder : puBuilders.values())
{
- IMutation mutation;
-
- if (metadata.isVirtual())
- mutation = new VirtualMutation(builder.build());
- else if (metadata.isCounter())
- mutation = new CounterMutation(new Mutation(builder.build()),
counterConsistencyLevel);
- else
- mutation = new Mutation(builder.build());
-
- mutation.validateIndexedColumns(state);
- mutation.validateSize(MessagingService.current_version,
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
+ IMutation mutation = createMutation(state, builder);
ms.add(mutation);
}
return ms;
}
+
+ private IMutation createMutation(ClientState state,
PartitionUpdate.Builder builder)
+ {
+ IMutation mutation;
+
+ if (metadata.isVirtual())
+ mutation = new VirtualMutation(builder.build());
+ else if (metadata.isCounter())
+ mutation = new CounterMutation(new Mutation(builder.build()),
counterConsistencyLevel);
+ else
+ mutation = new Mutation(builder.build());
+
+ mutation.validateIndexedColumns(state);
+ mutation.validateSize(MessagingService.current_version,
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
+ return mutation;
+ }
}
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java
b/src/java/org/apache/cassandra/db/Keyspace.java
index af651570bb..f731139e44 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -453,11 +453,11 @@ public class Keyspace
Lock[] locks = null;
- boolean requiresViewUpdate = updateIndexes &&
viewManager.updatesAffectView(Collections.singleton(mutation), false);
+ boolean requiresViewUpdate = updateIndexes &&
viewManager.updatesAffectView(mutation, false);
if (requiresViewUpdate)
{
- mutation.viewLockAcquireStart.compareAndSet(0L,
currentTimeMillis());
+ Mutation.viewLockAcquireStartUpdater.compareAndSet(mutation, 0L,
currentTimeMillis());
// the order of lock acquisition doesn't matter (from a deadlock
perspective) because we only use tryLock()
Collection<TableId> tableIds = mutation.getTableIds();
@@ -534,7 +534,7 @@ public class Keyspace
}
}
- long acquireTime = currentTimeMillis() -
mutation.viewLockAcquireStart.get();
+ long acquireTime = currentTimeMillis() -
Mutation.viewLockAcquireStartUpdater.get(mutation);
// Metrics are only collected for droppable write operations
// Bulk non-droppable operations (e.g. commitlog replay, hint
delivery) are not measured
if (isDroppable)
@@ -553,10 +553,11 @@ public class Keyspace
logger.error("Attempting to mutate non-existant table {}
({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name);
continue;
}
- AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);
+ AtomicLong baseComplete = null;
if (requiresViewUpdate)
{
+ baseComplete = new AtomicLong(Long.MAX_VALUE);
try
{
Tracing.trace("Creating materialized view mutations
from base table replica");
diff --git a/src/java/org/apache/cassandra/db/Mutation.java
b/src/java/org/apache/cassandra/db/Mutation.java
index 384f2b4972..0861bb64c4 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Supplier;
import com.google.common.base.Preconditions;
@@ -68,7 +68,10 @@ public class Mutation implements IMutation,
Supplier<Mutation>
// Time at which this mutation or the builder that built it was
instantiated
final long approxCreatedAtNanos;
// keep track of when mutation has started waiting for a MV partition lock
- final AtomicLong viewLockAcquireStart = new AtomicLong(0);
+
+ final static AtomicLongFieldUpdater<Mutation> viewLockAcquireStartUpdater =
+ AtomicLongFieldUpdater.newUpdater(Mutation.class,
"viewLockAcquireStart");
+ volatile long viewLockAcquireStart;
private final boolean cdcEnabled;
@@ -458,7 +461,7 @@ public class Mutation implements IMutation,
Supplier<Mutation>
try (DataOutputBuffer dob =
DataOutputBuffer.scratchBuffer.get())
{
serializeInternal(PartitionUpdate.serializer,
mutation, dob, version);
- serialization = new
CachedSerialization(dob.toByteArray());
+ serialization = new
CachedSerialization(dob.unsafeToByteArray());
}
catch (IOException e)
{
@@ -521,7 +524,7 @@ public class Mutation implements IMutation,
Supplier<Mutation>
//Only cache serializations that don't hit the limit
if (!teeIn.isLimitReached())
-
m.cachedSerializations[MessagingService.getVersionOrdinal(version)] = new
CachedSerialization(dob.toByteArray());
+
m.cachedSerializations[MessagingService.getVersionOrdinal(version)] = new
CachedSerialization(dob.unsafeToByteArray());
return m;
}
diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
index b6da183d01..4daea2f4f7 100644
--- a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
+++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
@@ -153,6 +153,8 @@ public class RegularAndStaticColumns implements
Iterable<ColumnMetadata>
private BTree.Builder<ColumnMetadata> regularColumns;
private BTree.Builder<ColumnMetadata> staticColumns;
+ private RegularAndStaticColumns lastAddedColumns;
+
public Builder add(ColumnMetadata c)
{
if (c.isStatic())
@@ -180,18 +182,24 @@ public class RegularAndStaticColumns implements
Iterable<ColumnMetadata>
public Builder addAll(RegularAndStaticColumns columns)
{
+ // for batch statements it is a frequent case when we have the
same columns in each inner prepared statement
+ // we use == instead of equals to make the optimization check cheap
+ if (lastAddedColumns != null && lastAddedColumns == columns) {
+ return this;
+ }
if (regularColumns == null && !columns.regulars.isEmpty())
regularColumns = BTree.builder(naturalOrder());
- for (ColumnMetadata c : columns.regulars)
- regularColumns.add(c);
+ if (!columns.regulars.isEmpty())
+ regularColumns.addAll(columns.regulars);
if (staticColumns == null && !columns.statics.isEmpty())
staticColumns = BTree.builder(naturalOrder());
- for (ColumnMetadata c : columns.statics)
- staticColumns.add(c);
+ if (!columns.statics.isEmpty())
+ staticColumns.addAll(columns.statics);
+ lastAddedColumns = columns;
return this;
}
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 90fc9f3a11..ae043039e2 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -180,6 +180,15 @@ public abstract class ColumnFilter
return new WildCardColumnFilter(metadata.regularAndStaticColumns());
}
+ /**
+ * A filter for a PartitionUpdate entity
+ * which we've just constructed and there is no real need to filter it
+ */
+ public static ColumnFilter all(RegularAndStaticColumns columns)
+ {
+ return new WildCardColumnFilter(columns);
+ }
+
/**
* A filter that only fetches/queries the provided columns.
* <p>
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index ff5d0f9035..00f26451c1 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -223,6 +223,18 @@ public class PartitionUpdate extends AbstractBTreePartition
return new PartitionUpdate(iterator.metadata(),
iterator.metadata().epoch, iterator.partitionKey(), holder, deletionInfo,
false);
}
+ /**
+ * An override of default AbstractBTreePartition iterator
+ * It is added as a performance optimization to avoid full-functional
filtering
+ * using org.apache.cassandra.db.Columns.inOrderInclusionTester() predicate
+ * when we iterate over rows within a PartitionUpdate
+ */
+ @Override
+ public UnfilteredRowIterator unfilteredIterator()
+ {
+ return
unfilteredIterator(ColumnFilter.SelectionColumnFilter.all(columns()),
Slices.ALL, false);
+ }
+
public PartitionUpdate withOnlyPresentColumns()
{
@@ -884,7 +896,11 @@ public class PartitionUpdate extends AbstractBTreePartition
private final MutableDeletionInfo deletionInfo;
private final boolean canHaveShadowedData;
private Object[] tree = BTree.empty();
- private final BTree.Builder<Row> rowBuilder;
+
+ private Row firstRow;
+ private BTree.Builder<Row> rowBuilder;
+
+ private final int initialRowCapacity;
private Row staticRow = Rows.EMPTY_STATIC_ROW;
private final RegularAndStaticColumns columns;
private boolean isBuilt = false;
@@ -920,7 +936,7 @@ public class PartitionUpdate extends AbstractBTreePartition
this.metadata = metadata;
this.key = key;
this.columns = columns;
- this.rowBuilder = rowBuilder(initialRowCapacity);
+ this.initialRowCapacity = initialRowCapacity;
this.canHaveShadowedData = canHaveShadowedData;
this.deletionInfo = deletionInfo.mutableCopy();
this.staticRow = staticRow;
@@ -963,19 +979,25 @@ public class PartitionUpdate extends
AbstractBTreePartition
if (row.isStatic())
{
- // this assert is expensive, and possibly of limited value; we
should consider removing it
- // or introducing a new class of assertions for test purposes
- assert columns().statics.containsAll(row.columns()) :
columns().statics + " is not superset of " + row.columns();
staticRow = staticRow.isEmpty()
? row
: Rows.merge(staticRow, row);
}
else
{
- // this assert is expensive, and possibly of limited value; we
should consider removing it
- // or introducing a new class of assertions for test purposes
- assert columns().regulars.containsAll(row.columns()) :
columns().regulars + " is not superset of " + row.columns();
- rowBuilder.add(row);
+ if (firstRow == null)
+ {
+ firstRow = row;
+ }
+ else
+ {
+ if (rowBuilder == null)
+ {
+ rowBuilder = rowBuilder(initialRowCapacity);
+ rowBuilder.add(firstRow);
+ }
+ rowBuilder.add(row);
+ }
}
}
@@ -999,13 +1021,22 @@ public class PartitionUpdate extends
AbstractBTreePartition
return metadata;
}
+ private static final UpdateFunction<Row, Row> ROWS_MERGE_FUNCTION =
UpdateFunction.Simple.of(Rows::merge);
+
public PartitionUpdate build()
{
// assert that we are not calling build() several times
assert !isBuilt : "A PartitionUpdate.Builder should only get built
once";
- Object[] add = rowBuilder.build();
- Object[] merged = BTree.<Row, Row, Row>update(tree, add,
metadata.comparator,
-
UpdateFunction.Simple.of(Rows::merge));
+ Object[] add;
+ if (rowBuilder == null)
+ {
+ add = firstRow != null ? BTree.singleton(firstRow) :
BTree.empty();
+ }
+ else
+ {
+ add = rowBuilder.build();
+ }
+ Object[] merged = BTree.<Row, Row, Row>update(tree, add,
metadata.comparator, ROWS_MERGE_FUNCTION);
EncodingStats newStats =
EncodingStats.Collector.collect(staticRow, BTree.iterator(merged),
deletionInfo);
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 075a4f67fe..fe44fbde53 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.BTreeSearchIterator;
import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.caching.TinyThreadLocalPool;
import org.apache.cassandra.utils.memory.Cloner;
/**
@@ -560,6 +561,17 @@ public class BTreeRow extends AbstractRow
return new Builder(false);
}
+ private static final TinyThreadLocalPool<Builder> POOL = new
TinyThreadLocalPool<>();
+
+ public static Row.Builder pooledUnsortedBuilder() {
+ TinyThreadLocalPool.TinyPool<Builder> pool = POOL.get();
+ Builder builder = pool.poll();
+ if (builder == null)
+ builder = new Builder(false);
+ builder.pool = pool;
+ return builder;
+ }
+
// This is only used by PartitionUpdate.CounterMark but other uses should
be avoided as much as possible as it breaks our general
// assumption that Row objects are immutable. This method should go away
post-#6506 in particular.
// This method is in particular not exposed by the Row API on purpose.
@@ -818,6 +830,8 @@ public class BTreeRow extends AbstractRow
// For complex column at index i of 'columns', we store at
complexDeletions[i] its complex deletion.
+ private TinyThreadLocalPool.TinyPool<Builder> pool;
+
protected Builder(boolean isSorted)
{
cells_ = null;
@@ -873,6 +887,11 @@ public class BTreeRow extends AbstractRow
this.deletion = Deletion.LIVE;
this.cells_.reuse();
this.hasComplex = false;
+ if (pool != null)
+ {
+ pool.offer(this);
+ pool = null;
+ }
}
public void addPrimaryKeyLivenessInfo(LivenessInfo info)
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index d0f788ae5a..0acc67815b 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -109,6 +109,13 @@ public class EncodingStats implements IMeasurableMemory
? that.minTTL
: (that.minTTL == TTL_EPOCH ? this.minTTL :
Math.min(this.minTTL, that.minTTL));
+ // EncodingStats is immutable, so if the result fields are the same as
in the current object we can avoid new object creation
+ // usually we merge an older object with a newer one and timestamp
usually grows, so chances to reuse the object are high
+ if (this.minTimestamp == minTimestamp
+ && this.minLocalDeletionTime == minDelTime
+ && this.minTTL == minTTL) {
+ return this;
+ }
return new EncodingStats(minTimestamp, minDelTime, minTTL);
}
diff --git a/src/java/org/apache/cassandra/db/view/View.java
b/src/java/org/apache/cassandra/db/view/View.java
index 30bad17b34..e926edb3a9 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -238,6 +238,8 @@ public class View
public static Iterable<ViewMetadata> findAll(String keyspace, String
baseTable)
{
KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace);
+ if (ksm.views.isEmpty()) // memory optimization, to avoid a capturing
lambda allocation
+ return Collections.emptyList();
return Iterables.filter(ksm.views, view ->
view.baseTableName.equals(baseTable));
}
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java
b/src/java/org/apache/cassandra/db/view/ViewManager.java
index fe0f8236f4..06fdcb4db5 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -68,11 +68,25 @@ public class ViewManager
this.keyspace = keyspace;
}
+ public boolean updatesAffectView(IMutation mutation, boolean
coordinatorBatchlog)
+ {
+ if (!enableCoordinatorBatchlog && coordinatorBatchlog)
+ return false;
+
+ if (viewsByName.isEmpty())
+ return false;
+
+ return updatesAffectView(Collections.singleton(mutation),
coordinatorBatchlog);
+ }
+
public boolean updatesAffectView(Collection<? extends IMutation>
mutations, boolean coordinatorBatchlog)
{
if (!enableCoordinatorBatchlog && coordinatorBatchlog)
return false;
+ if (viewsByName.isEmpty())
+ return false;
+
ClusterMetadata metadata = ClusterMetadata.currentNullable();
for (IMutation mutation : mutations)
{
@@ -83,7 +97,8 @@ public class ViewManager
if (coordinatorBatchlog &&
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas == 1)
continue;
- if (!forTable(update.metadata()).updatedViews(update,
metadata).isEmpty())
+ TableViews tableViews = forTable(update.metadata());
+ if (tableViews.hasViews() && !tableViews.updatedViews(update,
metadata).isEmpty())
return true;
}
}
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index c0123933b8..791293fbb9 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -1326,6 +1326,8 @@ public class SecondaryIndexManager implements
IndexRegistry, INotificationConsum
@Override
public void validate(PartitionUpdate update, ClientState state) throws
InvalidRequestException
{
+ if (indexes.isEmpty())
+ return;
for (Index index : indexes.values())
index.validate(update, state);
}
diff --git
a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
index 4fca9ca181..6fba07ba1e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
@@ -259,7 +259,8 @@ public abstract class SortedTableWriter<P extends
SortedTablePartitionWriter, I
protected void onStartPartition(DecoratedKey key)
{
- notifyObservers(o -> o.startPartition(key,
partitionWriter.getInitialPosition(), partitionWriter.getInitialPosition()));
+ if (hasObservers())
+ notifyObservers(o -> o.startPartition(key,
partitionWriter.getInitialPosition(), partitionWriter.getInitialPosition()));
}
protected void onStaticRow(Row row)
@@ -269,22 +270,29 @@ public abstract class SortedTableWriter<P extends
SortedTablePartitionWriter, I
protected void onRow(Row row)
{
- notifyObservers(o -> o.nextUnfilteredCluster(row));
+ if (hasObservers())
+ notifyObservers(o -> o.nextUnfilteredCluster(row));
}
protected void onRangeTombstoneMarker(RangeTombstoneMarker marker)
{
- notifyObservers(o -> o.nextUnfilteredCluster(marker));
+ if (hasObservers())
+ notifyObservers(o -> o.nextUnfilteredCluster(marker));
}
protected abstract AbstractRowIndexEntry createRowIndexEntry(DecoratedKey
key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException;
protected final void notifyObservers(Consumer<SSTableFlushObserver> action)
{
- if (observers != null && !observers.isEmpty())
+ if (hasObservers())
observers.forEach(action);
}
+ private boolean hasObservers()
+ {
+ return observers != null && !observers.isEmpty();
+ }
+
@Override
public void mark()
{
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 3e157b6b9a..3cb5db0f00 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -290,6 +290,18 @@ public class DataOutputBuffer extends
BufferedDataOutputStreamPlus
return result;
}
+ /**
+ * If the calling logic knows that no new calls to this object will happen
after calling this
+ * method, then this method can avoid the ByteBuffer copying done in
{@link #buffer()}.
+ */
+ public byte[] unsafeToByteArray()
+ {
+ ByteBuffer buffer = unsafeGetBufferAndFlip();
+ byte[] result = new byte[buffer.remaining()];
+ buffer.get(result);
+ return result;
+ }
+
public String asString()
{
try
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index 751737fafb..f0069f2555 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -301,6 +301,8 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
*/
static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E
pending)
{
+ if (pending.isEmpty())
+ return false;
Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
for (InetAddressAndPort pendingEndpoint : pending.endpoints())
{
diff --git
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 2b7342d571..343bac1c4f 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.service;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,7 +78,7 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
private static final
AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class,
"failures");
private volatile int failures = 0;
- private final Map<InetAddressAndPort, RequestFailureReason>
failureReasonByEndpoint;
+ private volatile Map<InetAddressAndPort, RequestFailureReason>
failureReasonByEndpoint;
private final Dispatcher.RequestTime requestTime;
private @Nullable final Supplier<Mutation> hintOnFailure;
@@ -106,7 +107,6 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
this.callback = callback;
this.writeType = writeType;
this.hintOnFailure = hintOnFailure;
- this.failureReasonByEndpoint = new ConcurrentHashMap<>();
this.requestTime = requestTime;
}
@@ -129,12 +129,12 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
if (blockFor() + failures > candidateReplicaCount())
{
- if
(RequestCallback.isTimeout(this.failureReasonByEndpoint.keySet().stream()
+ if
(RequestCallback.isTimeout(this.getFailureReasonByEndpointMap().keySet().stream()
.filter(this::waitingFor) // DatacenterWriteResponseHandler filters errors from
remote DCs
-
.collect(Collectors.toMap(Function.identity(),
this.failureReasonByEndpoint::get))))
+
.collect(Collectors.toMap(Function.identity(),
this.getFailureReasonByEndpointMap()::get))))
throwTimeout();
- throw new WriteFailureException(replicaPlan.consistencyLevel(),
ackCount(), blockFor(), writeType, this.failureReasonByEndpoint);
+ throw new WriteFailureException(replicaPlan.consistencyLevel(),
ackCount(), blockFor(), writeType, this.getFailureReasonByEndpointMap());
}
if (replicaPlan.stillAppliesTo(ClusterMetadata.current()))
@@ -303,6 +303,12 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
? failuresUpdater.incrementAndGet(this)
: failures;
+ if (failureReasonByEndpoint == null)
+ synchronized (this)
+ {
+ if (failureReasonByEndpoint == null)
+ failureReasonByEndpoint = new ConcurrentHashMap<>();
+ }
failureReasonByEndpoint.put(from, failureReason);
logFailureOrTimeoutToIdealCLDelegate();
@@ -377,4 +383,9 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
throw new UncheckedInterruptedException(e);
}
}
+
+ private Map<InetAddressAndPort, RequestFailureReason>
getFailureReasonByEndpointMap()
+ {
+ return failureReasonByEndpoint != null ? failureReasonByEndpoint :
Collections.emptyMap();
+ }
}
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java
b/src/java/org/apache/cassandra/service/ClientWarn.java
index 13cb21d6b2..de8e84da13 100644
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -77,7 +77,7 @@ public class ClientWarn extends ExecutorLocals.Impl
public List<String> getWarnings()
{
State state = get();
- if (state == null || state.warnings.isEmpty())
+ if (state == null || state.warnings == null ||
state.warnings.isEmpty())
return null;
return state.warnings;
}
@@ -92,10 +92,16 @@ public class ClientWarn extends ExecutorLocals.Impl
private boolean collecting = true;
// This must be a thread-safe list. Even though it's wrapped in a
ThreadLocal, it's propagated to each thread
// from shared state, so multiple threads can reference the same State.
- private final List<String> warnings = new CopyOnWriteArrayList<>();
+ private volatile List<String> warnings;
private void add(String warning)
{
+ if (warnings == null)
+ synchronized (this) {
+ if (warnings == null) {
+ warnings = new CopyOnWriteArrayList<>();
+ }
+ }
if (collecting && warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT)
warnings.add(maybeTruncate(warning));
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 31e60d68e6..f7219a4b58 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -880,24 +880,25 @@ public class StorageProxy implements StorageProxyMBean
Tracing.trace("Determining replicas for mutation");
final String localDataCenter =
DatabaseDescriptor.getLocator().local().datacenter;
- List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new
ArrayList<>(mutations.size());
+ AbstractWriteResponseHandler<IMutation>[] responseHandlers = new
AbstractWriteResponseHandler[mutations.size()];
WriteType plainWriteType = mutations.size() <= 1 ? WriteType.SIMPLE :
WriteType.UNLOGGED_BATCH;
try
{
+ int j = 0;
for (IMutation mutation : mutations)
{
if (mutation instanceof CounterMutation)
-
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter,
requestTime));
+ responseHandlers[j++] =
mutateCounter((CounterMutation)mutation, localDataCenter, requestTime);
else
- responseHandlers.add(performWrite(mutation,
consistencyLevel, localDataCenter, standardWritePerformer, null,
plainWriteType, requestTime));
+ responseHandlers[j++] = performWrite(mutation,
consistencyLevel, localDataCenter, standardWritePerformer, null,
plainWriteType, requestTime);
}
// upgrade to full quorum any failed cheap quorums
for (int i = 0 ; i < mutations.size() ; ++i)
{
if (!(mutations.get(i) instanceof CounterMutation)) // at the
moment, only non-counter writes support cheap quorums
-
responseHandlers.get(i).maybeTryAdditionalReplicas(mutations.get(i),
standardWritePerformer, localDataCenter);
+
responseHandlers[i].maybeTryAdditionalReplicas(mutations.get(i),
standardWritePerformer, localDataCenter);
}
// wait for writes. throws TimeoutException if necessary
@@ -1275,14 +1276,28 @@ public class StorageProxy implements StorageProxyMBean
{
//We could potentially pass a callback into performWrite. And add
callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints)
//However, Trade off between write metric per CF accuracy vs
performance hit due to callbacks. Similar issue exists with
CoordinatorReadLatency metric.
- Set<ColumnFamilyStore> uniqueColumnFamilyStores = new HashSet<>();
+ Set<ColumnFamilyStore> uniqueColumnFamilyStores = null;
+ // very frequently we update just a single table
+ // so an allocation of uniqueColumnFamilyStores set can be avoided
+ ColumnFamilyStore firstColumnFamilyStore = null;
for (IMutation mutation : mutations)
{
+ Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
for (TableId tableId : mutation.getTableIds())
{
- ColumnFamilyStore store =
Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId);
- if (uniqueColumnFamilyStores.add(store))
+ ColumnFamilyStore store =
keyspace.getColumnFamilyStore(tableId);
+ if (firstColumnFamilyStore == null)
+ {
store.metric.coordinatorWriteLatency.update(latency,
NANOSECONDS);
+ firstColumnFamilyStore = store;
+ }
+ else if (!firstColumnFamilyStore.equals(store))
+ {
+ if (uniqueColumnFamilyStores == null)
+ uniqueColumnFamilyStores = new HashSet<>();
+ if (uniqueColumnFamilyStores.add(store))
+
store.metric.coordinatorWriteLatency.update(latency, NANOSECONDS);
+ }
}
}
}
diff --git a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
index 87d809c6c0..65e67925ad 100644
--- a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
+++ b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
@@ -75,7 +75,7 @@ import static
org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
* has exceeded the maximum number of allowed permits. The choices are to
either pause reads from the incoming socket
* and allow TCP backpressure to do the work, or to throw an explict exception
and rely on the client to back off.
*/
-public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
+public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler implements Flusher.OnFlushCleanup<Envelope>
{
private static final Logger logger =
LoggerFactory.getLogger(CQLMessageHandler.class);
private static final NoSpamLogger noSpamLogger =
NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
@@ -374,7 +374,7 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
ByteBuffer buf = bytes.get();
int idx = buf.position() + Envelope.Header.LENGTH;
final int end = idx + Ints.checkedCast(header.bodySizeInBytes);
- ByteBuf body = Unpooled.wrappedBuffer(buf.slice());
+ ByteBuf body = Unpooled.wrappedBuffer(buf); // buf.slice() is not
needed: Unpooled.wrappedBuffer does ByteBuffer.slice inside
body.readerIndex(Envelope.Header.LENGTH);
body.retain();
buf.position(end);
@@ -492,10 +492,11 @@ public class CQLMessageHandler<M extends Message> extends
AbstractMessageHandler
responseFrame,
request.getSource(),
payloadAllocator,
- this::release);
+ this);
}
- private void release(Flusher.FlushItem<Envelope> flushItem)
+ @Override
+ public void cleanup(Flusher.FlushItem<Envelope> flushItem)
{
release(flushItem.request.header);
flushItem.request.release();
diff --git a/src/java/org/apache/cassandra/transport/Envelope.java
b/src/java/org/apache/cassandra/transport/Envelope.java
index 99c6e135af..b425abc63e 100644
--- a/src/java/org/apache/cassandra/transport/Envelope.java
+++ b/src/java/org/apache/cassandra/transport/Envelope.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.transport;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.EnumSet;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
@@ -79,7 +78,7 @@ public class Envelope
return new Envelope(header,
Unpooled.wrappedBuffer(ByteBufferUtil.clone(body.nioBuffer())));
}
- public static Envelope create(Message.Type type, int streamId,
ProtocolVersion version, EnumSet<Header.Flag> flags, ByteBuf body)
+ public static Envelope create(Message.Type type, int streamId,
ProtocolVersion version, int flags, ByteBuf body)
{
Header header = new Header(version, flags, streamId, type,
body.readableBytes());
return new Envelope(header, body);
@@ -92,7 +91,7 @@ public class Envelope
Message.Type type = header.type;
buf.writeByte(type.direction.addToVersion(header.version.asInt()));
- buf.writeByte(Header.Flag.serialize(header.flags));
+ buf.writeByte(header.flags);
// Continue to support writing pre-v3 headers so that we can give
proper error messages to drivers that
// connect with the v1/v2 protocol. See CASSANDRA-11464.
@@ -110,7 +109,7 @@ public class Envelope
public void encodeHeaderInto(ByteBuffer buf)
{
buf.put((byte)
header.type.direction.addToVersion(header.version.asInt()));
- buf.put((byte) Envelope.Header.Flag.serialize(header.flags));
+ buf.put((byte) header.flags);
if (header.version.isGreaterOrEqualTo(ProtocolVersion.V3))
buf.putShort((short) header.streamId);
@@ -125,7 +124,11 @@ public class Envelope
public void encodeInto(ByteBuffer buf)
{
encodeHeaderInto(buf);
- buf.put(body.nioBuffer());
+ // an alternative logic for : buf.put(body.nioBuffer()) without
ByteBuffer slicing
+ int originalLimit = buf.limit();
+ buf.limit(buf.position() + body.readableBytes());
+ body.readBytes(buf);
+ buf.limit(originalLimit);
}
public static class Header
@@ -136,12 +139,12 @@ public class Envelope
public static final int BODY_LENGTH_SIZE = 4;
public final ProtocolVersion version;
- public final EnumSet<Flag> flags;
+ public int flags;
public final int streamId;
public final Message.Type type;
public final long bodySizeInBytes;
- private Header(ProtocolVersion version, EnumSet<Flag> flags, int
streamId, Message.Type type, long bodySizeInBytes)
+ private Header(ProtocolVersion version, int flags, int streamId,
Message.Type type, long bodySizeInBytes)
{
this.version = version;
this.flags = flags;
@@ -150,6 +153,16 @@ public class Envelope
this.bodySizeInBytes = bodySizeInBytes;
}
+ public void addFlag(Flag flag)
+ {
+ this.flags = Flag.add(this.flags, flag);
+ }
+
+ public boolean hasFlag(Flag flag)
+ {
+ return Flag.contains(this.flags, flag);
+ }
+
public enum Flag
{
// The order of that enum matters!!
@@ -159,25 +172,27 @@ public class Envelope
WARNING,
USE_BETA;
- private static final Flag[] ALL_VALUES = values();
+ private final int mask;
- public static EnumSet<Flag> deserialize(int flags)
+ Flag()
{
- EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
- for (int n = 0; n < ALL_VALUES.length; n++)
- {
- if ((flags & (1 << n)) != 0)
- set.add(ALL_VALUES[n]);
- }
- return set;
+ this.mask = 1 << this.ordinal();
}
- public static int serialize(EnumSet<Flag> flags)
+ public static int none()
{
- int i = 0;
- for (Flag flag : flags)
- i |= 1 << flag.ordinal();
- return i;
+ return 0;
+ }
+
+ public static int add(int flags, Flag flagToAdd)
+ {
+ flags |= flagToAdd.mask;
+ return flags;
+ }
+
+ public static boolean contains(long flags, Flag flag)
+ {
+ return (flags & flag.mask) != 0;
}
}
}
@@ -236,15 +251,14 @@ public class Envelope
Message.Direction direction =
Message.Direction.extractFromVersion(firstByte);
Message.Type type;
ProtocolVersion version;
- EnumSet<Header.Flag> decodedFlags;
try
{
// This throws a protocol exception if the version number is
unsupported,
// the opcode is unknown or invalid flags are set for the
version
version = ProtocolVersion.decode(versionNum,
DatabaseDescriptor.getNativeTransportAllowOlderProtocols());
- decodedFlags = decodeFlags(version, flags);
+ validateFlags(version, flags);
type = Message.Type.fromOpcode(opcode, direction);
- return new HeaderExtractionResult.Success(new Header(version,
decodedFlags, streamId, type, bodyLength));
+ return new HeaderExtractionResult.Success(new Header(version,
flags, streamId, type, bodyLength));
}
catch (ProtocolException e)
{
@@ -372,7 +386,7 @@ public class Envelope
return null;
int flags = buffer.getByte(idx++);
- EnumSet<Header.Flag> decodedFlags = decodeFlags(version, flags);
+ validateFlags(version, flags);
int streamId = buffer.getShort(idx);
idx += 2;
@@ -417,17 +431,14 @@ public class Envelope
idx += bodyLength;
buffer.readerIndex(idx);
- return new Envelope(new Header(version, decodedFlags, streamId,
type, bodyLength), body);
+ return new Envelope(new Header(version, flags, streamId, type,
bodyLength), body);
}
- private EnumSet<Header.Flag> decodeFlags(ProtocolVersion version, int
flags)
+ private void validateFlags(ProtocolVersion version, int flags)
{
- EnumSet<Header.Flag> decodedFlags = Header.Flag.deserialize(flags);
-
- if (version.isBeta() &&
!decodedFlags.contains(Header.Flag.USE_BETA))
+ if (version.isBeta() && !Header.Flag.contains(flags,
Header.Flag.USE_BETA))
throw new ProtocolException(String.format("Beta version of the
protocol used (%s), but USE_BETA flag is unset", version),
version);
- return decodedFlags;
}
@Override
@@ -488,7 +499,7 @@ public class Envelope
{
Connection connection =
ctx.channel().attr(Connection.attributeKey).get();
- if (!source.header.flags.contains(Header.Flag.COMPRESSED) ||
connection == null)
+ if (!source.header.hasFlag(Header.Flag.COMPRESSED) || connection
== null)
{
results.add(source);
return;
@@ -529,7 +540,7 @@ public class Envelope
results.add(source);
return;
}
- source.header.flags.add(Header.Flag.COMPRESSED);
+ source.header.addFlag(Header.Flag.COMPRESSED);
results.add(compressor.compress(source));
}
}
diff --git a/src/java/org/apache/cassandra/transport/Flusher.java
b/src/java/org/apache/cassandra/transport/Flusher.java
index 50261de036..ebf708158a 100644
--- a/src/java/org/apache/cassandra/transport/Flusher.java
+++ b/src/java/org/apache/cassandra/transport/Flusher.java
@@ -23,7 +23,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -49,6 +48,9 @@ abstract class Flusher implements Runnable
Math.min(BufferPool.NORMAL_CHUNK_SIZE,
FrameEncoder.Payload.MAX_SIZE -
Math.max(FrameEncoderCrc.HEADER_AND_TRAILER_LENGTH,
FrameEncoderLZ4.HEADER_AND_TRAILER_LENGTH));
+ interface OnFlushCleanup<T> {
+ void cleanup(FlushItem<T> item);
+ }
static class FlushItem<T>
{
enum Kind {FRAMED, UNFRAMED}
@@ -57,9 +59,9 @@ abstract class Flusher implements Runnable
final Channel channel;
final T response;
final Envelope request;
- final Consumer<FlushItem<T>> tidy;
+ final OnFlushCleanup<T> tidy;
- FlushItem(Kind kind, Channel channel, T response, Envelope request,
Consumer<FlushItem<T>> tidy)
+ FlushItem(Kind kind, Channel channel, T response, Envelope request,
OnFlushCleanup<T> tidy)
{
this.kind = kind;
this.channel = channel;
@@ -70,7 +72,7 @@ abstract class Flusher implements Runnable
void release()
{
- tidy.accept(this);
+ tidy.cleanup(this);
}
static class Framed extends FlushItem<Envelope>
@@ -80,7 +82,7 @@ abstract class Flusher implements Runnable
Envelope response,
Envelope request,
FrameEncoder.PayloadAllocator allocator,
- Consumer<FlushItem<Envelope>> tidy)
+ OnFlushCleanup<Envelope> tidy)
{
super(Kind.FRAMED, channel, response, request, tidy);
this.allocator = allocator;
@@ -89,7 +91,7 @@ abstract class Flusher implements Runnable
static class Unframed extends FlushItem<Response>
{
- Unframed(Channel channel, Response response, Envelope request,
Consumer<FlushItem<Response>> tidy)
+ Unframed(Channel channel, Response response, Envelope request,
OnFlushCleanup<Response> tidy)
{
super(Kind.UNFRAMED, channel, response, request, tidy);
}
@@ -156,8 +158,14 @@ abstract class Flusher implements Runnable
}
else
{
- payloads.computeIfAbsent(flush.channel, channel -> new
FlushBuffer(channel, flush.allocator, 5))
- .add(flush.response);
+ FlushBuffer flushBuffer = payloads.get(flush.channel);
+ if (flushBuffer == null)
+ {
+ flushBuffer = new FlushBuffer(flush.channel, flush.allocator,
5);
+ payloads.put(flushBuffer.channel, flushBuffer);
+ }
+
+ flushBuffer.add(flush.response);
}
}
@@ -226,8 +234,9 @@ abstract class Flusher implements Runnable
protected void flushWrittenChannels()
{
// flush the channels pre-V5 to which messages were written in
writeSingleResponse
- for (Channel channel : channels)
- channel.flush();
+ if (!channels.isEmpty())
+ for (Channel channel : channels)
+ channel.flush();
// Framed messages (V5) are grouped by channel, now encode them into
payloads, write and flush
for (FlushBuffer buffer : payloads.values())
@@ -247,8 +256,11 @@ abstract class Flusher implements Runnable
// collated into frames, and so their buffers can be released
immediately after flushing.
// In V4 however, the buffers containing each CQL envelope are emitted
from Envelope.Encoder
// and so releasing them is handled by Netty internally.
- for (FlushItem<?> item : processed)
+ for (int i = 0; i < processed.size(); i++)
+ {
+ FlushItem<?> item = processed.get(i);
item.release();
+ }
payloads.clear();
channels.clear();
@@ -298,8 +310,9 @@ abstract class Flusher implements Runnable
int writtenBytes = 0;
int messagesToWrite = this.size();
FrameEncoder.Payload sending = allocate(sizeInBytes,
messagesToWrite);
- for (Envelope f : this)
+ for (int i = 0; i < this.size(); i++)
{
+ Envelope f = this.get(i);
messageSize = envelopeSize(f.header);
if (sending.remaining() < messageSize)
{
diff --git a/src/java/org/apache/cassandra/transport/Message.java
b/src/java/org/apache/cassandra/transport/Message.java
index ed853c0cbd..47d4289b07 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.transport;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
-import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.transport.Envelope.Header.Flag;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.ReflectionUtils;
@@ -328,7 +328,7 @@ public abstract class Message
public Envelope encode(ProtocolVersion version)
{
- EnumSet<Envelope.Header.Flag> flags =
EnumSet.noneOf(Envelope.Header.Flag.class);
+ int flags = Flag.none();
@SuppressWarnings("unchecked")
Codec<Message> codec = (Codec<Message>)this.type.codec;
try
@@ -366,24 +366,24 @@ public abstract class Message
if (tracingId != null)
{
CBUtil.writeUUID(tracingId, body);
- flags.add(Envelope.Header.Flag.TRACING);
+ flags = Flag.add(flags, Flag.TRACING);
}
if (warnings != null)
{
CBUtil.writeStringList(warnings, body);
- flags.add(Envelope.Header.Flag.WARNING);
+ flags = Flag.add(flags, Flag.WARNING);
}
if (customPayload != null)
{
CBUtil.writeBytesMap(customPayload, body);
- flags.add(Envelope.Header.Flag.CUSTOM_PAYLOAD);
+ flags = Flag.add(flags, Flag.CUSTOM_PAYLOAD);
}
}
else
{
assert this instanceof Request;
if (((Request)this).isTracingRequested())
- flags.add(Envelope.Header.Flag.TRACING);
+ flags = Flag.add(flags, Flag.TRACING);
Map<String, ByteBuffer> payload = getCustomPayload();
if (payload != null)
messageSize += CBUtil.sizeOfBytesMap(payload);
@@ -391,7 +391,7 @@ public abstract class Message
if (payload != null)
{
CBUtil.writeBytesMap(payload, body);
- flags.add(Envelope.Header.Flag.CUSTOM_PAYLOAD);
+ flags = Flag.add(flags, Flag.CUSTOM_PAYLOAD);
}
}
@@ -412,7 +412,7 @@ public abstract class Message
: forcedProtocolVersion;
if (responseVersion.isBeta())
- flags.add(Envelope.Header.Flag.USE_BETA);
+ flags = Flag.add(flags, Flag.USE_BETA);
return Envelope.create(type, getStreamId(), responseVersion,
flags, body);
}
@@ -427,9 +427,9 @@ public abstract class Message
static Message decodeMessage(Channel channel, Envelope inbound)
{
boolean isRequest = inbound.header.type.direction ==
Direction.REQUEST;
- boolean isTracing =
inbound.header.flags.contains(Envelope.Header.Flag.TRACING);
- boolean isCustomPayload =
inbound.header.flags.contains(Envelope.Header.Flag.CUSTOM_PAYLOAD);
- boolean hasWarning =
inbound.header.flags.contains(Envelope.Header.Flag.WARNING);
+ boolean isTracing = inbound.header.hasFlag(Flag.TRACING);
+ boolean isCustomPayload =
inbound.header.hasFlag(Flag.CUSTOM_PAYLOAD);
+ boolean hasWarning = inbound.header.hasFlag(Flag.WARNING);
TimeUUID tracingId = isRequest || !isTracing ? null :
CBUtil.readTimeUUID(inbound.body);
List<String> warnings = isRequest || !hasWarning ? null :
CBUtil.readStringList(inbound.body);
diff --git a/test/unit/org/apache/cassandra/cql3/QueryOptionsFlagsTest.java
b/test/unit/org/apache/cassandra/cql3/QueryOptionsFlagsTest.java
new file mode 100644
index 0000000000..58b6f103f7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/QueryOptionsFlagsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.QueryOptions.Codec.Flag;
+
+public class QueryOptionsFlagsTest
+{
+ @Test
+ public void checkFlagOperations()
+ {
+ int flags = Flag.none();
+ for (Flag flag : Flag.values())
+ {
+ flags = Flag.add(flags, flag);
+ Assert.assertTrue(Flag.contains(flags, flag));
+ for (int i = flag.ordinal() + 1; i < Flag.values().length; i++)
+ Assert.assertFalse(Flag.contains(flags, Flag.values()[i]));
+ }
+ for (Flag flag : Flag.values())
+ {
+ flags = Flag.remove(flags, flag);
+ Assert.assertFalse(Flag.contains(flags, flag));
+ for (int i = flag.ordinal() + 1; i < Flag.values().length; i++)
+ Assert.assertTrue(Flag.contains(flags, Flag.values()[i]));
+ }
+
+ }
+
+ @Test
+ public void checkFlagEncoding()
+ {
+ int flags = Flag.none();
+ flags = Flag.add(flags, Flag.VALUES);
+ flags = Flag.add(flags, Flag.PAGING_STATE);
+ flags = Flag.add(flags, Flag.TIMESTAMP);
+
+ Assert.assertEquals(flags, 0x0001 | 0x0008 | 0x0020);
+ }
+
+ @Test
+ public void checkFlagDecoding()
+ {
+ int flags = 0x0001 | 0x0040 | 0x0004 | 0x0100;
+ Assert.assertTrue(Flag.contains(flags, Flag.VALUES));
+ Assert.assertTrue(Flag.contains(flags, Flag.NAMES_FOR_VALUES));
+ Assert.assertTrue(Flag.contains(flags, Flag.PAGE_SIZE));
+ Assert.assertFalse(Flag.contains(flags, Flag.SKIP_METADATA));
+ Assert.assertTrue(Flag.contains(flags, Flag.NOW_IN_SECONDS));
+ }
+}
diff --git a/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
b/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
index e01128a6d7..a3c9bfe407 100644
--- a/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
+++ b/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
@@ -524,7 +524,7 @@ public class CQLConnectionTest
return Envelope.create(type,
streamId,
ProtocolVersion.V5,
- EnumSet.of(Envelope.Header.Flag.USE_BETA),
+
Envelope.Header.Flag.add(Envelope.Header.Flag.none(),
Envelope.Header.Flag.USE_BETA),
Unpooled.wrappedBuffer(bytes));
}
diff --git
a/test/unit/org/apache/cassandra/transport/EnvelopeHeaderFlagsTest.java
b/test/unit/org/apache/cassandra/transport/EnvelopeHeaderFlagsTest.java
new file mode 100644
index 0000000000..de31d1bedb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/EnvelopeHeaderFlagsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.transport.Envelope.Header.Flag;
+
+public class EnvelopeHeaderFlagsTest
+{
+ @Test
+ public void checkFlagOperations()
+ {
+ int flags = Flag.none();
+ for (Flag flag : Flag.values())
+ {
+ flags = Flag.add(flags, flag);
+ Assert.assertTrue(Flag.contains(flags, flag));
+ for (int i = flag.ordinal() + 1; i < Flag.values().length; i++)
+ Assert.assertFalse(Flag.contains(flags, Flag.values()[i]));
+ }
+ }
+
+ @Test
+ public void checkFlagEncoding()
+ {
+ int flags = Flag.none();
+ flags = Flag.add(flags, Flag.COMPRESSED);
+ flags = Flag.add(flags, Flag.TRACING);
+ flags = Flag.add(flags, Flag.USE_BETA);
+
+ Assert.assertEquals(flags, 0x01 | 0x02 | 0x10);
+ }
+
+ @Test
+ public void checkFlagDecoding()
+ {
+ int flags = 0x02 | 0x08 | 0x10;
+ Assert.assertFalse(Flag.contains(flags, Flag.COMPRESSED));
+ Assert.assertTrue(Flag.contains(flags, Flag.TRACING));
+ Assert.assertFalse(Flag.contains(flags, Flag.CUSTOM_PAYLOAD));
+ Assert.assertTrue(Flag.contains(flags, Flag.WARNING));
+ Assert.assertTrue(Flag.contains(flags, Flag.USE_BETA));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org