Repository: cassandra Updated Branches: refs/heads/trunk 1e2f5244e -> 5b645de13
Log the server-generated timestamp and nowInSeconds used by queries in FQL patch by Aleksey Yeschenko; reviewed by Marcus Eriksson for CASSANDRA-14675 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b645de1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b645de1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b645de1 Branch: refs/heads/trunk Commit: 5b645de13f8bea775d5a979712b3bea910960255 Parents: 1e2f524 Author: Aleksey Yeshchenko <[email protected]> Authored: Fri Aug 31 14:47:02 2018 +0100 Committer: Aleksey Yeshchenko <[email protected]> Committed: Fri Aug 31 19:42:23 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/audit/AuditLogEntry.java | 5 + .../apache/cassandra/audit/AuditLogManager.java | 11 +- .../apache/cassandra/audit/BinAuditLogger.java | 6 +- .../cassandra/audit/BinLogAuditLogger.java | 93 ------- .../apache/cassandra/audit/FullQueryLogger.java | 255 ++++++++++++++----- .../org/apache/cassandra/cql3/QueryOptions.java | 3 +- .../apache/cassandra/service/QueryState.java | 22 +- .../apache/cassandra/tools/fqltool/Dump.java | 148 +++++++---- .../transport/messages/BatchMessage.java | 2 +- .../cassandra/audit/FullQueryLoggerTest.java | 129 ++++++---- 11 files changed, 418 insertions(+), 257 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d2d9c86..9e76586 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675) * Add diagnostic events for read repairs (CASSANDRA-14668) * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671) * Add sampler for query time and expose with nodetool (CASSANDRA-14436) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/AuditLogEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntry.java b/src/java/org/apache/cassandra/audit/AuditLogEntry.java index 0b891d4..4d3b867 100644 --- a/src/java/org/apache/cassandra/audit/AuditLogEntry.java +++ b/src/java/org/apache/cassandra/audit/AuditLogEntry.java @@ -153,6 +153,11 @@ public class AuditLogEntry return options; } + public QueryState getState() + { + return state; + } + public static class Builder { private static final InetAddressAndPort DEFAULT_SOURCE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/AuditLogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java b/src/java/org/apache/cassandra/audit/AuditLogManager.java index ab9c2e9..25966f7 100644 --- a/src/java/org/apache/cassandra/audit/AuditLogManager.java +++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java @@ -33,6 +33,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.statements.BatchStatement; import org.apache.cassandra.exceptions.AuthenticationException; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.UnauthorizedException; @@ -187,7 +188,13 @@ public class AuditLogManager /** * Logs Batch queries to both FQL and standard audit logger. */ - public void logBatch(String batchTypeName, List<Object> queryOrIdList, List<List<ByteBuffer>> values, List<QueryHandler.Prepared> prepared, QueryOptions options, QueryState state, long queryStartTimeMillis) + public void logBatch(BatchStatement.Type type, + List<Object> queryOrIdList, + List<List<ByteBuffer>> values, + List<QueryHandler.Prepared> prepared, + QueryOptions options, + QueryState state, + long queryStartTimeMillis) { if (isAuditingEnabled()) { @@ -205,7 +212,7 @@ public class AuditLogManager { queryStrings.add(prepStatment.rawCQLStatement); } - fullQueryLogger.logBatch(batchTypeName, queryStrings, values, options, queryStartTimeMillis); + fullQueryLogger.logBatch(type, queryStrings, values, options, state, queryStartTimeMillis); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/BinAuditLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/BinAuditLogger.java b/src/java/org/apache/cassandra/audit/BinAuditLogger.java index 89b764c..3ac9499 100644 --- a/src/java/org/apache/cassandra/audit/BinAuditLogger.java +++ b/src/java/org/apache/cassandra/audit/BinAuditLogger.java @@ -51,14 +51,14 @@ public class BinAuditLogger extends BinLogAuditLogger implements IAuditLogger return; } - super.logRecord(new WeighableMarshallableMessage(auditLogEntry.getLogString()), binLog); + super.logRecord(new Message(auditLogEntry.getLogString()), binLog); } - static class WeighableMarshallableMessage extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable + static class Message extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable { private final String message; - WeighableMarshallableMessage(String message) + Message(String message) { this.message = message; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java index a2426b9..7534650 100644 --- a/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java +++ b/src/java/org/apache/cassandra/audit/BinLogAuditLogger.java @@ -19,56 +19,25 @@ package org.apache.cassandra.audit; import java.io.File; -import java.nio.ByteBuffer; import java.nio.file.Path; -import java.util.ArrayList; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; -import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import net.openhft.chronicle.bytes.BytesStore; import net.openhft.chronicle.queue.RollCycles; -import net.openhft.chronicle.wire.WireOut; -import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.io.FSError; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.transport.CBUtil; -import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.NoSpamLogger; -import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.binlog.BinLog; -import org.apache.cassandra.utils.concurrent.WeightedQueue; -import org.github.jamm.MemoryLayoutSpecification; abstract class BinLogAuditLogger implements IAuditLogger { - static final int EMPTY_BYTEBUFFER_SIZE = Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0))); - static final int EMPTY_LIST_SIZE = Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0))); - private static final int EMPTY_BYTEBUF_SIZE; - private static final int OBJECT_HEADER_SIZE = MemoryLayoutSpecification.SPEC.getObjectHeaderSize(); - static - { - int tempSize = 0; - ByteBuf buf = CBUtil.allocator.buffer(0, 0); - try - { - tempSize = Ints.checkedCast(ObjectSizes.measure(buf)); - } - finally - { - buf.release(); - } - EMPTY_BYTEBUF_SIZE = tempSize; - } - protected static final Logger logger = LoggerFactory.getLogger(BinLogAuditLogger.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); private static final NoSpamLogger.NoSpamLogStatement droppedSamplesStatement = noSpamLogger.getStatement("Dropped {} binary log samples", 1, TimeUnit.MINUTES); @@ -290,68 +259,6 @@ abstract class BinLogAuditLogger implements IAuditLogger } } - protected static abstract class AbstractWeighableMarshallable extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable - { - private final ByteBuf queryOptionsBuffer; - private final long timeMillis; - private final int protocolVersion; - - AbstractWeighableMarshallable(QueryOptions queryOptions, long timeMillis) - { - this.timeMillis = timeMillis; - ProtocolVersion version = queryOptions.getProtocolVersion(); - this.protocolVersion = version.asInt(); - int optionsSize = QueryOptions.codec.encodedSize(queryOptions, version); - queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, optionsSize); - /* - * Struggled with what tradeoff to make in terms of query options which is potentially large and complicated - * There is tension between low garbage production (or allocator overhead), small working set size, and CPU overhead reserializing the - * query options into binary format. - * - * I went with the lowest risk most predictable option which is allocator overhead and CPU overhead - * rather then keep the original query message around so I could just serialize that as a memcpy. It's more - * instructions when turned on, but it doesn't change memory footprint quite as much and it's more pay for what you use - * in terms of query volume. The CPU overhead is spread out across producers so we should at least get - * some scaling. - * - */ - boolean success = false; - try - { - QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, version); - success = true; - } - finally - { - if (!success) - { - queryOptionsBuffer.release(); - } - } - } - - @Override - public void writeMarshallable(WireOut wire) - { - wire.write("protocol-version").int32(protocolVersion); - wire.write("query-options").bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer())); - wire.write("query-time").int64(timeMillis); - } - - @Override - public void release() - { - queryOptionsBuffer.release(); - } - - //8-bytes for protocol version (assume alignment cost), 8-byte timestamp, 8-byte object header + other contents - @Override - public int weight() - { - return 8 + 8 + OBJECT_HEADER_SIZE + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity(); - } - } - private static Throwable cleanDirectory(File directory, Throwable accumulate) { if (!directory.exists()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/audit/FullQueryLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java b/src/java/org/apache/cassandra/audit/FullQueryLogger.java index 36d0127..8cd8f4a 100644 --- a/src/java/org/apache/cassandra/audit/FullQueryLogger.java +++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java @@ -15,31 +15,78 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.audit; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; +import io.netty.buffer.ByteBuf; import net.openhft.chronicle.bytes.BytesStore; import net.openhft.chronicle.wire.ValueOut; import net.openhft.chronicle.wire.WireOut; import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.binlog.BinLog; +import org.apache.cassandra.utils.concurrent.WeightedQueue; +import org.github.jamm.MemoryLayoutSpecification; /** * A logger that logs entire query contents after the query finishes (or times out). */ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger { + public static final long CURRENT_VERSION = 0; // encode a dummy version, to prevent pain in decoding in the future + + public static final String VERSION = "version"; + public static final String TYPE = "type"; + + public static final String PROTOCOL_VERSION = "protocol-version"; + public static final String QUERY_OPTIONS = "query-options"; + public static final String QUERY_START_TIME = "query-start-time"; + + public static final String GENERATED_TIMESTAMP = "generated-timestamp"; + public static final String GENERATED_NOW_IN_SECONDS = "generated-now-in-seconds"; + + public static final String BATCH = "batch"; + public static final String SINGLE_QUERY = "single-query"; + + public static final String QUERY = "query"; + public static final String BATCH_TYPE = "batch-type"; + public static final String QUERIES = "queries"; + public static final String VALUES = "values"; + + private static final int EMPTY_BYTEBUFFER_SIZE = Ints.checkedCast(ObjectSizes.sizeOnHeapExcludingData(ByteBuffer.allocate(0))); + + private static final int EMPTY_LIST_SIZE = Ints.checkedCast(ObjectSizes.measureDeep(new ArrayList(0))); + private static final int EMPTY_BYTEBUF_SIZE; + + private static final int OBJECT_HEADER_SIZE = MemoryLayoutSpecification.SPEC.getObjectHeaderSize(); + private static final int OBJECT_REFERENCE_SIZE = MemoryLayoutSpecification.SPEC.getReferenceSize(); + + static + { + ByteBuf buf = CBUtil.allocator.buffer(0, 0); + try + { + EMPTY_BYTEBUF_SIZE = Ints.checkedCast(ObjectSizes.measure(buf)); + } + finally + { + buf.release(); + } + } + @Override public void log(AuditLogEntry entry) { - logQuery(entry.getOperation(), entry.getOptions(), entry.getTimestamp()); + logQuery(entry.getOperation(), entry.getOptions(), entry.getState(), entry.getTimestamp()); } /** @@ -48,14 +95,21 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger * @param queries CQL text of the queries * @param values Values to bind to as parameters for the queries * @param queryOptions Options associated with the query invocation + * @param queryState Timestamp state associated with the query invocation * @param batchTimeMillis Approximate time in milliseconds since the epoch since the batch was invoked */ - void logBatch(String type, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis) + void logBatch(BatchStatement.Type type, + List<String> queries, + List<List<ByteBuffer>> values, + QueryOptions queryOptions, + QueryState queryState, + long batchTimeMillis) { Preconditions.checkNotNull(type, "type was null"); Preconditions.checkNotNull(queries, "queries was null"); Preconditions.checkNotNull(values, "value was null"); Preconditions.checkNotNull(queryOptions, "queryOptions was null"); + Preconditions.checkNotNull(queryState, "queryState was null"); Preconditions.checkArgument(batchTimeMillis > 0, "batchTimeMillis must be > 0"); //Don't construct the wrapper if the log is disabled @@ -65,7 +119,7 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger return; } - WeighableMarshallableBatch wrappedBatch = new WeighableMarshallableBatch(type, queries, values, queryOptions, batchTimeMillis); + Batch wrappedBatch = new Batch(type, queries, values, queryOptions, queryState, batchTimeMillis); logRecord(wrappedBatch, binLog); } @@ -73,12 +127,14 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger * Log a single CQL query * @param query CQL query text * @param queryOptions Options associated with the query invocation + * @param queryState Timestamp state associated with the query invocation * @param queryTimeMillis Approximate time in milliseconds since the epoch since the batch was invoked */ - void logQuery(String query, QueryOptions queryOptions, long queryTimeMillis) + void logQuery(String query, QueryOptions queryOptions, QueryState queryState, long queryTimeMillis) { Preconditions.checkNotNull(query, "query was null"); Preconditions.checkNotNull(queryOptions, "queryOptions was null"); + Preconditions.checkNotNull(queryState, "queryState was null"); Preconditions.checkArgument(queryTimeMillis > 0, "queryTimeMillis must be > 0"); //Don't construct the wrapper if the log is disabled @@ -88,70 +144,98 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger return; } - WeighableMarshallableQuery wrappedQuery = new WeighableMarshallableQuery(query, queryOptions, queryTimeMillis); + Query wrappedQuery = new Query(query, queryOptions, queryState, queryTimeMillis); logRecord(wrappedQuery, binLog); } - static class WeighableMarshallableBatch extends AbstractWeighableMarshallable + static class Query extends AbstractLogEntry + { + private final String query; + + public Query(String query, QueryOptions queryOptions, QueryState queryState, long queryStartTime) + { + super(queryOptions, queryState, queryStartTime); + this.query = query; + } + + @Override + protected String type() + { + return SINGLE_QUERY; + } + + @Override + public void writeMarshallable(WireOut wire) + { + super.writeMarshallable(wire); + wire.write(QUERY).text(query); + } + + @Override + public int weight() + { + return Ints.checkedCast(ObjectSizes.sizeOf(query)) + super.weight(); + } + } + + static class Batch extends AbstractLogEntry { private final int weight; - private final String batchType; + private final BatchStatement.Type batchType; private final List<String> queries; private final List<List<ByteBuffer>> values; - public WeighableMarshallableBatch(String batchType, List<String> queries, List<List<ByteBuffer>> values, QueryOptions queryOptions, long batchTimeMillis) - { - super(queryOptions, batchTimeMillis); - this.queries = queries; - this.values = values; - this.batchType = batchType; - boolean success = false; - try - { - //weight, batch type, queries, values - int weightTemp = 8 + EMPTY_LIST_SIZE + EMPTY_LIST_SIZE; - for (int ii = 0; ii < queries.size(); ii++) - { - weightTemp += ObjectSizes.sizeOf(queries.get(ii)); - } - - weightTemp += EMPTY_LIST_SIZE * values.size(); - for (int ii = 0; ii < values.size(); ii++) - { - List<ByteBuffer> sublist = values.get(ii); - weightTemp += EMPTY_BYTEBUFFER_SIZE * sublist.size(); - for (int zz = 0; zz < sublist.size(); zz++) - { - weightTemp += sublist.get(zz).capacity(); - } - } - weightTemp += super.weight(); - weightTemp += ObjectSizes.sizeOf(batchType); - weight = weightTemp; - success = true; - } - finally - { - if (!success) - { - release(); - } - } + public Batch(BatchStatement.Type batchType, + List<String> queries, + List<List<ByteBuffer>> values, + QueryOptions queryOptions, + QueryState queryState, + long batchTimeMillis) + { + super(queryOptions, queryState, batchTimeMillis); + + this.queries = queries; + this.values = values; + this.batchType = batchType; + + int weight = super.weight(); + + // weight, queries, values, batch type + weight += 4 + // cached weight + 2 * EMPTY_LIST_SIZE + // queries + values lists + OBJECT_REFERENCE_SIZE; // batchType reference, worst case + + for (String query : queries) + weight += ObjectSizes.sizeOf(query); + + for (List<ByteBuffer> subValues : values) + { + weight += EMPTY_LIST_SIZE; + for (ByteBuffer value : subValues) + weight += EMPTY_BYTEBUFFER_SIZE + value.capacity(); + } + + this.weight = weight; + } + + @Override + protected String type() + { + return BATCH; } @Override public void writeMarshallable(WireOut wire) { - wire.write("type").text("batch"); super.writeMarshallable(wire); - wire.write("batch-type").text(batchType); - ValueOut valueOut = wire.write("queries"); + wire.write(BATCH_TYPE).text(batchType.name()); + ValueOut valueOut = wire.write(QUERIES); valueOut.int32(queries.size()); for (String query : queries) { valueOut.text(query); } - valueOut = wire.write("values"); + valueOut = wire.write(VALUES); valueOut.int32(values.size()); for (List<ByteBuffer> subValues : values) { @@ -170,28 +254,81 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger } } - static class WeighableMarshallableQuery extends AbstractWeighableMarshallable + private static abstract class AbstractLogEntry extends BinLog.ReleaseableWriteMarshallable implements WeightedQueue.Weighable { - private final String query; + private final long queryStartTime; + private final int protocolVersion; + private final ByteBuf queryOptionsBuffer; - public WeighableMarshallableQuery(String query, QueryOptions queryOptions, long queryTimeMillis) + private final long generatedTimestamp; + private final int generatedNowInSeconds; + + AbstractLogEntry(QueryOptions queryOptions, QueryState queryState, long queryStartTime) { - super(queryOptions, queryTimeMillis); - this.query = query; + this.queryStartTime = queryStartTime; + + this.protocolVersion = queryOptions.getProtocolVersion().asInt(); + int optionsSize = QueryOptions.codec.encodedSize(queryOptions, queryOptions.getProtocolVersion()); + queryOptionsBuffer = CBUtil.allocator.buffer(optionsSize, optionsSize); + + this.generatedTimestamp = queryState.generatedTimestamp(); + this.generatedNowInSeconds = queryState.generatedNowInSeconds(); + + /* + * Struggled with what tradeoff to make in terms of query options which is potentially large and complicated + * There is tension between low garbage production (or allocator overhead), small working set size, and CPU overhead reserializing the + * query options into binary format. + * + * I went with the lowest risk most predictable option which is allocator overhead and CPU overhead + * rather then keep the original query message around so I could just serialize that as a memcpy. It's more + * instructions when turned on, but it doesn't change memory footprint quite as much and it's more pay for what you use + * in terms of query volume. The CPU overhead is spread out across producers so we should at least get + * some scaling. + * + */ + try + { + QueryOptions.codec.encode(queryOptions, queryOptionsBuffer, queryOptions.getProtocolVersion()); + } + catch (Throwable e) + { + queryOptionsBuffer.release(); + throw e; + } } @Override public void writeMarshallable(WireOut wire) { - wire.write("type").text("single"); - super.writeMarshallable(wire); - wire.write("query").text(query); + wire.write(VERSION).int16(CURRENT_VERSION); + wire.write(TYPE).text(type()); + + wire.write(QUERY_START_TIME).int64(queryStartTime); + wire.write(PROTOCOL_VERSION).int32(protocolVersion); + wire.write(QUERY_OPTIONS).bytes(BytesStore.wrap(queryOptionsBuffer.nioBuffer())); + + wire.write(GENERATED_TIMESTAMP).int64(generatedTimestamp); + wire.write(GENERATED_NOW_IN_SECONDS).int32(generatedNowInSeconds); + } + + @Override + public void release() + { + queryOptionsBuffer.release(); } @Override public int weight() { - return Ints.checkedCast(ObjectSizes.sizeOf(query)) + super.weight(); + return OBJECT_HEADER_SIZE + + 8 // queryStartTime + + 4 // protocolVersion + + EMPTY_BYTEBUF_SIZE + queryOptionsBuffer.capacity() // queryOptionsBuffer + + 8 // generatedTimestamp + + 4; // generatedNowInSeconds } + + protected abstract String type(); } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/cql3/QueryOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java index f76d6b2..84adf80 100644 --- a/src/java/org/apache/cassandra/cql3/QueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java @@ -34,7 +34,6 @@ import org.apache.cassandra.transport.CBCodec; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.transport.ProtocolVersion; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -45,7 +44,7 @@ import org.apache.commons.lang3.builder.ToStringStyle; public abstract class QueryOptions { public static final QueryOptions DEFAULT = new DefaultQueryOptions(ConsistencyLevel.ONE, - Collections.<ByteBuffer>emptyList(), + Collections.emptyList(), false, SpecificOptions.DEFAULT, ProtocolVersion.CURRENT); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/service/QueryState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index d1b03d4..2bd07ab 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -40,6 +40,14 @@ public class QueryState } /** + * @return a QueryState object for internal C* calls (not limited by any kind of auth). + */ + public static QueryState forInternalCalls() + { + return new QueryState(ClientState.forInternalCalls()); + } + + /** * Generate, cache, and record a timestamp value on the server-side. * * Used in reads for all live and expiring cells, and all kinds of deletion infos. @@ -75,11 +83,19 @@ public class QueryState } /** - * @return a QueryState object for internal C* calls (not limited by any kind of auth). + * @return server-generated timestamp value, if one had been requested, or Long.MIN_VALUE otherwise */ - public static QueryState forInternalCalls() + public long generatedTimestamp() { - return new QueryState(ClientState.forInternalCalls()); + return timestamp; + } + + /** + * @return server-generated nowInSeconds value, if one had been requested, or Integer.MIN_VALUE otherwise + */ + public int generatedNowInSeconds() + { + return nowInSeconds; } public ClientState getClientState() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/tools/fqltool/Dump.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/Dump.java index 52eadb5..a8e7592 100644 --- a/src/java/org/apache/cassandra/tools/fqltool/Dump.java +++ b/src/java/org/apache/cassandra/tools/fqltool/Dump.java @@ -38,6 +38,8 @@ import net.openhft.chronicle.queue.RollCycles; import net.openhft.chronicle.threads.Pauser; import net.openhft.chronicle.wire.ReadMarshallable; import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.WireIn; +import org.apache.cassandra.audit.FullQueryLogger; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.transport.ProtocolVersion; @@ -67,59 +69,57 @@ public class Dump implements Runnable public static void dump(List<String> arguments, String rollCycle, boolean follow) { StringBuilder sb = new StringBuilder(); - ReadMarshallable reader = wireIn -> { + ReadMarshallable reader = wireIn -> + { sb.setLength(0); - String type = wireIn.read("type").text(); - sb.append("Type: ").append(type).append(System.lineSeparator()); - assert type != null; - if (type.equals("AuditLog")) - { - sb.append("LogMessage: ").append(wireIn.read("message").text()).append(System.lineSeparator()); - } - else + int version = wireIn.read(FullQueryLogger.VERSION).int16(); + if (version != FullQueryLogger.CURRENT_VERSION) + throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered"); + + String type = wireIn.read(FullQueryLogger.TYPE).text(); + sb.append("Type: ") + .append(type) + .append(System.lineSeparator()); + + long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64(); + sb.append("Query start time: ") + .append(queryStartTime) + .append(System.lineSeparator()); + + int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32(); + sb.append("Protocol version: ") + .append(protocolVersion) + .append(System.lineSeparator()); + + QueryOptions options = + QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()), + ProtocolVersion.decode(protocolVersion)); + + long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64(); + sb.append("Generated timestamp:") + .append(generatedTimestamp) + .append(System.lineSeparator()); + + int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32(); + sb.append("Generated nowInSeconds:") + .append(generatedNowInSeconds) + .append(System.lineSeparator()); + + switch (type) { - int protocolVersion = wireIn.read("protocol-version").int32(); - sb.append("Protocol version: ").append(protocolVersion).append(System.lineSeparator()); - QueryOptions options = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read("query-options").bytes()), ProtocolVersion.decode(protocolVersion)); - sb.append("Query time: ").append(wireIn.read("query-time").int64()).append(System.lineSeparator()); + case (FullQueryLogger.SINGLE_QUERY): + dumpQuery(options, wireIn, sb); + break; - if (type.equals("single")) - { - sb.append("Query: ").append(wireIn.read("query").text()).append(System.lineSeparator()); - List<ByteBuffer> values = options.getValues() != null ? options.getValues() : Collections.EMPTY_LIST; - sb.append("Values: ").append(System.lineSeparator()); - valuesToStringBuilder(values, sb); - } - else - { - sb.append("Batch type: ").append(wireIn.read("batch-type").text()).append(System.lineSeparator()); - ValueIn in = wireIn.read("queries"); - int numQueries = in.int32(); - List<String> queries = new ArrayList<>(); - for (int ii = 0; ii < numQueries; ii++) - { - queries.add(in.text()); - } - in = wireIn.read("values"); - int numValues = in.int32(); - List<List<ByteBuffer>> values = new ArrayList<>(); - for (int ii = 0; ii < numValues; ii++) - { - List<ByteBuffer> subValues = new ArrayList<>(); - values.add(subValues); - int numSubValues = in.int32(); - for (int zz = 0; zz < numSubValues; zz++) - { - subValues.add(ByteBuffer.wrap(in.bytes())); - } - sb.append("Query: ").append(queries.get(ii)).append(System.lineSeparator()); - sb.append("Values: ").append(System.lineSeparator()); - valuesToStringBuilder(subValues, sb); - } - } + case (FullQueryLogger.BATCH): + dumpBatch(options, wireIn, sb); + break; + + default: + throw new UnsupportedOperationException("Log entry of unsupported type " + type); } - sb.append(System.lineSeparator()); + System.out.print(sb.toString()); System.out.flush(); }; @@ -153,7 +153,57 @@ public class Dump implements Runnable } } - private static void valuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb) + private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb) + { + sb.append("Query: ") + .append(wireIn.read(FullQueryLogger.QUERY).text()) + .append(System.lineSeparator()); + + List<ByteBuffer> values = options.getValues() != null + ? options.getValues() + : Collections.emptyList(); + + sb.append("Values: ") + .append(System.lineSeparator()); + appendValuesToStringBuilder(values, sb); + sb.append(System.lineSeparator()); + } + + private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb) + { + sb.append("Batch type: ") + .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text()) + .append(System.lineSeparator()); + + ValueIn in = wireIn.read(FullQueryLogger.QUERIES); + int numQueries = in.int32(); + List<String> queries = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) + queries.add(in.text()); + + in = wireIn.read(FullQueryLogger.VALUES); + int numValues = in.int32(); + + for (int i = 0; i < numValues; i++) + { + int numSubValues = in.int32(); + List<ByteBuffer> subValues = new ArrayList<>(numSubValues); + for (int j = 0; j < numSubValues; j++) + subValues.add(ByteBuffer.wrap(in.bytes())); + + sb.append("Query: ") + .append(queries.get(i)) + .append(System.lineSeparator()); + + sb.append("Values: ") + .append(System.lineSeparator()); + appendValuesToStringBuilder(subValues, sb); + } + + sb.append(System.lineSeparator()); + } + + private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb) { boolean first = true; for (ByteBuffer value : values) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index b379918..a85bec0 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -221,7 +221,7 @@ public class BatchMessage extends Message.Request Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime); if (auditLogManager.isLoggingEnabled()) - auditLogManager.logBatch(batchType.name(), queryOrIdList, values, prepared, options, state, fqlTime); + auditLogManager.logBatch(batchType, queryOrIdList, values, prepared, options, state, fqlTime); return response; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b645de1/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java index 0cc8094..542f8bb 100644 --- a/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java +++ b/test/unit/org/apache/cassandra/audit/FullQueryLoggerTest.java @@ -15,15 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.audit; - import java.io.File; import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Semaphore; @@ -44,9 +43,12 @@ import net.openhft.chronicle.wire.WireOut; import org.apache.cassandra.Util; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.cql3.QueryOptions; -import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableQuery; -import org.apache.cassandra.audit.FullQueryLogger.WeighableMarshallableBatch; +import org.apache.cassandra.audit.FullQueryLogger.Query; +import org.apache.cassandra.audit.FullQueryLogger.Batch; +import org.apache.cassandra.cql3.statements.BatchStatement.Type; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.binlog.BinLogTest; @@ -55,6 +57,20 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.apache.cassandra.audit.FullQueryLogger.BATCH; +import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE; +import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS; +import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP; +import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION; +import static org.apache.cassandra.audit.FullQueryLogger.QUERIES; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME; +import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY; +import static org.apache.cassandra.audit.FullQueryLogger.TYPE; +import static org.apache.cassandra.audit.FullQueryLogger.VALUES; +import static org.apache.cassandra.audit.FullQueryLogger.VERSION; + public class FullQueryLoggerTest extends CQLTester { private static Path tempDir; @@ -250,7 +266,7 @@ public class FullQueryLoggerTest extends CQLTester { //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior Semaphore binLogBlocked = new Semaphore(0); - instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1) + instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT, queryState(), 1) { public void writeMarshallable(WireOut wire) @@ -332,7 +348,7 @@ public class FullQueryLoggerTest extends CQLTester { //Find out when the bin log thread has been blocked, necessary to not run into batch task drain behavior Semaphore binLogBlocked = new Semaphore(0); - instance.binLog.put(new WeighableMarshallableQuery("foo1", QueryOptions.DEFAULT, 1) + instance.binLog.put(new Query("foo1", QueryOptions.DEFAULT, queryState(), 1) { public void writeMarshallable(WireOut wire) @@ -366,7 +382,7 @@ public class FullQueryLoggerTest extends CQLTester //This sample should get dropped AKA released without being written AtomicInteger releasedCount = new AtomicInteger(0); AtomicInteger writtenCount = new AtomicInteger(0); - instance.logRecord(new WeighableMarshallableQuery("foo3", QueryOptions.DEFAULT, 1) { + instance.logRecord(new Query("foo3", QueryOptions.DEFAULT, queryState(), 1) { public void writeMarshallable(WireOut wire) { writtenCount.incrementAndGet(); @@ -403,14 +419,20 @@ public class FullQueryLoggerTest extends CQLTester try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) { ExcerptTailer tailer = queue.createTailer(); - assertTrue(tailer.readDocument(wire -> { - assertEquals("single", wire.read("type").text()); - ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32()); + assertTrue(tailer.readDocument(wire -> + { + assertEquals(0, wire.read(VERSION).int16()); + assertEquals(SINGLE_QUERY, wire.read(TYPE).text()); + + assertEquals(1L, wire.read(QUERY_START_TIME).int64()); + + ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read(PROTOCOL_VERSION).int32()); assertEquals(ProtocolVersion.CURRENT, protocolVersion); - QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion); + + QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read(QUERY_OPTIONS).bytes()), protocolVersion); compareQueryOptions(QueryOptions.DEFAULT, queryOptions); - assertEquals(1L, wire.read("query-time").int64()); - assertEquals("foo", wire.read("query").text()); + + assertEquals("foo", wire.read(QUERY).text()); })); } } @@ -419,7 +441,15 @@ public class FullQueryLoggerTest extends CQLTester public void testRoundTripBatch() throws Exception { configureFQL(); - instance.logBatch("UNLOGGED", Arrays.asList("foo1", "foo2"), Arrays.asList(Arrays.asList(ByteBuffer.allocate(1) , ByteBuffer.allocateDirect(2)), Arrays.asList()), QueryOptions.DEFAULT, 1); + instance.logBatch(Type.UNLOGGED, + Arrays.asList("foo1", "foo2"), + Arrays.asList(Arrays.asList(ByteBuffer.allocate(1), + ByteBuffer.allocateDirect(2)), + Collections.emptyList()), + QueryOptions.DEFAULT, + queryState(), + 1); + Util.spinAssertEquals(true, () -> { try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) @@ -427,22 +457,31 @@ public class FullQueryLoggerTest extends CQLTester return queue.createTailer().readingDocument().isPresent(); } }, 60); + try (ChronicleQueue queue = ChronicleQueueBuilder.single(tempDir.toFile()).rollCycle(RollCycles.TEST_SECONDLY).build()) { ExcerptTailer tailer = queue.createTailer(); assertTrue(tailer.readDocument(wire -> { - assertEquals("batch", wire.read("type").text()); - ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read("protocol-version").int32()); + assertEquals(0, wire.read(VERSION).int16()); + assertEquals(BATCH, wire.read(TYPE).text()); + + assertEquals(1L, wire.read(QUERY_START_TIME).int64()); + + ProtocolVersion protocolVersion = ProtocolVersion.decode(wire.read(PROTOCOL_VERSION).int32()); assertEquals(ProtocolVersion.CURRENT, protocolVersion); - QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read("query-options").bytes()), protocolVersion); - assertEquals(1L, wire.read("query-time").int64()); + + QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wire.read(QUERY_OPTIONS).bytes()), protocolVersion); compareQueryOptions(QueryOptions.DEFAULT, queryOptions); - assertEquals("UNLOGGED", wire.read("batch-type").text()); - ValueIn in = wire.read("queries"); + + assertEquals(Long.MIN_VALUE, wire.read(GENERATED_TIMESTAMP).int64()); + assertEquals(Integer.MIN_VALUE, wire.read(GENERATED_NOW_IN_SECONDS).int32()); + + assertEquals("UNLOGGED", wire.read(BATCH_TYPE).text()); + ValueIn in = wire.read(QUERIES); assertEquals(2, in.int32()); assertEquals("foo1", in.text()); assertEquals("foo2", in.text()); - in = wire.read("values"); + in = wire.read(VALUES); assertEquals(2, in.int32()); assertEquals(2, in.int32()); assertTrue(Arrays.equals(new byte[1], in.bytes())); @@ -456,7 +495,7 @@ public class FullQueryLoggerTest extends CQLTester public void testQueryWeight() { //Empty query should have some weight - WeighableMarshallableQuery query = new WeighableMarshallableQuery("", QueryOptions.DEFAULT, 1); + Query query = new Query("", QueryOptions.DEFAULT, queryState(), 1); assertTrue(query.weight() >= 95); StringBuilder sb = new StringBuilder(); @@ -464,14 +503,14 @@ public class FullQueryLoggerTest extends CQLTester { sb.append('a'); } - query = new WeighableMarshallableQuery(sb.toString(), QueryOptions.DEFAULT, 1); + query = new Query(sb.toString(), QueryOptions.DEFAULT, queryState(), 1); //A large query should be reflected in the size, * 2 since characters are still two bytes assertTrue(query.weight() > ObjectSizes.measureDeep(sb.toString())); //Large query options should be reflected QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024))); - query = new WeighableMarshallableQuery("", largeOptions, 1); + query = new Query("", largeOptions, queryState(), 1); assertTrue(query.weight() > 1024 * 1024); System.out.printf("weight %d%n", query.weight()); } @@ -480,8 +519,8 @@ public class FullQueryLoggerTest extends CQLTester public void testBatchWeight() { //An empty batch should have weight - WeighableMarshallableBatch batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); - assertTrue(batch.weight() >= 183); + Batch batch = new Batch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1); + assertTrue(batch.weight() > 0); StringBuilder sb = new StringBuilder(); for (int ii = 0; ii < 1024 * 1024; ii++) @@ -489,23 +528,19 @@ public class FullQueryLoggerTest extends CQLTester sb.append('a'); } - //The weight of the type string should be reflected - batch = new WeighableMarshallableBatch(sb.toString(), new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); - assertTrue(batch.weight() > ObjectSizes.measureDeep(sb.toString())); - //The weight of the list containing queries should be reflected List<String> bigList = new ArrayList(100000); for (int ii = 0; ii < 100000; ii++) { bigList.add(""); } - batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1); + batch = new Batch(Type.UNLOGGED, bigList, new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1); assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList)); //The size of the query should be reflected bigList = new ArrayList(1); bigList.add(sb.toString()); - batch = new WeighableMarshallableBatch("", bigList, new ArrayList<>(), QueryOptions.DEFAULT, 1); + batch = new Batch(Type.UNLOGGED, bigList, new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1); assertTrue(batch.weight() > ObjectSizes.measureDeep(bigList)); bigList = null; @@ -516,74 +551,74 @@ public class FullQueryLoggerTest extends CQLTester bigValues.add(new ArrayList<>(0)); } bigValues.get(0).add(ByteBuffer.allocate(1024 * 1024 * 5)); - batch = new WeighableMarshallableBatch("", new ArrayList<>(), bigValues, QueryOptions.DEFAULT, 1); + batch = new Batch(Type.UNLOGGED, new ArrayList<>(), bigValues, QueryOptions.DEFAULT, queryState(), 1); assertTrue(batch.weight() > ObjectSizes.measureDeep(bigValues)); //As should the size of the values QueryOptions largeOptions = QueryOptions.forInternalCalls(Arrays.asList(ByteBuffer.allocate(1024 * 1024))); - batch = new WeighableMarshallableBatch("", new ArrayList<>(), new ArrayList<>(), largeOptions, 1); + batch = new Batch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), largeOptions, queryState(), 1); assertTrue(batch.weight() > 1024 * 1024); } @Test(expected = NullPointerException.class) public void testLogBatchNullType() throws Exception { - instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, 1); + instance.logBatch(null, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1); } @Test(expected = NullPointerException.class) public void testLogBatchNullQueries() throws Exception { - instance.logBatch("", null, new ArrayList<>(), QueryOptions.DEFAULT, 1); + instance.logBatch(Type.UNLOGGED, null, new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1); } @Test(expected = NullPointerException.class) public void testLogBatchNullQueriesQuery() throws Exception { configureFQL(); - instance.logBatch("", Arrays.asList((String)null), new ArrayList<>(), QueryOptions.DEFAULT, 1); + instance.logBatch(Type.UNLOGGED, Arrays.asList((String)null), new ArrayList<>(), QueryOptions.DEFAULT, queryState(), 1); } @Test(expected = NullPointerException.class) public void testLogBatchNullValues() throws Exception { - instance.logBatch("", new ArrayList<>(), null, QueryOptions.DEFAULT, 1); + instance.logBatch(Type.UNLOGGED, new ArrayList<>(), null, QueryOptions.DEFAULT, queryState(), 1); } @Test(expected = NullPointerException.class) public void testLogBatchNullValuesValue() throws Exception { - instance.logBatch("", new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), null, 1); + instance.logBatch(Type.UNLOGGED, new ArrayList<>(), Arrays.asList((List<ByteBuffer>)null), null, queryState(), 1); } @Test(expected = NullPointerException.class) public void testLogBatchNullQueryOptions() throws Exception { - instance.logBatch("", new ArrayList<>(), new ArrayList<>(), null, 1); + instance.logBatch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), null, queryState(), 1); } @Test(expected = IllegalArgumentException.class) public void testLogBatchNegativeTime() throws Exception { - instance.logBatch("", new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, -1); + instance.logBatch(Type.UNLOGGED, new ArrayList<>(), new ArrayList<>(), QueryOptions.DEFAULT, queryState(), -1); } @Test(expected = NullPointerException.class) public void testLogQueryNullQuery() throws Exception { - instance.logQuery(null, QueryOptions.DEFAULT, 1); + instance.logQuery(null, QueryOptions.DEFAULT, queryState(), 1); } @Test(expected = NullPointerException.class) public void testLogQueryNullQueryOptions() throws Exception { - instance.logQuery("", null, 1); + instance.logQuery("", null, queryState(), 1); } @Test(expected = IllegalArgumentException.class) public void testLogQueryNegativeTime() throws Exception { - instance.logQuery("", QueryOptions.DEFAULT, -1); + instance.logQuery("", QueryOptions.DEFAULT, queryState(), -1); } private static void compareQueryOptions(QueryOptions a, QueryOptions b) @@ -604,8 +639,12 @@ public class FullQueryLoggerTest extends CQLTester private void logQuery(String query) { - instance.logQuery(query, QueryOptions.DEFAULT, 1); + instance.logQuery(query, QueryOptions.DEFAULT, queryState(), 1); } + private QueryState queryState() + { + return new QueryState(ClientState.forInternalCalls()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
