Repository: cassandra
Updated Branches:
  refs/heads/trunk 1e2f5244e -> 5b645de13


Log the server-generated timestamp and nowInSeconds used by queries in FQL

patch by Aleksey Yeschenko; reviewed by Marcus Eriksson for
CASSANDRA-14675


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b645de1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b645de1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b645de1

Branch: refs/heads/trunk
Commit: 5b645de13f8bea775d5a979712b3bea910960255
Parents: 1e2f524
Author: Aleksey Yeshchenko <[email protected]>
Authored: Fri Aug 31 14:47:02 2018 +0100
Committer: Aleksey Yeshchenko <[email protected]>
Committed: Fri Aug 31 19:42:23 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/audit/AuditLogEntry.java   |   5 +
 .../apache/cassandra/audit/AuditLogManager.java |  11 +-
 .../apache/cassandra/audit/BinAuditLogger.java  |   6 +-
 .../cassandra/audit/BinLogAuditLogger.java      |  93 -------
 .../apache/cassandra/audit/FullQueryLogger.java | 255 ++++++++++++++-----
 .../org/apache/cassandra/cql3/QueryOptions.java |   3 +-
 .../apache/cassandra/service/QueryState.java    |  22 +-
 .../apache/cassandra/tools/fqltool/Dump.java    | 148 +++++++----
 .../transport/messages/BatchMessage.java        |   2 +-
 .../cassandra/audit/FullQueryLoggerTest.java    | 129 ++++++----
 11 files changed, 418 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d2d9c86..9e76586 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Log server-generated timestamp and nowInSeconds used by queries in FQL 
(CASSANDRA-14675)
  * Add diagnostic events for read repairs (CASSANDRA-14668)
  * Use consistent nowInSeconds and timestamps values within a request 
(CASSANDRA-14671)
  * Add sampler for query time and expose with nodetool (CASSANDRA-14436)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/AuditLogEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntry.java 
b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
index 0b891d4..4d3b867 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogEntry.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
@@ -153,6 +153,11 @@ public class AuditLogEntry
         return options;
     }
 
+    public QueryState getState()
+    {
+        return state;
+    }
+
     public static class Builder
     {
         private static final InetAddressAndPort DEFAULT_SOURCE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/AuditLogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java 
b/src/java/org/apache/cassandra/audit/AuditLogManager.java
index ab9c2e9..25966f7 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogManager.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLStatement;
 import org.apache.cassandra.cql3.QueryHandler;
 import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.BatchStatement;
 import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -187,7 +188,13 @@ public class AuditLogManager
     /**
      * Logs Batch queries to both FQL and standard audit logger.
      */
-    public void logBatch(String batchTypeName, List<Object> queryOrIdList, 
List<List<ByteBuffer>> values, List<QueryHandler.Prepared> prepared, 
QueryOptions options, QueryState state, long queryStartTimeMillis)
+    public void logBatch(BatchStatement.Type type,
+                         List<Object> queryOrIdList,
+                         List<List<ByteBuffer>> values,
+                         List<QueryHandler.Prepared> prepared,
+                         QueryOptions options,
+                         QueryState state,
+                         long queryStartTimeMillis)
     {
         if (isAuditingEnabled())
         {
@@ -205,7 +212,7 @@ public class AuditLogManager
             {
                 queryStrings.add(prepStatment.rawCQLStatement);
             }
-            fullQueryLogger.logBatch(batchTypeName, queryStrings, values, 
options, queryStartTimeMillis);
+            fullQueryLogger.logBatch(type, queryStrings, values, options, 
state, queryStartTimeMillis);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/BinAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/BinAuditLogger.java 
b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
index 89b764c..3ac9499 100644
--- a/src/java/org/apache/cassandra/audit/BinAuditLogger.java
+++ b/src/java/org/apache/cassandra/audit/BinAuditLogger.java
@@ -51,14 +51,14 @@ public class BinAuditLogger extends BinLogAuditLogger 
implements IAuditLogger
             return;
         }
 
-        super.logRecord(new 
WeighableMarshallableMessage(auditLogEntry.getLogString()), binLog);
+        super.logRecord(new Message(auditLogEntry.getLogString()), binLog);
     }
 
-    static class WeighableMarshallableMessage extends 
BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
+    static class Message extends BinLog.ReleaseableWriteMarshallable 
implements WeightedQueue.Weighable
     {
         private final String message;
 
-        WeighableMarshallableMessage(String message)
+        Message(String message)
         {
             this.message = message;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java 
b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
index a2426b9..7534650 100644
--- a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
+++ b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java
@@ -19,56 +19,25 @@
 package org.apache.cassandra.audit;
 
 import java.io.File;
-import java.nio.ByteBuffer;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
-import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import net.openhft.chronicle.bytes.BytesStore;
 import net.openhft.chronicle.queue.RollCycles;
-import net.openhft.chronicle.wire.WireOut;
-import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.transport.CBUtil;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.binlog.BinLog;
-import org.apache.cassandra.utils.concurrent.WeightedQueue;
-import org.github.jamm.MemoryLayoutSpecification;
 
 abstract class BinLogAuditLogger implements IAuditLogger
 {
-    static final int EMPTY_BYTEBUFFER_SIZE = 
Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0)));
-    static final int EMPTY_LIST_SIZE = 
Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0)));
-    private static final int EMPTY_BYTEBUF_SIZE;
-    private static final int OBJECT_HEADER_SIZE = 
MemoryLayoutSpecification.SPEC.getObjectHeaderSize();
-    static
-    {
-        int tempSize = 0;
-        ByteBuf buf = CBUtil.allocator.buffer(0, 0);
-        try
-        {
-            tempSize = Ints.checkedCast(ObjectSizes.measure(buf));
-        }
-        finally
-        {
-            buf.release();
-        }
-        EMPTY_BYTEBUF_SIZE = tempSize;
-    }
-
     protected static final Logger logger = 
