This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eed4fbc8f7 Reduce heap memory allocations in different places along 
the hot write path Avoid iterator allocations if possible Handle typical cases 
(such as a single row, single table writes) more efficiently Add fast paths for 
typical scenarios (like absense of views and triggers) Memorize things which 
can be computed once
eed4fbc8f7 is described below

commit eed4fbc8f7789424d670c42a60f12150c48cd492
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Tue Mar 4 22:33:14 2025 +0300

    Reduce heap memory allocations in different places along the hot write path
    Avoid iterator allocations if possible
    Handle typical cases (such as a single row, single table writes) more 
efficiently
    Add fast paths for typical scenarios (like absense of views and triggers)
    Memorize things which can be computed once
    
    Patch by Dmitry Konstantinov; reviewed by Chris Lohfink, Michael Semb 
Wever, Vladimir Sitnikov for CASSANDRA-20167
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/concurrent/SEPWorker.java |   4 +-
 src/java/org/apache/cassandra/cql3/Attributes.java |   4 +-
 .../apache/cassandra/cql3/BatchQueryOptions.java   |   6 +-
 .../org/apache/cassandra/cql3/CQLStatement.java    |   6 +-
 .../org/apache/cassandra/cql3/QueryOptions.java    | 139 +++++++++++----------
 .../apache/cassandra/cql3/UpdateParameters.java    |   4 +-
 .../cassandra/cql3/VariableSpecifications.java     |  15 +++
 .../cassandra/cql3/statements/BatchStatement.java  |  11 +-
 .../cql3/statements/DescribeStatement.java         |   4 +-
 .../cql3/statements/ModificationStatement.java     |  47 +++++--
 .../cassandra/cql3/statements/SelectStatement.java |   5 +-
 ...ingleTableSinglePartitionUpdatesCollector.java} |  61 ++++-----
 .../statements/SingleTableUpdatesCollector.java    |  34 +++--
 src/java/org/apache/cassandra/db/Keyspace.java     |   9 +-
 src/java/org/apache/cassandra/db/Mutation.java     |  11 +-
 .../cassandra/db/RegularAndStaticColumns.java      |  16 ++-
 .../apache/cassandra/db/filter/ColumnFilter.java   |   9 ++
 .../cassandra/db/partitions/PartitionUpdate.java   |  55 ++++++--
 .../org/apache/cassandra/db/rows/BTreeRow.java     |  19 +++
 .../apache/cassandra/db/rows/EncodingStats.java    |   7 ++
 src/java/org/apache/cassandra/db/view/View.java    |   2 +
 .../org/apache/cassandra/db/view/ViewManager.java  |  17 ++-
 .../cassandra/index/SecondaryIndexManager.java     |   2 +
 .../io/sstable/format/SortedTableWriter.java       |  16 ++-
 .../apache/cassandra/io/util/DataOutputBuffer.java |  12 ++
 .../apache/cassandra/locator/ReplicaLayout.java    |   2 +
 .../service/AbstractWriteResponseHandler.java      |  21 +++-
 .../org/apache/cassandra/service/ClientWarn.java   |  10 +-
 .../org/apache/cassandra/service/StorageProxy.java |  29 +++--
 .../cassandra/transport/CQLMessageHandler.java     |   9 +-
 .../org/apache/cassandra/transport/Envelope.java   |  77 +++++++-----
 .../org/apache/cassandra/transport/Flusher.java    |  37 ++++--
 .../org/apache/cassandra/transport/Message.java    |  22 ++--
 .../cassandra/cql3/QueryOptionsFlagsTest.java      |  70 +++++++++++
 .../cassandra/transport/CQLConnectionTest.java     |   2 +-
 .../transport/EnvelopeHeaderFlagsTest.java         |  62 +++++++++
 37 files changed, 616 insertions(+), 241 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9b84f503c4..f790960a21 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Reduce memory allocations in miscellaneous places along the hot write path 
(CASSANDRA-20167)
  * Provide keystore_password_file and truststore_password_file options to read 
credentials from a file (CASSANDRA-13428)
  * Unregistering a node should also remove it from tokenMap if it is there and 
recalculate the placements (CASSANDRA-20346)
  * Fix PartitionUpdate.isEmpty deserialization issue to avoid potential 
EOFException (CASSANDRA-20345)
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java 
b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index 93c01fa7a3..add2ef8b06 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -39,6 +39,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> 
implements Runnabl
     private static final boolean SET_THREAD_NAME = 
SET_SEP_THREAD_NAME.getBoolean();
 
     final Long workerId;
+    final String workerIdThreadSuffix;
     final Thread thread;
     final SharedExecutorPool pool;
 
@@ -55,6 +56,7 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> 
implements Runnabl
     {
         this.pool = pool;
         this.workerId = workerId;
+        this.workerIdThreadSuffix = '-' + workerId.toString();
         thread = new FastThreadLocalThread(threadGroup, this, 
threadGroup.getName() + "-Worker-" + workerId);
         thread.setDaemon(true);
         set(initialState);
@@ -122,7 +124,7 @@ final class SEPWorker extends 
AtomicReference<SEPWorker.Work> implements Runnabl
                 if (assigned == null)
                     continue;
                 if (SET_THREAD_NAME)
-                    Thread.currentThread().setName(assigned.name + '-' + 
workerId);
+                    Thread.currentThread().setName(assigned.name + 
workerIdThreadSuffix);
 
                 task = assigned.tasks.poll();
                 currentTask.lazySet(task);
diff --git a/src/java/org/apache/cassandra/cql3/Attributes.java 
b/src/java/org/apache/cassandra/cql3/Attributes.java
index 492c23672a..493193b400 100644
--- a/src/java/org/apache/cassandra/cql3/Attributes.java
+++ b/src/java/org/apache/cassandra/cql3/Attributes.java
@@ -47,12 +47,14 @@ public class Attributes
      */
     public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in 
seconds
 
+    private static final Attributes NONE = new Attributes(null, null);
+
     private final Term timestamp;
     private final Term timeToLive;
 
     public static Attributes none()
     {
-        return new Attributes(null, null);
+        return NONE;
     }
 
     private Attributes(Term timestamp, Term timeToLive)
diff --git a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java 
b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
index 63be061e5e..dc38c4d0ce 100644
--- a/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.cassandra.utils.MD5Digest;
 
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -54,7 +56,7 @@ public abstract class BatchQueryOptions
 
     public abstract QueryOptions forStatement(int i);
 
-    public void prepareStatement(int i, List<ColumnSpecification> boundNames)
+    public void prepareStatement(int i, ImmutableList<ColumnSpecification> 
boundNames)
     {
         forStatement(i).prepare(boundNames);
     }
@@ -128,7 +130,7 @@ public abstract class BatchQueryOptions
         }
 
         @Override
-        public void prepareStatement(int i, List<ColumnSpecification> 
boundNames)
+        public void prepareStatement(int i, ImmutableList<ColumnSpecification> 
boundNames)
         {
             if (isPreparedStatement(i))
             {
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java 
b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index badf9c3428..349e79b30f 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.cql3;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.cassandra.audit.AuditLogContext;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.service.ClientState;
@@ -32,9 +34,9 @@ public interface CQLStatement
     /**
      * Returns all bind variables for the statement
      */
-    default List<ColumnSpecification> getBindVariables()
+    default ImmutableList<ColumnSpecification> getBindVariables()
     {
-        return Collections.emptyList();
+        return ImmutableList.of();
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java 
b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index d3093cbb8e..c4e6e33b25 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -110,7 +110,7 @@ public abstract class QueryOptions
                                        version);
     }
 
-    public static QueryOptions addColumnSpecifications(QueryOptions options, 
List<ColumnSpecification> columnSpecs)
+    public static QueryOptions addColumnSpecifications(QueryOptions options, 
ImmutableList<ColumnSpecification> columnSpecs)
     {
         return new OptionsWithColumnSpecifications(options, columnSpecs);
     }
@@ -476,10 +476,10 @@ public abstract class QueryOptions
     {
         private final ImmutableList<ColumnSpecification> columnSpecs;
 
-        OptionsWithColumnSpecifications(QueryOptions wrapped, 
List<ColumnSpecification> columnSpecs)
+        OptionsWithColumnSpecifications(QueryOptions wrapped, 
ImmutableList<ColumnSpecification> columnSpecs)
         {
             super(wrapped);
-            this.columnSpecs = ImmutableList.copyOf(columnSpecs);
+            this.columnSpecs = columnSpecs;
         }
 
         @Override
@@ -568,9 +568,9 @@ public abstract class QueryOptions
         }
     }
 