LoggerFactory.getLogger(BinLogAuditLogger.class);
     private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
     private static final NoSpamLogger.NoSpamLogStatement 
droppedSamplesStatement = noSpamLogger.getStatement("Dropped {} binary log 
samples", 1, TimeUnit.MINUTES);
@@ -290,68 +259,6 @@ abstract class BinLogAuditLogger implements IAuditLogger
         }
     }
 
-    protected static abstract class AbstractWeighableMarshallable extends 
BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
-    {
-        private final ByteBuf queryOptionsBuffer;
-        private final long timeMillis;
-        private final int protocolVersion;
-
-        AbstractWeighableMarshallable(QueryOptions queryOptions, long 
timeMillis)
-        {
-            this.timeMillis = timeMillis;
-            ProtocolVersion version = queryOptions.getProtocolVersion();
-            this.protocolVersion = version.asInt();
-            int optionsSize = QueryOptions.codec.encodedSize(queryOptions, 
version);
-            queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, 
optionsSize);
-            /*
-             * Struggled with what tradeoff to make in terms of query options 
which is potentially large and complicated
-             * There is tension between low garbage production (or allocator 
overhead), small working set size, and CPU overhead reserializing the
-             * query options into binary format.
-             *
-             * I went with the lowest risk most predictable option which is 
allocator overhead and CPU overhead
-             * rather then keep the original query message around so I could 
just serialize that as a memcpy. It's more
-             * instructions when turned on, but it doesn't change memory 
footprint quite as much and it's more pay for what you use
-             * in terms of query volume. The CPU overhead is spread out across 
producers so we should at least get
-             * some scaling.
-             *
-             */
-            boolean success = false;
-            try
-            {
-                QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, 
version);
-                success = true;
-            }
-            finally
-            {
-                if (!success)
-                {
-                    queryOptionsBuffer.release();
-                }
-            }
-        }
-
-        @Override
-        public void writeMarshallable(WireOut wire)
-        {
-            wire.write("protocol-version").int32(protocolVersion);
-            
wire.write("query-options").bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
-            wire.write("query-time").int64(timeMillis);
-        }
-
-        @Override
-        public void release()
-        {
-            queryOptionsBuffer.release();
-        }
-
-        //8-bytes for protocol version (assume alignment cost), 8-byte 
timestamp, 8-byte object header + other contents
-        @Override
-        public int weight()
-        {
-            return 8 + 8 + OBJECT_HEADER_SIZE + EMPTY_BYTEBUF_SIZE + 
queryOptionsBuffer.capacity();
-        }
-    }
-
     private static Throwable cleanDirectory(File directory, Throwable 
accumulate)
     {
         if (!directory.exists())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/FullQueryLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java 
b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
index 36d0127..8cd8f4a 100644
--- a/src/java/org/apache/cassandra/audit/FullQueryLogger.java
+++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java
@@ -15,31 +15,78 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.audit;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Ints;
 
+import io.netty.buffer.ByteBuf;
 import net.openhft.chronicle.bytes.BytesStore;
 import net.openhft.chronicle.wire.ValueOut;
 import net.openhft.chronicle.wire.WireOut;
 import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.binlog.BinLog;
+import org.apache.cassandra.utils.concurrent.WeightedQueue;
+import org.github.jamm.MemoryLayoutSpecification;
 
 /**
  * A logger that logs entire query contents after the query finishes (or times 
out).
  */
 public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger
 {
+    public static final long CURRENT_VERSION = 0; // encode a dummy version, 
to prevent pain in decoding in the future
+
+    public static final String VERSION = "version";
+    public static final String TYPE = "type";
+
+    public static final String PROTOCOL_VERSION = "protocol-version";
+    public static final String QUERY_OPTIONS = "query-options";
+    public static final String QUERY_START_TIME = "query-start-time";
+
+    public static final String GENERATED_TIMESTAMP = "generated-timestamp";
+    public static final String GENERATED_NOW_IN_SECONDS = 
"generated-now-in-seconds";
+
+    public static final String BATCH = "batch";
+    public static final String SINGLE_QUERY = "single-query";
+
+    public static final String QUERY = "query";
+    public static final String BATCH_TYPE = "batch-type";
+    public static final String QUERIES = "queries";
+    public static final String VALUES = "values";
+
+    private static final int EMPTY_BYTEBUFFER_SIZE = 
Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0)));
+
+    private static final int EMPTY_LIST_SIZE = 
Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0)));
+    private static final int EMPTY_BYTEBUF_SIZE;
+
+    private static final int OBJECT_HEADER_SIZE = 
MemoryLayoutSpecification.SPEC.getObjectHeaderSize();
+    private static final int OBJECT_REFERENCE_SIZE = 
MemoryLayoutSpecification.SPEC.getReferenceSize();
+
+    static
+    {
+        ByteBuf buf = CBUtil.allocator.buffer(0, 0);
+        try
+        {
+            EMPTY_BYTEBUF_SIZE = Ints.checkedCast(ObjectSizes.measure(buf));
+        }
+        finally
+        {
+            buf.release();
+        }
+    }
+
     @Override
     public void log(AuditLogEntry entry)
     {
-        logQuery(entry.getOperation(), entry.getOptions(), 
entry.getTimestamp());
+        logQuery(entry.getOperation(), entry.getOptions(), entry.getState(), 
entry.getTimestamp());
     }
 
     /**
@@ -48,14 +95,21 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
      * @param queries CQL text of the queries
      * @param values Values to bind to as parameters for the queries
      * @param queryOptions Options associated with the query invocation
+     * @param queryState Timestamp state associated with the query invocation
      * @param batchTimeMillis Approximate time in milliseconds since the epoch 
since the batch was invoked
      */
-    void logBatch(String type, List<String> queries, List<List<ByteBuffer>> 
values, QueryOptions queryOptions, long batchTimeMillis)
+    void logBatch(BatchStatement.Type type,
+                  List<String> queries,
+                  List<List<ByteBuffer>> values,
+                  QueryOptions queryOptions,
+                  QueryState queryState,
+                  long batchTimeMillis)
     {
         Preconditions.checkNotNull(type, "type was null");
         Preconditions.checkNotNull(queries, "queries was null");
         Preconditions.checkNotNull(values, "value was null");
         Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkNotNull(queryState, "queryState was null");
         Preconditions.checkArgument(batchTimeMillis > 0, "batchTimeMillis must 
be > 0");
 
         //Don't construct the wrapper if the log is disabled
@@ -65,7 +119,7 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
             return;
         }
 
-        WeighableMarshallableBatch wrappedBatch = new 
WeighableMarshallableBatch(type, queries, values, queryOptions, 
batchTimeMillis);
+        Batch wrappedBatch = new Batch(type, queries, values, queryOptions, 
queryState, batchTimeMillis);
         logRecord(wrappedBatch, binLog);
     }
 
@@ -73,12 +127,14 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
      * Log a single CQL query
      * @param query CQL query text
      * @param queryOptions Options associated with the query invocation
+     * @param queryState Timestamp state associated with the query invocation
      * @param queryTimeMillis Approximate time in milliseconds since the epoch 
since the batch was invoked
      */
-    void logQuery(String query, QueryOptions queryOptions, long 
queryTimeMillis)
+    void logQuery(String query, QueryOptions queryOptions, QueryState 
queryState, long queryTimeMillis)
     {
         Preconditions.checkNotNull(query, "query was null");
         Preconditions.checkNotNull(queryOptions, "queryOptions was null");
+        Preconditions.checkNotNull(queryState, "queryState was null");
         Preconditions.checkArgument(queryTimeMillis > 0, "queryTimeMillis must 
be > 0");
 
         //Don't construct the wrapper if the log is disabled
@@ -88,70 +144,98 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
             return;
         }
 
-        WeighableMarshallableQuery wrappedQuery = new 
WeighableMarshallableQuery(query, queryOptions, queryTimeMillis);
+        Query wrappedQuery = new Query(query, queryOptions, queryState, 
queryTimeMillis);
         logRecord(wrappedQuery, binLog);
     }
 