-    private static class Codec implements CBCodec<QueryOptions>
+    static class Codec implements CBCodec<QueryOptions>
     {
-        private enum Flag
+        enum Flag
         {
             // The order of that enum matters!!
             VALUES,
@@ -583,40 +583,53 @@ public abstract class QueryOptions
             KEYSPACE,
             NOW_IN_SECONDS;
 
-            private static final Flag[] ALL_VALUES = values();
+            private final int mask;
 
-            public static EnumSet<Flag> deserialize(int flags)
+            Flag()
             {
-                EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
-                for (int n = 0; n < ALL_VALUES.length; n++)
-                {
-                    if ((flags & (1 << n)) != 0)
-                        set.add(ALL_VALUES[n]);
-                }
-                return set;
+                this.mask = 1 << this.ordinal();
+            }
+
+            public static int none()
+            {
+                return 0;
+            }
+
+            public static boolean isEmpty(int flags)
+            {
+                return flags == 0;
+            }
+
+            public static int add(int flags, Flag flagToAdd)
+            {
+                flags |= flagToAdd.mask;
+                return flags;
+            }
+
+            public static int remove(int flags, Flag flagToRemove)
+            {
+                flags &= ~ flagToRemove.mask;
+                return flags;
             }
 
-            public static int serialize(EnumSet<Flag> flags)
+            public static boolean contains(long flags, Flag flag)
             {
-                int i = 0;
-                for (Flag flag : flags)
-                    i |= 1 << flag.ordinal();
-                return i;
+                return (flags & flag.mask) != 0;
             }
         }
 
         public QueryOptions decode(ByteBuf body, ProtocolVersion version)
         {
             ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
-            EnumSet<Flag> flags = 
Flag.deserialize(version.isGreaterOrEqualTo(ProtocolVersion.V5)
-                                                   ? 
(int)body.readUnsignedInt()
-                                                   : 
(int)body.readUnsignedByte());
+            int flags = version.isGreaterOrEqualTo(ProtocolVersion.V5)
+                        ? (int)body.readUnsignedInt()
+                        : (int)body.readUnsignedByte();
 
             List<ByteBuffer> values = Collections.<ByteBuffer>emptyList();
             List<String> names = null;
-            if (flags.contains(Flag.VALUES))
+            if (Flag.contains(flags, Flag.VALUES))
             {
-                if (flags.contains(Flag.NAMES_FOR_VALUES))
+                if (Flag.contains(flags, Flag.NAMES_FOR_VALUES))
                 {
                     Pair<List<String>, List<ByteBuffer>> namesAndValues = 
CBUtil.readNameAndValueList(body, version);
                     names = namesAndValues.left;
@@ -628,27 +641,27 @@ public abstract class QueryOptions
                 }
             }
 
-            boolean skipMetadata = flags.contains(Flag.SKIP_METADATA);
-            flags.remove(Flag.VALUES);
-            flags.remove(Flag.SKIP_METADATA);
+            boolean skipMetadata = Flag.contains(flags, Flag.SKIP_METADATA);
+            flags = Flag.remove(flags, Flag.VALUES);
+            flags = Flag.remove(flags, Flag.SKIP_METADATA);
 
             SpecificOptions options = SpecificOptions.DEFAULT;
-            if (!flags.isEmpty())
+            if (!Flag.isEmpty(flags))
             {
-                int pageSize = flags.contains(Flag.PAGE_SIZE) ? body.readInt() 
: -1;
-                PagingState pagingState = flags.contains(Flag.PAGING_STATE) ? 
PagingState.deserialize(CBUtil.readValueNoCopy(body), version) : null;
-                ConsistencyLevel serialConsistency = 
flags.contains(Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : 
ConsistencyLevel.SERIAL;
+                int pageSize = Flag.contains(flags, Flag.PAGE_SIZE) ? 
body.readInt() : -1;
+                PagingState pagingState = Flag.contains(flags, 
Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValueNoCopy(body), 
version) : null;
+                ConsistencyLevel serialConsistency = Flag.contains(flags, 
Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : 
ConsistencyLevel.SERIAL;
                 long timestamp = Long.MIN_VALUE;
-                if (flags.contains(Flag.TIMESTAMP))
+                if (Flag.contains(flags, Flag.TIMESTAMP))
                 {
                     long ts = body.readLong();
                     if (ts == Long.MIN_VALUE)
                         throw new ProtocolException(String.format("Out of 
bound timestamp, must be in [%d, %d] (got %d)", Long.MIN_VALUE + 1, 
Long.MAX_VALUE, ts));
                     timestamp = ts;
                 }
-                String keyspace = flags.contains(Flag.KEYSPACE) ? 
CBUtil.readString(body) : null;
-                long nowInSeconds = flags.contains(Flag.NOW_IN_SECONDS) ? 
CassandraUInt.toLong(body.readInt())
-                                                                        : 
UNSET_NOWINSEC;
+                String keyspace = Flag.contains(flags, Flag.KEYSPACE) ? 
CBUtil.readString(body) : null;
+                long nowInSeconds = Flag.contains(flags, Flag.NOW_IN_SECONDS) 
? CassandraUInt.toLong(body.readInt())
+                                                                              
: UNSET_NOWINSEC;
                 options = new SpecificOptions(pageSize, pagingState, 
serialConsistency, timestamp, keyspace, nowInSeconds);
             }
 
@@ -660,25 +673,25 @@ public abstract class QueryOptions
         {
             CBUtil.writeConsistencyLevel(options.getConsistency(), dest);
 
-            EnumSet<Flag> flags = gatherFlags(options, version);
+            int flags = gatherFlags(options, version);
             if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
-                dest.writeInt(Flag.serialize(flags));
+                dest.writeInt(flags);
             else
-                dest.writeByte((byte)Flag.serialize(flags));
+                dest.writeByte((byte) flags);
 
-            if (flags.contains(Flag.VALUES))
+            if (Flag.contains(flags, Flag.VALUES))
                 CBUtil.writeValueList(options.getValues(), dest);
-            if (flags.contains(Flag.PAGE_SIZE))
+            if (Flag.contains(flags, Flag.PAGE_SIZE))
                 dest.writeInt(options.getPageSize());
-            if (flags.contains(Flag.PAGING_STATE))
+            if (Flag.contains(flags, Flag.PAGING_STATE))
                 CBUtil.writeValue(options.getPagingState().serialize(version), 
dest);
-            if (flags.contains(Flag.SERIAL_CONSISTENCY))
+            if (Flag.contains(flags, Flag.SERIAL_CONSISTENCY))
                 CBUtil.writeConsistencyLevel(options.getSerialConsistency(), 
dest);
-            if (flags.contains(Flag.TIMESTAMP))
+            if (Flag.contains(flags, Flag.TIMESTAMP))
                 dest.writeLong(options.getSpecificOptions().timestamp);
-            if (flags.contains(Flag.KEYSPACE))
+            if (Flag.contains(flags, Flag.KEYSPACE))
                 CBUtil.writeAsciiString(options.getSpecificOptions().keyspace, 
dest);
-            if (flags.contains(Flag.NOW_IN_SECONDS))
+            if (Flag.contains(flags, Flag.NOW_IN_SECONDS))
                 
dest.writeInt(CassandraUInt.fromLong(options.getSpecificOptions().nowInSeconds));
 
             // Note that we don't really have to bother with NAMES_FOR_VALUES 
server side,
@@ -692,49 +705,49 @@ public abstract class QueryOptions
 
             size += CBUtil.sizeOfConsistencyLevel(options.getConsistency());
 
-            EnumSet<Flag> flags = gatherFlags(options, version);
+            int flags = gatherFlags(options, version);
             size += (version.isGreaterOrEqualTo(ProtocolVersion.V5) ? 4 : 1);
 
-            if (flags.contains(Flag.VALUES))
+            if (Flag.contains(flags, Flag.VALUES))
                 size += CBUtil.sizeOfValueList(options.getValues());
-            if (flags.contains(Flag.PAGE_SIZE))
+            if (Flag.contains(flags, Flag.PAGE_SIZE))
                 size += 4;
-            if (flags.contains(Flag.PAGING_STATE))
+            if (Flag.contains(flags, Flag.PAGING_STATE))
                 size += 
CBUtil.sizeOfValue(options.getPagingState().serializedSize(version));
-            if (flags.contains(Flag.SERIAL_CONSISTENCY))
+            if (Flag.contains(flags, Flag.SERIAL_CONSISTENCY))
                 size += 
CBUtil.sizeOfConsistencyLevel(options.getSerialConsistency());
-            if (flags.contains(Flag.TIMESTAMP))
+            if (Flag.contains(flags, Flag.TIMESTAMP))
                 size += 8;
-            if (flags.contains(Flag.KEYSPACE))
+            if (Flag.contains(flags, Flag.KEYSPACE))
                 size += 
CBUtil.sizeOfAsciiString(options.getSpecificOptions().keyspace);
-            if (flags.contains(Flag.NOW_IN_SECONDS))
+            if (Flag.contains(flags, Flag.NOW_IN_SECONDS))
                 size += 4;
 
             return size;
         }
 
-        private EnumSet<Flag> gatherFlags(QueryOptions options, 
ProtocolVersion version)
+        private int gatherFlags(QueryOptions options, ProtocolVersion version)
         {
-            EnumSet<Flag> flags = EnumSet.noneOf(Flag.class);
+            int flags = Flag.none();
             if (options.getValues().size() > 0)
-                flags.add(Flag.VALUES);
+                flags = Flag.add(flags, Flag.VALUES);
             if (options.skipMetadata())
-                flags.add(Flag.SKIP_METADATA);
+                flags = Flag.add(flags, Flag.SKIP_METADATA);
             if (options.getPageSize() >= 0)
-                flags.add(Flag.PAGE_SIZE);
+                flags = Flag.add(flags, Flag.PAGE_SIZE);
             if (options.getPagingState() != null)
-                flags.add(Flag.PAGING_STATE);
+                flags = Flag.add(flags, Flag.PAGING_STATE);
             if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
-                flags.add(Flag.SERIAL_CONSISTENCY);
+                flags = Flag.add(flags, Flag.SERIAL_CONSISTENCY);
             if (options.getSpecificOptions().timestamp != Long.MIN_VALUE)
-                flags.add(Flag.TIMESTAMP);
+                flags = Flag.add(flags, Flag.TIMESTAMP);
 
             if (version.isGreaterOrEqualTo(ProtocolVersion.V5))
             {
                 if (options.getSpecificOptions().keyspace != null)
-                    flags.add(Flag.KEYSPACE);
+                    flags = Flag.add(flags, Flag.KEYSPACE);
                 if (options.getSpecificOptions().nowInSeconds != 
UNSET_NOWINSEC)
-                    flags.add(Flag.NOW_IN_SECONDS);
+                    flags = Flag.add(flags, Flag.NOW_IN_SECONDS);
             }
 
             return flags;
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java 
b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 51df331b8d..d13d0b49a7 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -103,13 +103,13 @@ public class UpdateParameters
         if (clustering == Clustering.STATIC_CLUSTERING)
         {
             if (staticBuilder == null)
-                staticBuilder = BTreeRow.unsortedBuilder();
+                staticBuilder = BTreeRow.pooledUnsortedBuilder();
             builder = staticBuilder;
         }
         else
         {
             if (regularBuilder == null)
-                regularBuilder = BTreeRow.unsortedBuilder();
+                regularBuilder = BTreeRow.pooledUnsortedBuilder();
             builder = regularBuilder;
         }
 
diff --git a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java 
b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
index e58290eba9..504859cac4 100644
--- a/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
+++ b/src/java/org/apache/cassandra/cql3/VariableSpecifications.java
@@ -21,6 +21,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.collect.ImmutableList;
+
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 
@@ -28,6 +30,7 @@ public class VariableSpecifications
 {
     private final List<ColumnIdentifier> variableNames;
     private final List<ColumnSpecification> specs;
+    private volatile ImmutableList<ColumnSpecification> immutableSpecs;
     private final ColumnMetadata[] targetColumns;
 
     public VariableSpecifications(List<ColumnIdentifier> variableNames)
@@ -56,6 +59,17 @@ public class VariableSpecifications
         return specs;
     }
 
+    public ImmutableList<ColumnSpecification> getImmutableBindVariables()
+    {
+        ImmutableList<ColumnSpecification> result = immutableSpecs;
+        if (result == null) // strong syncrhronization is not needed, it is ok 
if sometimes we create several immutable lists
+        {
+            result = ImmutableList.copyOf(specs);
+            immutableSpecs = result;
+        }
+        return result;
+    }
+
     /**
      * Returns an array with the same length as the number of partition key 
columns for the table corresponding
      * to table.  Each short in the array represents the bind index of the 
marker that holds the value for that
@@ -87,6 +101,7 @@ public class VariableSpecifications
 
     public void add(int bindIndex, ColumnSpecification spec)
     {
+        assert immutableSpecs == null : "bind variable specs cannot be 
modified once we started to use them";
         if (spec instanceof ColumnMetadata)
             targetColumns[bindIndex] = (ColumnMetadata) spec;
 
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index e5104376ce..bfec675464 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import org.slf4j.Logger;
@@ -137,9 +138,9 @@ public class BatchStatement implements CQLStatement
     }
 
     @Override
-    public List<ColumnSpecification> getBindVariables()
+    public ImmutableList<ColumnSpecification> getBindVariables()
     {
-        return bindVariables.getBindVariables();
+        return bindVariables.getImmutableBindVariables();
     }
 
     @Override
@@ -419,8 +420,10 @@ public class BatchStatement implements CQLStatement
             throw new InvalidRequestException("Invalid empty serial 
consistency level");
 
         ClientState clientState = queryState.getClientState();
-        
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(), 
options.getSerialConsistency()),
-                                                clientState);
+        if (Guardrails.writeConsistencyLevels.enabled(clientState)) // to 
avoid EnumSet allocation
+            
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(),
+                                                               
options.getSerialConsistency()),
+                                                    clientState);
 
         for (int i = 0; i < statements.size(); i++ )
             statements.get(i).validateDiskUsage(options.forStatement(i), 
clientState);
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
index 360d34b293..64b7862d6d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DescribeStatement.java
@@ -107,9 +107,9 @@ public abstract class DescribeStatement<T> extends 
CQLStatement.Raw implements C
         return this;
     }
 
-    public final List<ColumnSpecification> getBindVariables()
+    public final ImmutableList<ColumnSpecification> getBindVariables()
     {
-        return Collections.emptyList();
+        return ImmutableList.of();
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 7698c0b59c..ce9da9a538 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +104,8 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
     private final RegularAndStaticColumns requiresRead;
 
+    private final List<Function> functions;
+
     public ModificationStatement(StatementType type,
                                  VariableSpecifications bindVariables,
                                  TableMetadata metadata,
@@ -179,12 +182,13 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
         this.conditionColumns = conditionColumnsBuilder.build();
         this.requiresRead = requiresReadBuilder.build();
+        this.functions = findAllFunctions();
     }
 
     @Override
-    public List<ColumnSpecification> getBindVariables()
+    public ImmutableList<ColumnSpecification> getBindVariables()
     {
-        return bindVariables.getBindVariables();
+        return bindVariables.getImmutableBindVariables();
     }
 
     @Override
@@ -195,9 +199,19 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
     @Override
     public Iterable<Function> getFunctions()
+    {
+        return functions;
+    }
+
+    private List<Function> findAllFunctions()
     {
         List<Function> functions = new ArrayList<>();
         addFunctionsTo(functions);
+        if (functions.isEmpty())
+        {
+            functions = Collections.emptyList(); // to avoid a new Iterator 
object creation during each authorization
+        }
+
         return functions;
     }
 
@@ -518,8 +532,9 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         if (options.getConsistency() == null)
             throw new InvalidRequestException("Invalid empty consistency 
level");
 
-        
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(), 
options.getSerialConsistency()),
-                                                queryState.getClientState());
+        if 
(Guardrails.writeConsistencyLevels.enabled(queryState.getClientState())) // to 
avoid EnumSet allocation
+            
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(), 
options.getSerialConsistency()),
+                                                    
queryState.getClientState());
 
         return hasConditions()
              ? executeWithCondition(queryState, options, requestTime)
@@ -768,10 +783,18 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                                                    Dispatcher.RequestTime 
requestTime)
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options, state);
-        HashMultiset<ByteBuffer> perPartitionKeyCounts = 
HashMultiset.create(keys);
-        SingleTableUpdatesCollector collector = new 
SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
-        addUpdates(collector, keys, state, options, local, timestamp, 
nowInSeconds, requestTime);
-        return collector.toMutations(state);
+        if(keys.size() == 1)
+        {
+            SingleTableSinglePartitionUpdatesCollector collector = new 
SingleTableSinglePartitionUpdatesCollector(metadata, updatedColumns);
+            addUpdates(collector, keys, state, options, local, timestamp, 
nowInSeconds, requestTime);
+            return collector.toMutations(state);
+        } else
+        {
+            HashMultiset<ByteBuffer> perPartitionKeyCounts = 
HashMultiset.create(keys);
+            SingleTableUpdatesCollector collector = new 
SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
+            addUpdates(collector, keys, state, options, local, timestamp, 
nowInSeconds, requestTime);
+            return collector.toMutations(state);
+        }
     }
 
     final void addUpdates(UpdatesCollector collector,
@@ -807,8 +830,12 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
 
                 PartitionUpdate.Builder updateBuilder = 
collector.getPartitionUpdateBuilder(metadata(), dk, options.getConsistency());
 
-                for (Slice slice : slices)
-                    addUpdateForKey(updateBuilder, slice, params);
+                if (slices == Slices.ALL) // to avoid Slices iterator 
allocation for a common case
+                    addUpdateForKey(updateBuilder, Slice.ALL, params);
+                else
+                    for (Slice slice : slices)
+                        addUpdateForKey(updateBuilder, slice, params);
+
             }
         }
         else
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1762b776a7..209ba88f52 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -27,6 +27,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
@@ -176,9 +177,9 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
     }
 
     @Override
-    public List<ColumnSpecification> getBindVariables()
+    public ImmutableList<ColumnSpecification> getBindVariables()
     {
-        return bindVariables.getBindVariables();
+        return bindVariables.getImmutableBindVariables();
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableSinglePartitionUpdatesCollector.java
similarity index 60%
copy from 
src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
copy to 
src/java/org/apache/cassandra/cql3/statements/SingleTableSinglePartitionUpdatesCollector.java
index 5ff299eb88..c650ef0370 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableSinglePartitionUpdatesCollector.java
@@ -17,13 +17,8 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Maps;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.CounterMutation;
@@ -41,7 +36,7 @@ import org.apache.cassandra.service.ClientState;
 /**
  * Utility class to collect updates.
  */
-final class SingleTableUpdatesCollector implements UpdatesCollector
+final class SingleTableSinglePartitionUpdatesCollector implements 
UpdatesCollector
 {
     /**
      * the table to be updated
@@ -52,67 +47,59 @@ final class SingleTableUpdatesCollector implements 
UpdatesCollector
      * the columns to update
      */
     private final RegularAndStaticColumns updatedColumns;
-
-    /**
-     * The number of updated rows per key.
-     */
-    private final HashMultiset<ByteBuffer> perPartitionKeyCounts;
-
     /**
      * the partition update builders per key
      */
-    private final Map<ByteBuffer, PartitionUpdate.Builder> puBuilders;
+    private PartitionUpdate.Builder builder;
 
     /**
      * if it is a counter table, we will set this
      */
     private ConsistencyLevel counterConsistencyLevel = null;
 
-    SingleTableUpdatesCollector(TableMetadata metadata, 
RegularAndStaticColumns updatedColumns, HashMultiset<ByteBuffer> 
perPartitionKeyCounts)
+    SingleTableSinglePartitionUpdatesCollector(TableMetadata metadata, 
RegularAndStaticColumns updatedColumns)
     {
         this.metadata = metadata;
         this.updatedColumns = updatedColumns;
-        this.perPartitionKeyCounts = perPartitionKeyCounts;
-        this.puBuilders = 
Maps.newHashMapWithExpectedSize(perPartitionKeyCounts.size());
     }
 
     public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata 
metadata, DecoratedKey dk, ConsistencyLevel consistency)
     {
         if (metadata.isCounter())
             counterConsistencyLevel = consistency;
-        PartitionUpdate.Builder builder = puBuilders.get(dk.getKey());
         if (builder == null)
         {
-            builder = new PartitionUpdate.Builder(metadata, dk, 
updatedColumns, perPartitionKeyCounts.count(dk.getKey()));
-            puBuilders.put(dk.getKey(), builder);
+            builder = new PartitionUpdate.Builder(metadata, dk, 
updatedColumns, 1);
         }
         return builder;
     }
 
     /**
      * Returns a collection containing all the mutations.
-     * @return a collection containing all the mutations.
      */
     @Override
     public List<IMutation> toMutations(ClientState state)
     {
-        List<IMutation> ms = new ArrayList<>(puBuilders.size());
-        for (PartitionUpdate.Builder builder : puBuilders.values())
-        {
-            IMutation mutation;
+        // it is possible that a modification statement does not create any 
mutations
+        // for example: DELETE FROM some_table WHERE part_key = 1 AND 
clust_key < 3 AND clust_key > 5
+        if (builder == null)
+            return Collections.emptyList();
+        return Collections.singletonList(createMutation(state, builder));
+    }
 
-            if (metadata.isVirtual())
-                mutation = new VirtualMutation(builder.build());
-            else if (metadata.isCounter())
-                mutation = new CounterMutation(new Mutation(builder.build()), 
counterConsistencyLevel);
-            else
-                mutation = new Mutation(builder.build());
+    private IMutation createMutation(ClientState state, 
PartitionUpdate.Builder builder)
+    {
+        IMutation mutation;
 
-            mutation.validateIndexedColumns(state);
-            mutation.validateSize(MessagingService.current_version, 
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
-            ms.add(mutation);
-        }
+        if (metadata.isVirtual())
+            mutation = new VirtualMutation(builder.build());
+        else if (metadata.isCounter())
+            mutation = new CounterMutation(new Mutation(builder.build()), 
counterConsistencyLevel);
+        else
+            mutation = new Mutation(builder.build());
 
-        return ms;
+        mutation.validateIndexedColumns(state);
+        mutation.validateSize(MessagingService.current_version, 
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
+        return mutation;
     }
-}
+}
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 5ff299eb88..2da6b89180 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -96,23 +97,34 @@ final class SingleTableUpdatesCollector implements 
UpdatesCollector
     @Override
     public List<IMutation> toMutations(ClientState state)
     {
+        if (puBuilders.size() == 1)
+        {
+            PartitionUpdate.Builder builder = 
puBuilders.values().iterator().next();
+            return Collections.singletonList(createMutation(state, builder));
+        }
         List<IMutation> ms = new ArrayList<>(puBuilders.size());
         for (PartitionUpdate.Builder builder : puBuilders.values())
         {
-            IMutation mutation;
-
-            if (metadata.isVirtual())
-                mutation = new VirtualMutation(builder.build());
-            else if (metadata.isCounter())
-                mutation = new CounterMutation(new Mutation(builder.build()), 
counterConsistencyLevel);
-            else
-                mutation = new Mutation(builder.build());
-
-            mutation.validateIndexedColumns(state);
-            mutation.validateSize(MessagingService.current_version, 
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
+            IMutation mutation = createMutation(state, builder);
             ms.add(mutation);
         }
 
         return ms;
     }
+
+    private IMutation createMutation(ClientState state, 
PartitionUpdate.Builder builder)
+    {
+        IMutation mutation;
+
+        if (metadata.isVirtual())
+            mutation = new VirtualMutation(builder.build());
+        else if (metadata.isCounter())
+            mutation = new CounterMutation(new Mutation(builder.build()), 
counterConsistencyLevel);
+        else
+            mutation = new Mutation(builder.build());
+
+        mutation.validateIndexedColumns(state);
+        mutation.validateSize(MessagingService.current_version, 
CommitLogSegment.ENTRY_OVERHEAD_SIZE);
+        return mutation;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index af651570bb..f731139e44 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -453,11 +453,11 @@ public class Keyspace
 
         Lock[] locks = null;
 
-        boolean requiresViewUpdate = updateIndexes && 
viewManager.updatesAffectView(Collections.singleton(mutation), false);
+        boolean requiresViewUpdate = updateIndexes && 
viewManager.updatesAffectView(mutation, false);
 
         if (requiresViewUpdate)
         {
-            mutation.viewLockAcquireStart.compareAndSet(0L, 
currentTimeMillis());
+            Mutation.viewLockAcquireStartUpdater.compareAndSet(mutation, 0L, 
currentTimeMillis());
 
             // the order of lock acquisition doesn't matter (from a deadlock 
perspective) because we only use tryLock()
             Collection<TableId> tableIds = mutation.getTableIds();
@@ -534,7 +534,7 @@ public class Keyspace
                 }
             }
 
-            long acquireTime = currentTimeMillis() - 
mutation.viewLockAcquireStart.get();
+            long acquireTime = currentTimeMillis() - 
Mutation.viewLockAcquireStartUpdater.get(mutation);
             // Metrics are only collected for droppable write operations
             // Bulk non-droppable operations (e.g. commitlog replay, hint 
delivery) are not measured
             if (isDroppable)
@@ -553,10 +553,11 @@ public class Keyspace
                     logger.error("Attempting to mutate non-existant table {} 
({}.{})", upd.metadata().id, upd.metadata().keyspace, upd.metadata().name);
                     continue;
                 }
-                AtomicLong baseComplete = new AtomicLong(Long.MAX_VALUE);
+                AtomicLong baseComplete = null;
 
                 if (requiresViewUpdate)
                 {
+                    baseComplete = new AtomicLong(Long.MAX_VALUE);
                     try
                     {
                         Tracing.trace("Creating materialized view mutations 
from base table replica");
diff --git a/src/java/org/apache/cassandra/db/Mutation.java 
b/src/java/org/apache/cassandra/db/Mutation.java
index 384f2b4972..0861bb64c4 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.Supplier;
 
 import com.google.common.base.Preconditions;
@@ -68,7 +68,10 @@ public class Mutation implements IMutation, 
Supplier<Mutation>
     // Time at which this mutation or the builder that built it was 
instantiated
     final long approxCreatedAtNanos;
     // keep track of when mutation has started waiting for a MV partition lock
-    final AtomicLong viewLockAcquireStart = new AtomicLong(0);
+
+    final static AtomicLongFieldUpdater<Mutation> viewLockAcquireStartUpdater =
+        AtomicLongFieldUpdater.newUpdater(Mutation.class, 
"viewLockAcquireStart");
+    volatile long viewLockAcquireStart;
 
     private final boolean cdcEnabled;
 
@@ -458,7 +461,7 @@ public class Mutation implements IMutation, 
Supplier<Mutation>
                     try (DataOutputBuffer dob = 
DataOutputBuffer.scratchBuffer.get())
                     {
                         serializeInternal(PartitionUpdate.serializer, 
mutation, dob, version);
-                        serialization = new 
CachedSerialization(dob.toByteArray());
+                        serialization = new 
CachedSerialization(dob.unsafeToByteArray());
                     }
                     catch (IOException e)
                     {
@@ -521,7 +524,7 @@ public class Mutation implements IMutation, 
Supplier<Mutation>
 
                 //Only cache serializations that don't hit the limit
                 if (!teeIn.isLimitReached())
-                    
m.cachedSerializations[MessagingService.getVersionOrdinal(version)] = new 
CachedSerialization(dob.toByteArray());
+                    
m.cachedSerializations[MessagingService.getVersionOrdinal(version)] = new 
CachedSerialization(dob.unsafeToByteArray());
 
                 return m;
             }
diff --git a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java 
b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
index b6da183d01..4daea2f4f7 100644
--- a/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
+++ b/src/java/org/apache/cassandra/db/RegularAndStaticColumns.java
@@ -153,6 +153,8 @@ public class RegularAndStaticColumns implements 
Iterable<ColumnMetadata>
         private BTree.Builder<ColumnMetadata> regularColumns;
         private BTree.Builder<ColumnMetadata> staticColumns;
 
+        private RegularAndStaticColumns lastAddedColumns;
+
         public Builder add(ColumnMetadata c)
         {
             if (c.isStatic())
@@ -180,18 +182,24 @@ public class RegularAndStaticColumns implements 
Iterable<ColumnMetadata>
 
         public Builder addAll(RegularAndStaticColumns columns)
         {
+            // for batch statements it is a frequent case when we have the 
same columns in each inner prepared statement
+            // we use == instead of uquals to make the optimization check cheap
+            if (lastAddedColumns != null && lastAddedColumns == columns) {
+                return this;
+            }
             if (regularColumns == null && !columns.regulars.isEmpty())
                 regularColumns = BTree.builder(naturalOrder());
 
-            for (ColumnMetadata c : columns.regulars)
-                regularColumns.add(c);
+            if (!columns.regulars.isEmpty())
+                regularColumns.addAll(columns.regulars);
 
             if (staticColumns == null && !columns.statics.isEmpty())
                 staticColumns = BTree.builder(naturalOrder());
 
-            for (ColumnMetadata c : columns.statics)
-                staticColumns.add(c);
+            if (!columns.statics.isEmpty())
+                staticColumns.addAll(columns.statics);
 
+            lastAddedColumns = columns;
             return this;
         }
 
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java 
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index 90fc9f3a11..ae043039e2 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -180,6 +180,15 @@ public abstract class ColumnFilter
         return new WildCardColumnFilter(metadata.regularAndStaticColumns());
     }
 
+    /**
+     * A filter for a PartitionUpdate entity
+     *  which we've just constructed and there no a real need to filter it
+     */
+    public static ColumnFilter all(RegularAndStaticColumns columns)
+    {
+        return new WildCardColumnFilter(columns);
+    }
+
     /**
      * A filter that only fetches/queries the provided columns.
      * <p>
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index ff5d0f9035..00f26451c1 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -223,6 +223,18 @@ public class PartitionUpdate extends AbstractBTreePartition
         return new PartitionUpdate(iterator.metadata(), 
iterator.metadata().epoch, iterator.partitionKey(), holder, deletionInfo, 
false);
     }
 
+    /**
+     * An override of default AbstractBTreePartition iterator
+     * It is added as a performance optimization to avoid full-functional 
filtering
+     * using org.apache.cassandra.db.Columns.inOrderInclusionTester() predicate
+     * when we iterate over row within a PartitionUpdate
+     */
+    @Override
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        return 
unfilteredIterator(ColumnFilter.SelectionColumnFilter.all(columns()), 
Slices.ALL, false);
+    }
+
 
     public PartitionUpdate withOnlyPresentColumns()
     {
@@ -884,7 +896,11 @@ public class PartitionUpdate extends AbstractBTreePartition
         private final MutableDeletionInfo deletionInfo;
         private final boolean canHaveShadowedData;
         private Object[] tree = BTree.empty();
-        private final BTree.Builder<Row> rowBuilder;
+
+        private Row firstRow;
+        private BTree.Builder<Row> rowBuilder;
+
+        private final int initialRowCapacity;
         private Row staticRow = Rows.EMPTY_STATIC_ROW;
         private final RegularAndStaticColumns columns;
         private boolean isBuilt = false;
@@ -920,7 +936,7 @@ public class PartitionUpdate extends AbstractBTreePartition
             this.metadata = metadata;
             this.key = key;
             this.columns = columns;
-            this.rowBuilder = rowBuilder(initialRowCapacity);
+            this.initialRowCapacity = initialRowCapacity;
             this.canHaveShadowedData = canHaveShadowedData;
             this.deletionInfo = deletionInfo.mutableCopy();
             this.staticRow = staticRow;
@@ -963,19 +979,25 @@ public class PartitionUpdate extends 
AbstractBTreePartition
 
             if (row.isStatic())
             {
-                // this assert is expensive, and possibly of limited value; we 
should consider removing it
-                // or introducing a new class of assertions for test purposes
-                assert columns().statics.containsAll(row.columns()) : 
columns().statics + " is not superset of " + row.columns();
                 staticRow = staticRow.isEmpty()
                             ? row
                             : Rows.merge(staticRow, row);
             }
             else
             {
-                // this assert is expensive, and possibly of limited value; we 
should consider removing it
-                // or introducing a new class of assertions for test purposes
-                assert columns().regulars.containsAll(row.columns()) : 
columns().regulars + " is not superset of " + row.columns();
-                rowBuilder.add(row);
+                if (firstRow == null)
+                {
+                    firstRow = row;
+                }
+                else
+                {
+                    if (rowBuilder == null)
+                    {
+                        rowBuilder = rowBuilder(initialRowCapacity);
+                        rowBuilder.add(firstRow);
+                    }
+                    rowBuilder.add(row);
+                }
             }
         }
 
@@ -999,13 +1021,22 @@ public class PartitionUpdate extends 
AbstractBTreePartition
             return metadata;
         }
 
+        private static final UpdateFunction<Row, Row> ROWS_MERGE_FUNCTION = 
UpdateFunction.Simple.of(Rows::merge);
+
         public PartitionUpdate build()
         {
             // assert that we are not calling build() several times
             assert !isBuilt : "A PartitionUpdate.Builder should only get built 
once";
-            Object[] add = rowBuilder.build();
-            Object[] merged = BTree.<Row, Row, Row>update(tree, add, 
metadata.comparator,
-                                                          
UpdateFunction.Simple.of(Rows::merge));
+            Object[] add;
+            if (rowBuilder == null)
+            {
+                add = firstRow != null ? BTree.singleton(firstRow) : 
BTree.empty();
+            }
+            else
+            {
+                add = rowBuilder.build();
+            }
+            Object[] merged = BTree.<Row, Row, Row>update(tree, add, 
metadata.comparator, ROWS_MERGE_FUNCTION);
 
             EncodingStats newStats = 
EncodingStats.Collector.collect(staticRow, BTree.iterator(merged), 
deletionInfo);
 
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java 
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 075a4f67fe..fe44fbde53 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSearchIterator;
 import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.caching.TinyThreadLocalPool;
 import org.apache.cassandra.utils.memory.Cloner;
 
 /**
@@ -560,6 +561,17 @@ public class BTreeRow extends AbstractRow
         return new Builder(false);
     }
 
+    private static final TinyThreadLocalPool<Builder> POOL = new 
TinyThreadLocalPool<>();
+
+    public static Row.Builder pooledUnsortedBuilder() {
+        TinyThreadLocalPool.TinyPool<Builder> pool = POOL.get();
+        Builder builder = pool.poll();
+        if (builder == null)
+            builder = new Builder(false);
+        builder.pool = pool;
+        return builder;
+    }
+
     // This is only used by PartitionUpdate.CounterMark but other uses should 
be avoided as much as possible as it breaks our general
     // assumption that Row objects are immutable. This method should go away 
post-#6506 in particular.
     // This method is in particular not exposed by the Row API on purpose.
@@ -818,6 +830,8 @@ public class BTreeRow extends AbstractRow
 
         // For complex column at index i of 'columns', we store at 
complexDeletions[i] its complex deletion.
 
+        private TinyThreadLocalPool.TinyPool<Builder> pool;
+
         protected Builder(boolean isSorted)
         {
             cells_ = null;
@@ -873,6 +887,11 @@ public class BTreeRow extends AbstractRow
             this.deletion = Deletion.LIVE;
             this.cells_.reuse();
             this.hasComplex = false;
+            if (pool != null)
+            {
+                pool.offer(this);
+                pool = null;
+            }
         }
 
         public void addPrimaryKeyLivenessInfo(LivenessInfo info)
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java 
b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index d0f788ae5a..0acc67815b 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -109,6 +109,13 @@ public class EncodingStats implements IMeasurableMemory
                      ? that.minTTL
                      : (that.minTTL == TTL_EPOCH ? this.minTTL : 
Math.min(this.minTTL, that.minTTL));
 
+        // EncodingStats is immutable, so if the result feilds are the same as 
in the current object we can avoid new object creation
+        // usually we merge an older object with a newer one and timestamp 
usually grows, so chances to reuse the object are high
+        if (this.minTimestamp == minTimestamp
+            && this.minLocalDeletionTime == minDelTime
+            && this.minTTL == minTTL) {
+            return this;
+        }
         return new EncodingStats(minTimestamp, minDelTime, minTTL);
     }
 
diff --git a/src/java/org/apache/cassandra/db/view/View.java 
b/src/java/org/apache/cassandra/db/view/View.java
index 30bad17b34..e926edb3a9 100644
--- a/src/java/org/apache/cassandra/db/view/View.java
+++ b/src/java/org/apache/cassandra/db/view/View.java
@@ -238,6 +238,8 @@ public class View
     public static Iterable<ViewMetadata> findAll(String keyspace, String 
baseTable)
     {
         KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace);
+        if (ksm.views.isEmpty()) // memory optimization, to avoid a capturing 
lambda allocation
+            return Collections.emptyList();
         return Iterables.filter(ksm.views, view -> 
view.baseTableName.equals(baseTable));
     }
 
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java 
b/src/java/org/apache/cassandra/db/view/ViewManager.java
index fe0f8236f4..06fdcb4db5 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -68,11 +68,25 @@ public class ViewManager
         this.keyspace = keyspace;
     }
 
+    public boolean updatesAffectView(IMutation mutation, boolean 
coordinatorBatchlog)
+    {
+        if (!enableCoordinatorBatchlog && coordinatorBatchlog)
+            return false;
+
+        if (viewsByName.isEmpty())
+            return false;
+
+        return updatesAffectView(Collections.singleton(mutation), 
coordinatorBatchlog);
+    }
+
     public boolean updatesAffectView(Collection<? extends IMutation> 
mutations, boolean coordinatorBatchlog)
     {
         if (!enableCoordinatorBatchlog && coordinatorBatchlog)
             return false;
 
+        if (viewsByName.isEmpty())
+            return false;
+
         ClusterMetadata metadata = ClusterMetadata.currentNullable();
         for (IMutation mutation : mutations)
         {
@@ -83,7 +97,8 @@ public class ViewManager
                 if (coordinatorBatchlog && 
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas == 1)
                     continue;
 
-                if (!forTable(update.metadata()).updatedViews(update, 
metadata).isEmpty())
+                TableViews tableViews = forTable(update.metadata());
+                if (tableViews.hasViews() && !tableViews.updatedViews(update, 
metadata).isEmpty())
                     return true;
             }
         }
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index c0123933b8..791293fbb9 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -1326,6 +1326,8 @@ public class SecondaryIndexManager implements 
IndexRegistry, INotificationConsum
     @Override
     public void validate(PartitionUpdate update, ClientState state) throws 
InvalidRequestException
     {
+        if (indexes.isEmpty())
+            return;
         for (Index index : indexes.values())
             index.validate(update, state);
     }
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
index 4fca9ca181..6fba07ba1e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableWriter.java
@@ -259,7 +259,8 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
 
     protected void onStartPartition(DecoratedKey key)
     {
-        notifyObservers(o -> o.startPartition(key, 
partitionWriter.getInitialPosition(), partitionWriter.getInitialPosition()));
+        if (hasObservers())
+            notifyObservers(o -> o.startPartition(key, 
partitionWriter.getInitialPosition(), partitionWriter.getInitialPosition()));
     }
 
     protected void onStaticRow(Row row)
@@ -269,22 +270,29 @@ public abstract class SortedTableWriter<P extends 
SortedTablePartitionWriter, I
 
     protected void onRow(Row row)
     {
-        notifyObservers(o -> o.nextUnfilteredCluster(row));
+        if (hasObservers())
+            notifyObservers(o -> o.nextUnfilteredCluster(row));
     }
 
     protected void onRangeTombstoneMarker(RangeTombstoneMarker marker)
     {
-        notifyObservers(o -> o.nextUnfilteredCluster(marker));
+        if (hasObservers())
+            notifyObservers(o -> o.nextUnfilteredCluster(marker));
     }
 
     protected abstract AbstractRowIndexEntry createRowIndexEntry(DecoratedKey 
key, DeletionTime partitionLevelDeletion, long finishResult) throws IOException;
 
     protected final void notifyObservers(Consumer<SSTableFlushObserver> action)
     {
-        if (observers != null && !observers.isEmpty())
+        if (hasObservers())
             observers.forEach(action);
     }
 
+    private boolean hasObservers()
+    {
+        return observers != null && !observers.isEmpty();
+    }
+
     @Override
     public void mark()
     {
diff --git a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java 
b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
index 3e157b6b9a..3cb5db0f00 100644
--- a/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
+++ b/src/java/org/apache/cassandra/io/util/DataOutputBuffer.java
@@ -290,6 +290,18 @@ public class DataOutputBuffer extends 
BufferedDataOutputStreamPlus
         return result;
     }
 
+    /**
+     * If the calling logic knows that no new calls to this object will happen 
after calling this
+     * method, then this method can avoid the ByteBuffer copying done in 
{@link #buffer()}.
+     */
+    public byte[] unsafeToByteArray()
+    {
+        ByteBuffer buffer = unsafeGetBufferAndFlip();
+        byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
+    }
+
     public String asString()
     {
         try
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java 
b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index 751737fafb..f0069f2555 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -301,6 +301,8 @@ public abstract class ReplicaLayout<E extends Endpoints<E>>
      */
     static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E 
pending)
     {
+        if (pending.isEmpty())
+            return false;
         Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
         for (InetAddressAndPort pendingEndpoint : pending.endpoints())
         {
diff --git 
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 2b7342d571..343bac1c4f 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -77,7 +78,7 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
     private static final 
AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater =
         
AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, 
"failures");
     private volatile int failures = 0;
-    private final Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint;
+    private volatile Map<InetAddressAndPort, RequestFailureReason> 
failureReasonByEndpoint;
     private final Dispatcher.RequestTime requestTime;
     private @Nullable final Supplier<Mutation> hintOnFailure;
 
@@ -106,7 +107,6 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
         this.callback = callback;
         this.writeType = writeType;
         this.hintOnFailure = hintOnFailure;
-        this.failureReasonByEndpoint = new ConcurrentHashMap<>();
         this.requestTime = requestTime;
     }
 
@@ -129,12 +129,12 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
 
         if (blockFor() + failures > candidateReplicaCount())
         {
-            if 
(RequestCallback.isTimeout(this.failureReasonByEndpoint.keySet().stream()
+            if 
(RequestCallback.isTimeout(this.getFailureReasonByEndpointMap().keySet().stream()
                                                                       
.filter(this::waitingFor) // DatacenterWriteResponseHandler filters errors from 
remote DCs
-                                                                      
.collect(Collectors.toMap(Function.identity(), 
this.failureReasonByEndpoint::get))))
+                                                                      
.collect(Collectors.toMap(Function.identity(), 
this.getFailureReasonByEndpointMap()::get))))
                 throwTimeout();
 
-            throw new WriteFailureException(replicaPlan.consistencyLevel(), 
ackCount(), blockFor(), writeType, this.failureReasonByEndpoint);
+            throw new WriteFailureException(replicaPlan.consistencyLevel(), 
ackCount(), blockFor(), writeType, this.getFailureReasonByEndpointMap());
         }
 
         if (replicaPlan.stillAppliesTo(ClusterMetadata.current()))
@@ -303,6 +303,12 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
                 ? failuresUpdater.incrementAndGet(this)
                 : failures;
 
+        if (failureReasonByEndpoint == null)
+            synchronized (this)
+            {
+                if (failureReasonByEndpoint == null)
+                    failureReasonByEndpoint = new ConcurrentHashMap<>();
+            }
         failureReasonByEndpoint.put(from, failureReason);
 
         logFailureOrTimeoutToIdealCLDelegate();
@@ -377,4 +383,9 @@ public abstract class AbstractWriteResponseHandler<T> 
implements RequestCallback
             throw new UncheckedInterruptedException(e);
         }
     }
+
+    private Map<InetAddressAndPort, RequestFailureReason> 
getFailureReasonByEndpointMap()
+    {
+        return failureReasonByEndpoint != null ? failureReasonByEndpoint : 
Collections.emptyMap();
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/ClientWarn.java 
b/src/java/org/apache/cassandra/service/ClientWarn.java
index 13cb21d6b2..de8e84da13 100644
--- a/src/java/org/apache/cassandra/service/ClientWarn.java
+++ b/src/java/org/apache/cassandra/service/ClientWarn.java
@@ -77,7 +77,7 @@ public class ClientWarn extends ExecutorLocals.Impl
     public List<String> getWarnings()
     {
         State state = get();
-        if (state == null || state.warnings.isEmpty())
+        if (state == null || state.warnings == null || 
state.warnings.isEmpty())
             return null;
         return state.warnings;
     }
@@ -92,10 +92,16 @@ public class ClientWarn extends ExecutorLocals.Impl
         private boolean collecting = true;
         // This must be a thread-safe list. Even though it's wrapped in a 
ThreadLocal, it's propagated to each thread
         // from shared state, so multiple threads can reference the same State.
-        private final List<String> warnings = new CopyOnWriteArrayList<>();
+        private volatile List<String> warnings;
 
         private void add(String warning)
         {
+            if (warnings == null)
+                synchronized (this) {
+                    if (warnings == null) {
+                        warnings = new CopyOnWriteArrayList<>();
+                    }
+                }
             if (collecting && warnings.size() < FBUtilities.MAX_UNSIGNED_SHORT)
                 warnings.add(maybeTruncate(warning));
         }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 31e60d68e6..f7219a4b58 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -880,24 +880,25 @@ public class StorageProxy implements StorageProxyMBean
         Tracing.trace("Determining replicas for mutation");
         final String localDataCenter = 
DatabaseDescriptor.getLocator().local().datacenter;
 
-        List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new 
ArrayList<>(mutations.size());
+        AbstractWriteResponseHandler<IMutation>[] responseHandlers = new 
AbstractWriteResponseHandler[mutations.size()];
         WriteType plainWriteType = mutations.size() <= 1 ? WriteType.SIMPLE : 
WriteType.UNLOGGED_BATCH;
 
         try
         {
+            int j = 0;
             for (IMutation mutation : mutations)
             {
                 if (mutation instanceof CounterMutation)
-                    
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, 
requestTime));
+                    responseHandlers[j++] = 
mutateCounter((CounterMutation)mutation, localDataCenter, requestTime);
                 else
-                    responseHandlers.add(performWrite(mutation, 
consistencyLevel, localDataCenter, standardWritePerformer, null, 
plainWriteType, requestTime));
+                    responseHandlers[j++] = performWrite(mutation, 
consistencyLevel, localDataCenter, standardWritePerformer, null, 
plainWriteType, requestTime);
             }
 
             // upgrade to full quorum any failed cheap quorums
             for (int i = 0 ; i < mutations.size() ; ++i)
             {
                 if (!(mutations.get(i) instanceof CounterMutation)) // at the 
moment, only non-counter writes support cheap quorums
-                    
responseHandlers.get(i).maybeTryAdditionalReplicas(mutations.get(i), 
standardWritePerformer, localDataCenter);
+                    
responseHandlers[i].maybeTryAdditionalReplicas(mutations.get(i), 
standardWritePerformer, localDataCenter);
             }
 
             // wait for writes.  throws TimeoutException if necessary
@@ -1275,14 +1276,28 @@ public class StorageProxy implements StorageProxyMBean
         {
             //We could potentially pass a callback into performWrite. And add 
callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints)
             //However, Trade off between write metric per CF accuracy vs 
performance hit due to callbacks. Similar issue exists with 
CoordinatorReadLatency metric.
-            Set<ColumnFamilyStore> uniqueColumnFamilyStores = new HashSet<>();
+            Set<ColumnFamilyStore> uniqueColumnFamilyStores = null;
+            // very frequently we update just a single table
+            // so an allocation of uniqueColumnFamilyStores set can be avoided
+            ColumnFamilyStore firstColumnFamilyStore = null;
             for (IMutation mutation : mutations)
             {
+                Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
                 for (TableId tableId : mutation.getTableIds())
                 {
-                    ColumnFamilyStore store = 
Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId);
-                    if (uniqueColumnFamilyStores.add(store))
+                    ColumnFamilyStore store = 
keyspace.getColumnFamilyStore(tableId);
+                    if (firstColumnFamilyStore == null)
+                    {
                         store.metric.coordinatorWriteLatency.update(latency, 
NANOSECONDS);
+                        firstColumnFamilyStore = store;
+                    }
+                    else if (!firstColumnFamilyStore.equals(store))
+                    {
+                        if (uniqueColumnFamilyStores == null)
+                            uniqueColumnFamilyStores = new HashSet<>();
+                        if (uniqueColumnFamilyStores.add(store))
+                            
store.metric.coordinatorWriteLatency.update(latency, NANOSECONDS);
+                    }
                 }
             }
         }
diff --git a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java 
b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
index 87d809c6c0..65e67925ad 100644
--- a/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
+++ b/src/java/org/apache/cassandra/transport/CQLMessageHandler.java
@@ -75,7 +75,7 @@ import static 
org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
  * has exceeded the maximum number of allowed permits. The choices are to 
either pause reads from the incoming socket
  * and allow TCP backpressure to do the work, or to throw an explict exception 
and rely on the client to back off.
  */
-public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
+public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler implements Flusher.OnFlushCleanup<Envelope>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(CQLMessageHandler.class);
     private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS);
@@ -374,7 +374,7 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
         ByteBuffer buf = bytes.get();
         int idx = buf.position() + Envelope.Header.LENGTH;
         final int end = idx + Ints.checkedCast(header.bodySizeInBytes);
-        ByteBuf body = Unpooled.wrappedBuffer(buf.slice());
+        ByteBuf body = Unpooled.wrappedBuffer(buf); // buf.slice() is not 
needed: Unpooled.wrappedBuffer does ByteBuffer.slice inside
         body.readerIndex(Envelope.Header.LENGTH);
         body.retain();
         buf.position(end);
@@ -492,10 +492,11 @@ public class CQLMessageHandler<M extends Message> extends 
AbstractMessageHandler
                           responseFrame,
                           request.getSource(),
                           payloadAllocator,
-                          this::release);
+                          this);
     }
 