-    static class WeighableMarshallableBatch extends 
AbstractWeighableMarshallable
+    static class Query extends AbstractLogEntry
+    {
+        private final String query;
+
+        public Query(String query, QueryOptions queryOptions, QueryState 
queryState, long queryStartTime)
+        {
+            super(queryOptions, queryState, queryStartTime);
+            this.query = query;
+        }
+
+        @Override
+        protected String type()
+        {
+            return SINGLE_QUERY;
+        }
+
+        @Override
+        public void writeMarshallable(WireOut wire)
+        {
+            super.writeMarshallable(wire);
+            wire.write(QUERY).text(query);
+        }
+
+        @Override
+        public int weight()
+        {
+            return Ints.checkedCast(ObjectSizes.sizeOf(query)) + 
super.weight();
+        }
+    }
+
+    static class Batch extends AbstractLogEntry
     {
         private final int weight;
-        private final String batchType;
+        private final BatchStatement.Type batchType;
         private final List<String> queries;
         private final List<List<ByteBuffer>> values;
 
-        public WeighableMarshallableBatch(String batchType, List<String> 
queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long 
batchTimeMillis)
-        {
-           super(queryOptions, batchTimeMillis);
-           this.queries = queries;
-           this.values = values;
-           this.batchType = batchType;
-           boolean success = false;
-           try
-           {
-               //weight, batch type, queries, values
-               int weightTemp = 8 + EMPTY_LIST_SIZE + EMPTY_LIST_SIZE;
-               for (int ii = 0; ii < queries.size(); ii++)
-               {
-                   weightTemp += ObjectSizes.sizeOf(queries.get(ii));
-               }
-
-               weightTemp += EMPTY_LIST_SIZE * values.size();
-               for (int ii = 0; ii < values.size(); ii++)
-               {
-                   List<ByteBuffer> sublist = values.get(ii);
-                   weightTemp += EMPTY_BYTEBUFFER_SIZE * sublist.size();
-                   for (int zz = 0; zz < sublist.size(); zz++)
-                   {
-                       weightTemp += sublist.get(zz).capacity();
-                   }
-               }
-               weightTemp += super.weight();
-               weightTemp += ObjectSizes.sizeOf(batchType);
-               weight = weightTemp;
-               success = true;
-           }
-           finally
-           {
-               if (!success)
-               {
-                   release();
-               }
-           }
+        public Batch(BatchStatement.Type batchType,
+                     List<String> queries,
+                     List<List<ByteBuffer>> values,
+                     QueryOptions queryOptions,
+                     QueryState queryState,
+                     long batchTimeMillis)
+        {
+            super(queryOptions, queryState, batchTimeMillis);
+
+            this.queries = queries;
+            this.values = values;
+            this.batchType = batchType;
+
+            int weight = super.weight();
+
+            // weight, queries, values, batch type
+            weight += 4 +                    // cached weight
+                      2 * EMPTY_LIST_SIZE +  // queries + values lists
+                      OBJECT_REFERENCE_SIZE; // batchType reference, worst case
+
+            for (String query : queries)
+                weight += ObjectSizes.sizeOf(query);
+
+            for (List<ByteBuffer> subValues : values)
+            {
+                weight += EMPTY_LIST_SIZE;
+                for (ByteBuffer value : subValues)
+                    weight += EMPTY_BYTEBUFFER_SIZE + value.capacity();
+            }
+
+            this.weight = weight;
+        }
+
+        @Override
+        protected String type()
+        {
+            return BATCH;
         }
 
         @Override
         public void writeMarshallable(WireOut wire)
         {
-            wire.write("type").text("batch");
             super.writeMarshallable(wire);
-            wire.write("batch-type").text(batchType);
-            ValueOut valueOut = wire.write("queries");
+            wire.write(BATCH_TYPE).text(batchType.name());
+            ValueOut valueOut = wire.write(QUERIES);
             valueOut.int32(queries.size());
             for (String query : queries)
             {
                 valueOut.text(query);
             }
-            valueOut = wire.write("values");
+            valueOut = wire.write(VALUES);
             valueOut.int32(values.size());
             for (List<ByteBuffer> subValues : values)
             {
@@ -170,28 +254,81 @@ public class FullQueryLogger extends BinLogAuditLogger 
implements IAuditLogger
         }
     }
 
-    static class WeighableMarshallableQuery extends 
AbstractWeighableMarshallable
+    private static abstract class AbstractLogEntry extends 
BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable
     {
-        private final String query;
+        private final long queryStartTime;
+        private final int protocolVersion;
+        private final ByteBuf queryOptionsBuffer;
 
-        public WeighableMarshallableQuery(String query, QueryOptions 
queryOptions, long queryTimeMillis)
+        private final long generatedTimestamp;
+        private final int generatedNowInSeconds;
+
+        AbstractLogEntry(QueryOptions queryOptions, QueryState queryState, 
long queryStartTime)
         {
-            super(queryOptions, queryTimeMillis);
-            this.query = query;
+            this.queryStartTime = queryStartTime;
+
+            this.protocolVersion = queryOptions.getProtocolVersion().asInt();
+            int optionsSize = QueryOptions.codec.encodedSize(queryOptions, 
queryOptions.getProtocolVersion());
+            queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, 
optionsSize);
+
+            this.generatedTimestamp = queryState.generatedTimestamp();
+            this.generatedNowInSeconds = queryState.generatedNowInSeconds();
+
+            /*
+             * Struggled with what tradeoff to make in terms of query options 
which is potentially large and complicated
+             * There is tension between low garbage production (or allocator 
overhead), small working set size, and CPU overhead reserializing the
+             * query options into binary format.
+             *
+             * I went with the lowest risk most predictable option which is 
allocator overhead and CPU overhead
+             * rather then keep the original query message around so I could 
just serialize that as a memcpy. It's more
+             * instructions when turned on, but it doesn't change memory 
footprint quite as much and it's more pay for what you use
+             * in terms of query volume. The CPU overhead is spread out across 
producers so we should at least get
+             * some scaling.
+             *
+             */
+            try
+            {
+                QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, 
queryOptions.getProtocolVersion());
+            }
+            catch (Throwable e)
+            {
+                queryOptionsBuffer.release();
+                throw e;
+            }
         }
 
         @Override
         public void writeMarshallable(WireOut wire)
         {
-            wire.write("type").text("single");
-            super.writeMarshallable(wire);
-            wire.write("query").text(query);
+            wire.write(VERSION).int16(CURRENT_VERSION);
+            wire.write(TYPE).text(type());
+
+            wire.write(QUERY_START_TIME).int64(queryStartTime);
+            wire.write(PROTOCOL_VERSION).int32(protocolVersion);
+            
wire.write(QUERY_OPTIONS).bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer()));
+
+            wire.write(GENERATED_TIMESTAMP).int64(generatedTimestamp);
+            wire.write(GENERATED_NOW_IN_SECONDS).int32(generatedNowInSeconds);
+        }
+
+        @Override
+        public void release()
+        {
+            queryOptionsBuffer.release();
         }
 
         @Override
         public int weight()
         {
-            return Ints.checkedCast(ObjectSizes.sizeOf(query)) + 
super.weight();
+            return OBJECT_HEADER_SIZE
+                 + 8                                                  // 
queryStartTime
+                 + 4                                                  // 
protocolVersion
+                 + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity() // 
queryOptionsBuffer
+                 + 8                                                  // 
generatedTimestamp
+                 + 4;                                                 // 
generatedNowInSeconds
         }
+
+        protected abstract String type();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/cql3/QueryOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java 
b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index f76d6b2..84adf80 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.transport.CBCodec;
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.ProtocolException;
 import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
@@ -45,7 +44,7 @@ import org.apache.commons.lang3.builder.ToStringStyle;
 public abstract class QueryOptions
 {
     public static final QueryOptions DEFAULT = new 
DefaultQueryOptions(ConsistencyLevel.ONE,
-                                                                       
Collections.<ByteBuffer>emptyList(),
+                                                                       
Collections.emptyList(),
                                                                        false,
                                                                        
SpecificOptions.DEFAULT,
                                                                        
ProtocolVersion.CURRENT);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java 
b/src/java/org/apache/cassandra/service/QueryState.java
index d1b03d4..2bd07ab 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -40,6 +40,14 @@ public class QueryState
     }
 
     /**
+     * @return a QueryState object for internal C* calls (not limited by any 
kind of auth).
+     */
+    public static QueryState forInternalCalls()
+    {
+        return new QueryState(ClientState.forInternalCalls());
+    }
+
+    /**
      * Generate, cache, and record a timestamp value on the server-side.
      *
      * Used in reads for all live and expiring cells, and all kinds of 
deletion infos.
@@ -75,11 +83,19 @@ public class QueryState
     }
 
     /**
-     * @return a QueryState object for internal C* calls (not limited by any 
kind of auth).
+     * @return server-generated timestamp value, if one had been requested, or 
Long.MIN_VALUE otherwise
      */
-    public static QueryState forInternalCalls()
+    public long generatedTimestamp()
     {
-        return new QueryState(ClientState.forInternalCalls());
+        return timestamp;
+    }
+
+    /**
+     * @return server-generated nowInSeconds value, if one had been requested, 
or Integer.MIN_VALUE otherwise
+     */
+    public int generatedNowInSeconds()
+    {
+        return nowInSeconds;
     }
 
     public ClientState getClientState()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/tools/fqltool/Dump.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/fqltool/Dump.java 
b/src/java/org/apache/cassandra/tools/fqltool/Dump.java
index 52eadb5..a8e7592 100644
--- a/src/java/org/apache/cassandra/tools/fqltool/Dump.java
+++ b/src/java/org/apache/cassandra/tools/fqltool/Dump.java
@@ -38,6 +38,8 @@ import net.openhft.chronicle.queue.RollCycles;
 import net.openhft.chronicle.threads.Pauser;
 import net.openhft.chronicle.wire.ReadMarshallable;
 import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.audit.FullQueryLogger;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.transport.ProtocolVersion;
 
@@ -67,59 +69,57 @@ public class Dump implements Runnable
     public static void dump(List<String> arguments, String rollCycle, boolean 
follow)
     {
         StringBuilder sb = new StringBuilder();
-        ReadMarshallable reader = wireIn -> {
+        ReadMarshallable reader = wireIn ->
+        {
             sb.setLength(0);
-            String type = wireIn.read("type").text();
-            sb.append("Type: ").append(type).append(System.lineSeparator());
-            assert type != null;
-            if (type.equals("AuditLog"))
-            {
-                sb.append("LogMessage: 
").append(wireIn.read("message").text()).append(System.lineSeparator());
 
-            }
-            else
+            int version = wireIn.read(FullQueryLogger.VERSION).int16();
+            if (version != FullQueryLogger.CURRENT_VERSION)
+                throw new UnsupportedOperationException("Full query log of 
unexpected version " + version + " encountered");
+
+            String type = wireIn.read(FullQueryLogger.TYPE).text();
+            sb.append("Type: ")
+              .append(type)
+              .append(System.lineSeparator());
+
+            long queryStartTime = 
wireIn.read(FullQueryLogger.QUERY_START_TIME).int64();
+            sb.append("Query start time: ")
+              .append(queryStartTime)
+              .append(System.lineSeparator());
+
+            int protocolVersion = 
wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32();
+            sb.append("Protocol version: ")
+              .append(protocolVersion)
+              .append(System.lineSeparator());
+
+            QueryOptions options =
+                
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()),
+                                          
ProtocolVersion.decode(protocolVersion));
+
+            long generatedTimestamp = 
wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64();
+            sb.append("Generated timestamp:")
+              .append(generatedTimestamp)
+              .append(System.lineSeparator());
+
+            int generatedNowInSeconds = 
wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32();
+            sb.append("Generated nowInSeconds:")
+              .append(generatedNowInSeconds)
+              .append(System.lineSeparator());
+
+            switch (type)
             {
-                int protocolVersion = wireIn.read("protocol-version").int32();
-                sb.append("Protocol version: 
").append(protocolVersion).append(System.lineSeparator());
-                QueryOptions options = 
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read("query-options").bytes()),
 ProtocolVersion.decode(protocolVersion));
-                sb.append("Query time: 
").append(wireIn.read("query-time").int64()).append(System.lineSeparator());
+                case (FullQueryLogger.SINGLE_QUERY):
+                    dumpQuery(options, wireIn, sb);
+                    break;
 
-                if (type.equals("single"))
-                {
-                    sb.append("Query: 
").append(wireIn.read("query").text()).append(System.lineSeparator());
-                    List<ByteBuffer> values = options.getValues() != null ? 
options.getValues() : Collections.EMPTY_LIST;
-                    sb.append("Values: ").append(System.lineSeparator());
-                    valuesToStringBuilder(values, sb);
-                }
-                else
-                {
-                    sb.append("Batch type: 
").append(wireIn.read("batch-type").text()).append(System.lineSeparator());
-                    ValueIn in = wireIn.read("queries");
-                    int numQueries = in.int32();
-                    List<String> queries = new ArrayList<>();
-                    for (int ii = 0; ii < numQueries; ii++)
-                    {
-                        queries.add(in.text());
-                    }
-                    in = wireIn.read("values");
-                    int numValues = in.int32();
-                    List<List<ByteBuffer>> values = new ArrayList<>();
-                    for (int ii = 0; ii < numValues; ii++)
-                    {
-                        List<ByteBuffer> subValues = new ArrayList<>();
-                        values.add(subValues);
-                        int numSubValues = in.int32();
-                        for (int zz = 0; zz < numSubValues; zz++)
-                        {
-                            subValues.add(ByteBuffer.wrap(in.bytes()));
-                        }
-                        sb.append("Query: 
").append(queries.get(ii)).append(System.lineSeparator());
-                        sb.append("Values: ").append(System.lineSeparator());
-                        valuesToStringBuilder(subValues, sb);
-                    }
-                }
+                case (FullQueryLogger.BATCH):
+                    dumpBatch(options, wireIn, sb);
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException("Log entry of 
unsupported type " + type);
             }
-            sb.append(System.lineSeparator());
+
             System.out.print(sb.toString());
             System.out.flush();
         };
@@ -153,7 +153,57 @@ public class Dump implements Runnable
         }
     }
 
-    private static void valuesToStringBuilder(List<ByteBuffer> values, 
StringBuilder sb)
+    private static void dumpQuery(QueryOptions options, WireIn wireIn, 
StringBuilder sb)
+    {
+        sb.append("Query: ")
+          .append(wireIn.read(FullQueryLogger.QUERY).text())
+          .append(System.lineSeparator());
+
+        List<ByteBuffer> values = options.getValues() != null
+                                ? options.getValues()
+                                : Collections.emptyList();
+
+        sb.append("Values: ")
+          .append(System.lineSeparator());
+        appendValuesToStringBuilder(values, sb);
+        sb.append(System.lineSeparator());
+    }
+
+    private static void dumpBatch(QueryOptions options, WireIn wireIn, 
StringBuilder sb)
+    {
+        sb.append("Batch type: ")
+          .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text())
+          .append(System.lineSeparator());
+
+        ValueIn in = wireIn.read(FullQueryLogger.QUERIES);
+        int numQueries = in.int32();
+        List<String> queries = new ArrayList<>(numQueries);
+        for (int i = 0; i < numQueries; i++)
+            queries.add(in.text());
+
+        in = wireIn.read(FullQueryLogger.VALUES);
+        int numValues = in.int32();
+
+        for (int i = 0; i < numValues; i++)
+        {
+            int numSubValues = in.int32();
+            List<ByteBuffer> subValues = new ArrayList<>(numSubValues);
+            for (int j = 0; j < numSubValues; j++)
+                subValues.add(ByteBuffer.wrap(in.bytes()));
+
+            sb.append("Query: ")
+              .append(queries.get(i))
+              .append(System.lineSeparator());
+
+            sb.append("Values: ")
+              .append(System.lineSeparator());
+            appendValuesToStringBuilder(subValues, sb);
+        }
+
+        sb.append(System.lineSeparator());
+    }
+
+    private static void appendValuesToStringBuilder(List<ByteBuffer> values, 
StringBuilder sb)
     {
         boolean first = true;
         for (ByteBuffer value : values)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java 
b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index b379918..a85bec0 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -221,7 +221,7 @@ public class BatchMessage extends Message.Request
             Message.Response response = handler.processBatch(batch, state, 
batchOptions, getCustomPayload(), queryStartNanoTime);
 
             if (auditLogManager.isLoggingEnabled())
-                auditLogManager.logBatch(batchType.name(), queryOrIdList, 
values, prepared, options, state, fqlTime);
+                auditLogManager.logBatch(batchType, queryOrIdList, values, 
prepared, options, state, fqlTime);
 
             return response;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java 
b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
index 0cc8094..542f8bb 100644
--- a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
+++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java
@@ -15,15 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.audit;
 
-
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Semaphore;
@@ -44,9 +43,12 @@ import net.openhft.chronicle.wire.WireOut;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableQuery;
-import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableBatch;
+import org.apache.cassandra.audit.FullQueryLogger.Query;
+import org.apache.cassandra.audit.FullQueryLogger.Batch;
+import org.apache.cassandra.cql3.statements.BatchStatement.Type;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.binlog.BinLogTest;
@@ -55,6 +57,20 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH;
+import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE;
+import static 
org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS;
+import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP;
+import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERIES;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS;
+import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME;
+import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY;
+import static org.apache.cassandra.audit.FullQueryLogger.TYPE;
+import static org.apache.cassandra.audit.FullQueryLogger.VALUES;
+import static org.apache.cassandra.audit.FullQueryLogger.VERSION;
+
 public class FullQueryLoggerTest extends CQLTester
 {
     private static Path tempDir;
@@ -250,7 +266,7 @@ public class FullQueryLoggerTest extends CQLTester
         {
             //Find out when the bin log thread has been blocked, necessary to 
not run into batch task drain behavior
             Semaphore binLogBlocked = new Semaphore(0);
-            instance.binLog.put(new WeighableMarshallableQuery("foo1", 
QueryOptions.DEFAULT, 1)
+            instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT, 
queryState(), 1)
             {
 
                 public void writeMarshallable(WireOut wire)
@@ -332,7 +348,7 @@ public class FullQueryLoggerTest extends CQLTester
         {
             //Find out when the bin log thread has been blocked, necessary to 
not run into batch task drain behavior
             Semaphore binLogBlocked = new Semaphore(0);
-            instance.binLog.put(new WeighableMarshallableQuery("foo1", 
QueryOptions.DEFAULT, 1)
+            instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT, 
queryState(), 1)
             {
 
                 public void writeMarshallable(WireOut wire)
@@ -366,7 +382,7 @@ public class FullQueryLoggerTest extends CQLTester
             //This sample should get dropped AKA released without being written
             AtomicInteger releasedCount = new AtomicInteger(0);
             AtomicInteger writtenCount = new AtomicInteger(0);
-            instance.logRecord(new WeighableMarshallableQuery("foo3", 
QueryOptions.DEFAULT, 1) {
+            instance.logRecord(new Query("foo3", QueryOptions.DEFAULT, 
queryState(), 1) {
                 public void writeMarshallable(WireOut wire)
                 {
                     writtenCount.incrementAndGet();
@@ -403,14 +419,20 @@ public class FullQueryLoggerTest extends CQLTester
         try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
         {
             ExcerptTailer tailer = queue.createTailer();
-            assertTrue(tailer.readDocument(wire -> {
-                assertEquals("single", wire.read("type").text());
-                ProtocolVersion protocolVersion = 
ProtocolVersion.decode(wire.read("protocol-version").int32());
+            assertTrue(tailer.readDocument(wire ->
+            {
+                assertEquals(0, wire.read(VERSION).int16());
+                assertEquals(SINGLE_QUERY, wire.read(TYPE).text());
+
+                assertEquals(1L, wire.read(QUERY_START_TIME).int64());
+
+                ProtocolVersion protocolVersion = 
ProtocolVersion.decode(wire.read(PROTOCOL_VERSION).int32());
                 assertEquals(ProtocolVersion.CURRENT, protocolVersion);
-                QueryOptions queryOptions = 
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()),
 protocolVersion);
+
+                QueryOptions queryOptions = 
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read(QUERY_OPTIONS).bytes()),
 protocolVersion);
                 compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
-                assertEquals(1L, wire.read("query-time").int64());
-                assertEquals("foo", wire.read("query").text());
+
+                assertEquals("foo", wire.read(QUERY).text());
             }));
         }
     }
@@ -419,7 +441,15 @@ public class FullQueryLoggerTest extends CQLTester
     public void testRoundTripBatch() throws Exception
     {
         configureFQL();
-        instance.logBatch("UNLOGGED", Arrays.asList("foo1", "foo2"), 
Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , 
ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1);
+        instance.logBatch(Type.UNLOGGED,
+                          Arrays.asList("foo1", "foo2"),
+                          Arrays.asList(Arrays.asList(ByteBuffer.allocate(1),
+                                        ByteBuffer.allocateDirect(2)),
+                                        Collections.emptyList()),
+                          QueryOptions.DEFAULT,
+                          queryState(),
+                          1);
+
         Util.spinAssertEquals(true, () ->
         {
             try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
@@ -427,22 +457,31 @@ public class FullQueryLoggerTest extends CQLTester
                 return queue.createTailer().readingDocument().isPresent();
             }
         }, 60);
+
         try (ChronicleQueue queue = 
ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build())
         {
             ExcerptTailer tailer = queue.createTailer();
             assertTrue(tailer.readDocument(wire -> {
-                assertEquals("batch", wire.read("type").text());
-                ProtocolVersion protocolVersion = 
ProtocolVersion.decode(wire.read("protocol-version").int32());
+                assertEquals(0, wire.read(VERSION).int16());
+                assertEquals(BATCH, wire.read(TYPE).text());
+
+                assertEquals(1L, wire.read(QUERY_START_TIME).int64());
+
+                ProtocolVersion protocolVersion = 
ProtocolVersion.decode(wire.read(PROTOCOL_VERSION).int32());
                 assertEquals(ProtocolVersion.CURRENT, protocolVersion);
-                QueryOptions queryOptions = 
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()),
 protocolVersion);
-                assertEquals(1L, wire.read("query-time").int64());
+
+                QueryOptions queryOptions = 
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read(QUERY_OPTIONS).bytes()),
 protocolVersion);
                 compareQueryOptions(QueryOptions.DEFAULT, queryOptions);
-                assertEquals("UNLOGGED", wire.read("batch-type").text());
-                ValueIn in = wire.read("queries");
+
+                assertEquals(Long.MIN_VALUE, 
wire.read(GENERATED_TIMESTAMP).int64());
+                assertEquals(Integer.MIN_VALUE, 
wire.read(GENERATED_NOW_IN_SECONDS).int32());
+
+                assertEquals("UNLOGGED", wire.read(BATCH_TYPE).text());
+                ValueIn in = wire.read(QUERIES);
                 assertEquals(2, in.int32());
                 assertEquals("foo1", in.text());
                 assertEquals("foo2", in.text());
-                in = wire.read("values");
+                in = wire.read(VALUES);
                 assertEquals(2, in.int32());
                 assertEquals(2, in.int32());
                 assertTrue(Arrays.equals(new byte[1], in.bytes()));
@@ -456,7 +495,7 @@ public class FullQueryLoggerTest extends CQLTester
     public void testQueryWeight()
     {
         //Empty query should have some weight
-        WeighableMarshallableQuery query = new WeighableMarshallableQuery("", 
QueryOptions.DEFAULT, 1);
+        Query query = new Query("", QueryOptions.DEFAULT, queryState(), 1);
         assertTrue(query.weight() >= 95);
 
         StringBuilder sb = new StringBuilder();
@@ -464,14 +503,14 @@ public class FullQueryLoggerTest extends CQLTester
         {
             sb.append('a');
         }
-        query = new WeighableMarshallableQuery(sb.toString(), 
QueryOptions.DEFAULT, 1);
+        query = new Query(sb.toString(), QueryOptions.DEFAULT, queryState(), 
1);
 
         //A large query should be reflected in the size, * 2 since characters 
are still two bytes
         assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString()));
 
         //Large query options should be reflected
         QueryOptions largeOptions = 
QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
-        query = new WeighableMarshallableQuery("", largeOptions, 1);
+        query = new Query("", largeOptions, queryState(), 1);
         assertTrue(query.weight() > 1024 * 1024);
         System.out.printf("weight %d%n", query.weight());
     }
@@ -480,8 +519,8 @@ public class FullQueryLoggerTest extends CQLTester
     public void testBatchWeight()
     {
         //An empty batch should have weight
-        WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", 
new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
-        assertTrue(batch.weight() >= 183);
+        Batch batch = new Batch(Type.UNLOGGED, new ArrayList<>(), new 
ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1);
+        assertTrue(batch.weight() > 0);
 
         StringBuilder sb = new StringBuilder();
         for (int ii = 0; ii < 1024 * 1024; ii++)
@@ -489,23 +528,19 @@ public class FullQueryLoggerTest extends CQLTester
             sb.append('a');
         }
 
-        //The weight of the type string should be reflected
-        batch = new WeighableMarshallableBatch(sb.toString(), new 
ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1);
-        assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString()));
-
         //The weight of the list containing queries should be reflected
         List<String> bigList = new ArrayList(100000);
         for (int ii = 0; ii < 100000; ii++)
         {
             bigList.add("");
         }
-        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), 
QueryOptions.DEFAULT, 1);
+        batch = new Batch(Type.UNLOGGED, bigList, new ArrayList<>(), 
QueryOptions.DEFAULT, queryState(), 1);
         assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
 
         //The size of the query should be reflected
         bigList = new ArrayList(1);
         bigList.add(sb.toString());
-        batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), 
QueryOptions.DEFAULT, 1);
+        batch = new Batch(Type.UNLOGGED, bigList, new ArrayList<>(), 
QueryOptions.DEFAULT, queryState(), 1);
         assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList));
 
         bigList = null;
@@ -516,74 +551,74 @@ public class FullQueryLoggerTest extends CQLTester
             bigValues.add(new ArrayList<>(0));
         }
         bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5));