-    private void release(Flusher.FlushItem<Envelope> flushItem)
+    @Override
+    public void cleanup(Flusher.FlushItem<Envelope> flushItem)
     {
         release(flushItem.request.header);
         flushItem.request.release();
diff --git a/src/java/org/apache/cassandra/transport/Envelope.java 
b/src/java/org/apache/cassandra/transport/Envelope.java
index 99c6e135af..b425abc63e 100644
--- a/src/java/org/apache/cassandra/transport/Envelope.java
+++ b/src/java/org/apache/cassandra/transport/Envelope.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.transport;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -79,7 +78,7 @@ public class Envelope
         return new Envelope(header, 
Unpooled.wrappedBuffer(ByteBufferUtil.clone(body.nioBuffer())));
     }
 
-    public static Envelope create(Message.Type type, int streamId, 
ProtocolVersion version, EnumSet<Header.Flag> flags, ByteBuf body)
+    public static Envelope create(Message.Type type, int streamId, 
ProtocolVersion version, int flags, ByteBuf body)
     {
         Header header = new Header(version, flags, streamId, type, 
body.readableBytes());
         return new Envelope(header, body);
@@ -92,7 +91,7 @@ public class Envelope
 
         Message.Type type = header.type;
         buf.writeByte(type.direction.addToVersion(header.version.asInt()));
-        buf.writeByte(Header.Flag.serialize(header.flags));
+        buf.writeByte(header.flags);
 
         // Continue to support writing pre-v3 headers so that we can give 
proper error messages to drivers that
         // connect with the v1/v2 protocol. See CASSANDRA-11464.
@@ -110,7 +109,7 @@ public class Envelope
     public void encodeHeaderInto(ByteBuffer buf)
     {
         buf.put((byte) 
header.type.direction.addToVersion(header.version.asInt()));
-        buf.put((byte) Envelope.Header.Flag.serialize(header.flags));
+        buf.put((byte) header.flags);
 
         if (header.version.isGreaterOrEqualTo(ProtocolVersion.V3))
             buf.putShort((short) header.streamId);
@@ -125,7 +124,11 @@ public class Envelope
     public void encodeInto(ByteBuffer buf)
     {
         encodeHeaderInto(buf);
-        buf.put(body.nioBuffer());
+        // an alternative logic for : buf.put(body.nioBuffer()) without 
ByteBuffer slicing
+        int originalLimit = buf.limit();
+        buf.limit(buf.position() + body.readableBytes());
+        body.readBytes(buf);
+        buf.limit(originalLimit);
     }
 
     public static class Header
@@ -136,12 +139,12 @@ public class Envelope
         public static final int BODY_LENGTH_SIZE = 4;
 
         public final ProtocolVersion version;
-        public final EnumSet<Flag> flags;
+        public int flags;
         public final int streamId;
         public final Message.Type type;
         public final long bodySizeInBytes;
 
-        private Header(ProtocolVersion version, EnumSet<Flag> flags, int 
streamId, Message.Type type, long bodySizeInBytes)
+        private Header(ProtocolVersion version, int flags, int streamId, 
Message.Type type, long bodySizeInBytes)
         {
             this.version = version;
             this.flags = flags;
@@ -150,6 +153,16 @@ public class Envelope
             this.bodySizeInBytes = bodySizeInBytes;
         }
 
+        public void addFlag(Flag flag)
+        {
+            this.flags = Flag.add(this.flags, flag);
+        }
+
+        public boolean hasFlag(Flag flag)
+        {
+            return Flag.contains(this.flags, flag);
+        }
+
         public enum Flag
         {
             // The order of that enum matters!!
@@ -159,25 +172,27 @@ public class Envelope
             WARNING,
             USE_BETA;
 
-            private static final Flag[] ALL_VALUES = values();
+            private final int mask;
 
-            public static EnumSet<Flag> deserialize(int flags)
+            Flag()
             {
-                EnumSet<Flag> set = EnumSet.noneOf(Flag.class);
-                for (int n = 0; n < ALL_VALUES.length; n++)
-                {
-                    if ((flags & (1 << n)) != 0)
-                        set.add(ALL_VALUES[n]);
-                }
-                return set;
+                this.mask = 1 << this.ordinal();
             }
 
-            public static int serialize(EnumSet<Flag> flags)
+            public static int none()
             {
-                int i = 0;
-                for (Flag flag : flags)
-                    i |= 1 << flag.ordinal();
-                return i;
+                return 0;
+            }
+
+            public static int add(int flags, Flag flagToAdd)
+            {
+                flags |= flagToAdd.mask;
+                return flags;
+            }
+
+            public static boolean contains(long flags, Flag flag)
+            {
+                return (flags & flag.mask) != 0;
             }
         }
     }
@@ -236,15 +251,14 @@ public class Envelope
             Message.Direction direction = 
Message.Direction.extractFromVersion(firstByte);
             Message.Type type;
             ProtocolVersion version;
-            EnumSet<Header.Flag> decodedFlags;
             try
             {
                 // This throws a protocol exception if the version number is 
unsupported,
                 // the opcode is unknown or invalid flags are set for the 
version
                 version = ProtocolVersion.decode(versionNum, 
DatabaseDescriptor.getNativeTransportAllowOlderProtocols());
-                decodedFlags = decodeFlags(version, flags);
+                validateFlags(version, flags);
                 type = Message.Type.fromOpcode(opcode, direction);
-                return new HeaderExtractionResult.Success(new Header(version, 
decodedFlags, streamId, type, bodyLength));
+                return new HeaderExtractionResult.Success(new Header(version, 
flags, streamId, type, bodyLength));
             }
             catch (ProtocolException e)
             {
@@ -372,7 +386,7 @@ public class Envelope
                 return null;
 
             int flags = buffer.getByte(idx++);
-            EnumSet<Header.Flag> decodedFlags = decodeFlags(version, flags);
+            validateFlags(version, flags);
 
             int streamId = buffer.getShort(idx);
             idx += 2;
@@ -417,17 +431,14 @@ public class Envelope
             idx += bodyLength;
             buffer.readerIndex(idx);
 
-            return new Envelope(new Header(version, decodedFlags, streamId, 
type, bodyLength), body);
+            return new Envelope(new Header(version, flags, streamId, type, 
bodyLength), body);
         }
 
-        private EnumSet<Header.Flag> decodeFlags(ProtocolVersion version, int 
flags)
+        private void validateFlags(ProtocolVersion version, int flags)
         {
-            EnumSet<Header.Flag> decodedFlags = Header.Flag.deserialize(flags);
-
-            if (version.isBeta() && 
!decodedFlags.contains(Header.Flag.USE_BETA))
+            if (version.isBeta() && !Header.Flag.contains(flags, 
Header.Flag.USE_BETA))
                 throw new ProtocolException(String.format("Beta version of the 
protocol used (%s), but USE_BETA flag is unset", version),
                                             version);
-            return decodedFlags;
         }
 
         @Override
@@ -488,7 +499,7 @@ public class Envelope
         {
             Connection connection = 
ctx.channel().attr(Connection.attributeKey).get();
 
-            if (!source.header.flags.contains(Header.Flag.COMPRESSED) || 
connection == null)
+            if (!source.header.hasFlag(Header.Flag.COMPRESSED) || connection 
== null)
             {
                 results.add(source);
                 return;
@@ -529,7 +540,7 @@ public class Envelope
                 results.add(source);
                 return;
             }
-            source.header.flags.add(Header.Flag.COMPRESSED);
+            source.header.addFlag(Header.Flag.COMPRESSED);
             results.add(compressor.compress(source));
         }
     }
diff --git a/src/java/org/apache/cassandra/transport/Flusher.java 
b/src/java/org/apache/cassandra/transport/Flusher.java
index 50261de036..ebf708158a 100644
--- a/src/java/org/apache/cassandra/transport/Flusher.java
+++ b/src/java/org/apache/cassandra/transport/Flusher.java
@@ -23,7 +23,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -49,6 +48,9 @@ abstract class Flusher implements Runnable
         Math.min(BufferPool.NORMAL_CHUNK_SIZE,
                  FrameEncoder.Payload.MAX_SIZE - 
Math.max(FrameEncoderCrc.HEADER_AND_TRAILER_LENGTH, 
FrameEncoderLZ4.HEADER_AND_TRAILER_LENGTH));
 
+    interface OnFlushCleanup<T> {
+        void cleanup(FlushItem<T> item);
+    }
     static class FlushItem<T>
     {
         enum Kind {FRAMED, UNFRAMED}
@@ -57,9 +59,9 @@ abstract class Flusher implements Runnable
         final Channel channel;
         final T response;
         final Envelope request;
-        final Consumer<FlushItem<T>> tidy;
+        final OnFlushCleanup<T> tidy;
 
-        FlushItem(Kind kind, Channel channel, T response, Envelope request, 
Consumer<FlushItem<T>> tidy)
+        FlushItem(Kind kind, Channel channel, T response, Envelope request, 
OnFlushCleanup<T> tidy)
         {
             this.kind = kind;
             this.channel = channel;
@@ -70,7 +72,7 @@ abstract class Flusher implements Runnable
 
         void release()
         {
-            tidy.accept(this);
+            tidy.cleanup(this);
         }
 
         static class Framed extends FlushItem<Envelope>
@@ -80,7 +82,7 @@ abstract class Flusher implements Runnable
                    Envelope response,
                    Envelope request,
                    FrameEncoder.PayloadAllocator allocator,
-                   Consumer<FlushItem<Envelope>> tidy)
+                   OnFlushCleanup<Envelope> tidy)
             {
                 super(Kind.FRAMED, channel, response, request, tidy);
                 this.allocator = allocator;
@@ -89,7 +91,7 @@ abstract class Flusher implements Runnable
 
         static class Unframed extends FlushItem<Response>
         {
-            Unframed(Channel channel, Response response, Envelope request, 
Consumer<FlushItem<Response>> tidy)
+            Unframed(Channel channel, Response response, Envelope request, 
OnFlushCleanup<Response> tidy)
             {
                 super(Kind.UNFRAMED, channel, response, request, tidy);
             }
@@ -156,8 +158,14 @@ abstract class Flusher implements Runnable
         }
         else
         {
-            payloads.computeIfAbsent(flush.channel, channel -> new 
FlushBuffer(channel, flush.allocator, 5))
-                    .add(flush.response);
+            FlushBuffer flushBuffer = payloads.get(flush.channel);
+            if (flushBuffer == null)
+            {
+                flushBuffer = new FlushBuffer(flush.channel, flush.allocator, 
5);
+                payloads.put(flushBuffer.channel, flushBuffer);
+            }
+
+            flushBuffer.add(flush.response);
         }
     }
 
@@ -226,8 +234,9 @@ abstract class Flusher implements Runnable
     protected void flushWrittenChannels()
     {
         // flush the channels pre-V5 to which messages were written in 
writeSingleResponse
-        for (Channel channel : channels)
-            channel.flush();
+        if (!channels.isEmpty())
+            for (Channel channel : channels)
+                channel.flush();
 
         // Framed messages (V5) are grouped by channel, now encode them into 
payloads, write and flush
         for (FlushBuffer buffer : payloads.values())
@@ -247,8 +256,11 @@ abstract class Flusher implements Runnable
         // collated into frames, and so their buffers can be released 
immediately after flushing.
         // In V4 however, the buffers containing each CQL envelope are emitted 
from Envelope.Encoder
         // and so releasing them is handled by Netty internally.
-        for (FlushItem<?> item : processed)
+        for (int i = 0; i < processed.size(); i++)
+        {
+            FlushItem<?> item = processed.get(i);
             item.release();
+        }
 
         payloads.clear();
         channels.clear();
@@ -298,8 +310,9 @@ abstract class Flusher implements Runnable
             int writtenBytes = 0;
             int messagesToWrite = this.size();
             FrameEncoder.Payload sending = allocate(sizeInBytes, 
messagesToWrite);
-            for (Envelope f : this)
+            for (int i = 0; i < this.size(); i++)
             {
+                Envelope f = this.get(i);
                 messageSize = envelopeSize(f.header);
                 if (sending.remaining() < messageSize)
                 {
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index ed853c0cbd..47d4289b07 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.transport;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.ByteBuffer;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.*;
+import org.apache.cassandra.transport.Envelope.Header.Flag;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.ReflectionUtils;
@@ -328,7 +328,7 @@ public abstract class Message
 
     public Envelope encode(ProtocolVersion version)
     {
-        EnumSet<Envelope.Header.Flag> flags = 
EnumSet.noneOf(Envelope.Header.Flag.class);
+        int flags = Flag.none();
         @SuppressWarnings("unchecked")
         Codec<Message> codec = (Codec<Message>)this.type.codec;
         try
@@ -366,24 +366,24 @@ public abstract class Message
                 if (tracingId != null)
                 {
                     CBUtil.writeUUID(tracingId, body);
-                    flags.add(Envelope.Header.Flag.TRACING);
+                    flags = Flag.add(flags, Flag.TRACING);
                 }
                 if (warnings != null)
                 {
                     CBUtil.writeStringList(warnings, body);
-                    flags.add(Envelope.Header.Flag.WARNING);
+                    flags = Flag.add(flags, Flag.WARNING);
                 }
                 if (customPayload != null)
                 {
                     CBUtil.writeBytesMap(customPayload, body);
-                    flags.add(Envelope.Header.Flag.CUSTOM_PAYLOAD);
+                    flags = Flag.add(flags, Flag.CUSTOM_PAYLOAD);
                 }
             }
             else
             {
                 assert this instanceof Request;
                 if (((Request)this).isTracingRequested())
-                    flags.add(Envelope.Header.Flag.TRACING);
+                    flags = Flag.add(flags, Flag.TRACING);
                 Map<String, ByteBuffer> payload = getCustomPayload();
                 if (payload != null)
                     messageSize += CBUtil.sizeOfBytesMap(payload);
@@ -391,7 +391,7 @@ public abstract class Message
                 if (payload != null)
                 {
                     CBUtil.writeBytesMap(payload, body);
-                    flags.add(Envelope.Header.Flag.CUSTOM_PAYLOAD);
+                    flags = Flag.add(flags, Flag.CUSTOM_PAYLOAD);
                 }
             }
 
@@ -412,7 +412,7 @@ public abstract class Message
                                               : forcedProtocolVersion;
 
             if (responseVersion.isBeta())
-                flags.add(Envelope.Header.Flag.USE_BETA);
+                flags = Flag.add(flags, Flag.USE_BETA);
 
             return Envelope.create(type, getStreamId(), responseVersion, 
flags, body);
         }
@@ -427,9 +427,9 @@ public abstract class Message
         static Message decodeMessage(Channel channel, Envelope inbound)
         {
             boolean isRequest = inbound.header.type.direction == 
Direction.REQUEST;
-            boolean isTracing = 
inbound.header.flags.contains(Envelope.Header.Flag.TRACING);
-            boolean isCustomPayload = 
inbound.header.flags.contains(Envelope.Header.Flag.CUSTOM_PAYLOAD);
-            boolean hasWarning = 
inbound.header.flags.contains(Envelope.Header.Flag.WARNING);
+            boolean isTracing = inbound.header.hasFlag(Flag.TRACING);
+            boolean isCustomPayload = 
inbound.header.hasFlag(Flag.CUSTOM_PAYLOAD);
+            boolean hasWarning = inbound.header.hasFlag(Flag.WARNING);
 
             TimeUUID tracingId = isRequest || !isTracing ? null : 
CBUtil.readTimeUUID(inbound.body);
             List<String> warnings = isRequest || !hasWarning ? null : 
CBUtil.readStringList(inbound.body);
diff --git a/test/unit/org/apache/cassandra/cql3/QueryOptionsFlagsTest.java 
b/test/unit/org/apache/cassandra/cql3/QueryOptionsFlagsTest.java
new file mode 100644
index 0000000000..58b6f103f7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/cql3/QueryOptionsFlagsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.QueryOptions.Codec.Flag;
+
+public class QueryOptionsFlagsTest
+{
+    @Test
+    public void checkFlagOperations()
+    {
+        int flags = Flag.none();
+        for (Flag flag : Flag.values())
+        {
+            flags = Flag.add(flags, flag);
+            Assert.assertTrue(Flag.contains(flags, flag));
+            for (int i = flag.ordinal() + 1; i < Flag.values().length; i++)
+                Assert.assertFalse(Flag.contains(flags, Flag.values()[i]));
+        }
+        for (Flag flag : Flag.values())
+        {
+            flags = Flag.remove(flags, flag);
+            Assert.assertFalse(Flag.contains(flags, flag));
+            for (int i = flag.ordinal() + 1; i < Flag.values().length; i++)
+                Assert.assertTrue(Flag.contains(flags, Flag.values()[i]));
+        }
+
+    }
+
+    @Test
+    public void checkFlagEncoding()
+    {
+        int flags = Flag.none();
+        flags = Flag.add(flags, Flag.VALUES);
+        flags = Flag.add(flags, Flag.PAGING_STATE);
+        flags = Flag.add(flags, Flag.TIMESTAMP);
+
+        Assert.assertEquals(flags, 0x0001 | 0x0008 |  0x0020);
+    }
+
+    @Test
+    public void checkFlagDecoding()
+    {
+        int flags = 0x0001 | 0x0040 | 0x0004 | 0x0100;
+        Assert.assertTrue(Flag.contains(flags, Flag.VALUES));
+        Assert.assertTrue(Flag.contains(flags, Flag.NAMES_FOR_VALUES));
+        Assert.assertTrue(Flag.contains(flags, Flag.PAGE_SIZE));
+        Assert.assertFalse(Flag.contains(flags, Flag.SKIP_METADATA));
+        Assert.assertTrue(Flag.contains(flags, Flag.NOW_IN_SECONDS));
+    }
+}
diff --git a/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java 
b/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
index e01128a6d7..a3c9bfe407 100644
--- a/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
+++ b/test/unit/org/apache/cassandra/transport/CQLConnectionTest.java
@@ -524,7 +524,7 @@ public class CQLConnectionTest
         return Envelope.create(type,
                                streamId,
                                ProtocolVersion.V5,
-                               EnumSet.of(Envelope.Header.Flag.USE_BETA),
+                               
Envelope.Header.Flag.add(Envelope.Header.Flag.none(), 
Envelope.Header.Flag.USE_BETA),
                                Unpooled.wrappedBuffer(bytes));
     }
 
diff --git 
a/test/unit/org/apache/cassandra/transport/EnvelopeHeaderFlagsTest.java 
b/test/unit/org/apache/cassandra/transport/EnvelopeHeaderFlagsTest.java
new file mode 100644
index 0000000000..de31d1bedb
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/EnvelopeHeaderFlagsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.transport.Envelope.Header.Flag;
+
+public class EnvelopeHeaderFlagsTest
+{
+    @Test
+    public void checkFlagOperations()
+    {
+        int flags = Flag.none();
+        for (Flag flag : Flag.values())
+        {
+            flags = Flag.add(flags, flag);
+            Assert.assertTrue(Flag.contains(flags, flag));
+            for (int i = flag.ordinal() + 1; i < Flag.values().length; i++)
+                Assert.assertFalse(Flag.contains(flags, Flag.values()[i]));
+        }
+    }
+
+    @Test
+    public void checkFlagEncoding()
+    {
+        int flags = Flag.none();
+        flags = Flag.add(flags, Flag.COMPRESSED);
+        flags = Flag.add(flags, Flag.TRACING);
+        flags = Flag.add(flags, Flag.USE_BETA);
+
+        Assert.assertEquals(flags, 0x01 | 0x02 | 0x10);
+    }
+
+    @Test
+    public void checkFlagDecoding()
+    {
+        int flags = 0x02 | 0x08 | 0x10;
+        Assert.assertFalse(Flag.contains(flags, Flag.COMPRESSED));
+        Assert.assertTrue(Flag.contains(flags, Flag.TRACING));
+        Assert.assertFalse(Flag.contains(flags, Flag.CUSTOM_PAYLOAD));
+        Assert.assertTrue(Flag.contains(flags, Flag.WARNING));
+        Assert.assertTrue(Flag.contains(flags, Flag.USE_BETA));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to