-        batch = new WeighableMarshallableBatch("", new ArrayList<>(),  
bigValues, QueryOptions.DEFAULT, 1);
+        batch = new Batch(Type.UNLOGGED, new ArrayList<>(), bigValues, 
QueryOptions.DEFAULT, queryState(), 1);
         assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues));
 
         //As should the size of the values
         QueryOptions largeOptions = 
QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024)));
-        batch = new WeighableMarshallableBatch("", new ArrayList<>(), new 
ArrayList<>(), largeOptions, 1);
+        batch = new Batch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), 
largeOptions, queryState(), 1);
         assertTrue(batch.weight() > 1024 * 1024);
     }
 
     @Test(expected = NullPointerException.class)
     public void testLogBatchNullType() throws Exception
     {
-        instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), 
QueryOptions.DEFAULT, 1);
+        instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), 
QueryOptions.DEFAULT, queryState(), 1);
     }
 
     @Test(expected = NullPointerException.class)
     public void testLogBatchNullQueries() throws Exception
     {
-        instance.logBatch("", null, new ArrayList<>(), QueryOptions.DEFAULT, 
1);
+        instance.logBatch(Type.UNLOGGED, null, new ArrayList<>(), 
QueryOptions.DEFAULT, queryState(), 1);
     }
 
     @Test(expected = NullPointerException.class)
     public void testLogBatchNullQueriesQuery() throws Exception
     {
         configureFQL();
-        instance.logBatch("", Arrays.asList((String)null), new ArrayList<>(), 
QueryOptions.DEFAULT, 1);
+        instance.logBatch(Type.UNLOGGED, Arrays.asList((String)null), new 
ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1);
     }
 
     @Test(expected = NullPointerException.class)
     public void testLogBatchNullValues() throws Exception
     {
-        instance.logBatch("", new ArrayList<>(), null, QueryOptions.DEFAULT, 
1);
+        instance.logBatch(Type.UNLOGGED, new ArrayList<>(), null, 
QueryOptions.DEFAULT, queryState(), 1);
     }
 
     @Test(expected = NullPointerException.class)
     public void testLogBatchNullValuesValue() throws Exception
     {
-        instance.logBatch("", new ArrayList<>(), 
Arrays.asList((List<ByteBuffer>)null), null, 1);
+        instance.logBatch(Type.UNLOGGED, new ArrayList<>(), 
Arrays.asList((List<ByteBuffer>)null), null, queryState(), 1);
     }
 
     @Test(expected = NullPointerException.class)
     public void testLogBatchNullQueryOptions() throws Exception
     {
-        instance.logBatch("", new ArrayList<>(), new ArrayList<>(), null, 1);
+        instance.logBatch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), 
null, queryState(), 1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testLogBatchNegativeTime() throws Exception
     {
-        instance.logBatch("", new ArrayList<>(), new ArrayList<>(), 
QueryOptions.DEFAULT, -1);
+        instance.logBatch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), 
QueryOptions.DEFAULT, queryState(), -1);
     }
 
     @Test(expected = NullPointerException.class)
     public void testLogQueryNullQuery() throws Exception
     {
-        instance.logQuery(null, QueryOptions.DEFAULT, 1);
+        instance.logQuery(null, QueryOptions.DEFAULT, queryState(), 1);
     }
 
     @Test(expected = NullPointerException.class)
     public void testLogQueryNullQueryOptions() throws Exception
     {
-        instance.logQuery("", null, 1);
+        instance.logQuery("", null, queryState(), 1);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testLogQueryNegativeTime() throws Exception
     {
-        instance.logQuery("", QueryOptions.DEFAULT, -1);
+        instance.logQuery("", QueryOptions.DEFAULT, queryState(), -1);
     }
 
     private static void compareQueryOptions(QueryOptions a, QueryOptions b)
@@ -604,8 +639,12 @@ public class FullQueryLoggerTest extends CQLTester
 
     private void logQuery(String query)
     {
-        instance.logQuery(query, QueryOptions.DEFAULT, 1);
+        instance.logQuery(query, QueryOptions.DEFAULT, queryState(), 1);
     }
 
+    private QueryState queryState()
+    {
+        return new QueryState(ClientState.forInternalCalls());
+    }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to