Add fqltool replay Patch by marcuse; reviewed by Jason Brown and Dinesh Joshi for CASSANDRA-14618
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62ffb772 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62ffb772 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62ffb772 Branch: refs/heads/trunk Commit: 62ffb7723917768c38c9e012710c6dce509191c1 Parents: 46c33f3 Author: Marcus Eriksson <[email protected]> Authored: Mon Aug 6 16:32:27 2018 +0200 Committer: Marcus Eriksson <[email protected]> Committed: Sat Sep 1 08:35:54 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/audit/FullQueryLogger.java | 5 +- .../apache/cassandra/service/QueryState.java | 8 + .../cassandra/tools/FullQueryLogTool.java | 6 +- .../tools/fqltool/DriverResultSet.java | 241 ++++++ .../apache/cassandra/tools/fqltool/Dump.java | 325 -------- .../cassandra/tools/fqltool/FQLQuery.java | 278 +++++++ .../tools/fqltool/FQLQueryIterator.java | 72 ++ .../cassandra/tools/fqltool/FQLQueryReader.java | 116 +++ .../cassandra/tools/fqltool/QueryReplayer.java | 167 ++++ .../tools/fqltool/ResultComparator.java | 116 +++ .../cassandra/tools/fqltool/ResultHandler.java | 124 +++ .../cassandra/tools/fqltool/ResultStore.java | 142 ++++ .../cassandra/tools/fqltool/commands/Dump.java | 325 ++++++++ .../tools/fqltool/commands/Replay.java | 148 ++++ .../cassandra/tools/fqltool/FQLReplayTest.java | 760 +++++++++++++++++++ 16 files changed, 2505 insertions(+), 329 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cd2a14a..1227337 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Add fqltool replay (CASSANDRA-14618) * Log keyspace in full query log (CASSANDRA-14656) * Transient Replication and Cheap Quorums (CASSANDRA-14404) * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675) http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/audit/FullQueryLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/audit/FullQueryLogger.java b/src/java/org/apache/cassandra/audit/FullQueryLogger.java index c9f8447..9c1f472 100644 --- a/src/java/org/apache/cassandra/audit/FullQueryLogger.java +++ b/src/java/org/apache/cassandra/audit/FullQueryLogger.java @@ -23,6 +23,7 @@ import java.util.List; import javax.annotation.Nullable; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.primitives.Ints; @@ -151,7 +152,7 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger logRecord(wrappedQuery, binLog); } - static class Query extends AbstractLogEntry + public static class Query extends AbstractLogEntry { private final String query; @@ -181,7 +182,7 @@ public class FullQueryLogger extends BinLogAuditLogger implements IAuditLogger } } - static class Batch extends AbstractLogEntry + public static class Batch extends AbstractLogEntry { private final int weight; private final BatchStatement.Type batchType; http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/service/QueryState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java index 2bd07ab..26f58bf 100644 --- a/src/java/org/apache/cassandra/service/QueryState.java +++ b/src/java/org/apache/cassandra/service/QueryState.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service; import java.net.InetAddress; +import org.apache.cassandra.transport.ClientStat; import org.apache.cassandra.utils.FBUtilities; /** @@ -39,6 +40,13 @@ public class QueryState this.clientState = clientState; } + public QueryState(ClientState clientState, long timestamp, int nowInSeconds) + { + this(clientState); + this.timestamp = timestamp; + this.nowInSeconds = nowInSeconds; + } + /** * @return a QueryState object for internal C* calls (not limited by any kind of auth). */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/FullQueryLogTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java index 0d170d9..c1d4713 100644 --- a/src/java/org/apache/cassandra/tools/FullQueryLogTool.java +++ b/src/java/org/apache/cassandra/tools/FullQueryLogTool.java @@ -31,7 +31,8 @@ import io.airlift.airline.ParseCommandUnrecognizedException; import io.airlift.airline.ParseOptionConversionException; import io.airlift.airline.ParseOptionMissingException; import io.airlift.airline.ParseOptionMissingValueException; -import org.apache.cassandra.tools.fqltool.Dump; +import org.apache.cassandra.tools.fqltool.commands.Dump; +import org.apache.cassandra.tools.fqltool.commands.Replay; import static com.google.common.base.Throwables.getStackTraceAsString; import static com.google.common.collect.Lists.newArrayList; @@ -42,7 +43,8 @@ public class FullQueryLogTool { List<Class<? extends Runnable>> commands = newArrayList( Help.class, - Dump.class + Dump.class, + Replay.class ); Cli.CliBuilder<Runnable> builder = Cli.builder("fqltool"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java b/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java new file mode 100644 index 0000000..6c4ee45 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.collect.AbstractIterator; + +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import org.apache.cassandra.utils.ByteBufferUtil; + + +/** + * Wraps a result set from the driver so that we can reuse the compare code when reading + * up a result set produced by ResultStore. + */ +public class DriverResultSet implements ResultHandler.ComparableResultSet +{ + private final ResultSet resultSet; + private final Throwable failureException; + + public DriverResultSet(ResultSet resultSet) + { + this(resultSet, null); + } + + private DriverResultSet(ResultSet res, Throwable failureException) + { + resultSet = res; + this.failureException = failureException; + } + + public static DriverResultSet failed(Throwable ex) + { + return new DriverResultSet(null, ex); + } + + public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() + { + if (wasFailed()) + return new DriverColumnDefinitions(null, true); + + return new DriverColumnDefinitions(resultSet.getColumnDefinitions()); + } + + public boolean wasFailed() + { + return failureException != null; + } + + public Throwable getFailureException() + { + return failureException; + } + + public Iterator<ResultHandler.ComparableRow> iterator() + { + if (wasFailed()) + return Collections.emptyListIterator(); + return new AbstractIterator<ResultHandler.ComparableRow>() + { + Iterator<Row> iter = resultSet.iterator(); + protected ResultHandler.ComparableRow computeNext() + { + if (iter.hasNext()) + return new DriverRow(iter.next()); + return endOfData(); + } + }; + } + + public static class DriverRow implements ResultHandler.ComparableRow + { + private final Row row; + + public DriverRow(Row row) + { + this.row = row; + } + + public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() + { + return new DriverColumnDefinitions(row.getColumnDefinitions()); + } + + public ByteBuffer getBytesUnsafe(int i) + { + return row.getBytesUnsafe(i); + } + + @Override + public boolean equals(Object oo) + { + if (!(oo instanceof ResultHandler.ComparableRow)) + return false; + + ResultHandler.ComparableRow o = (ResultHandler.ComparableRow)oo; + if (getColumnDefinitions().size() != o.getColumnDefinitions().size()) + return false; + + for (int j = 0; j < getColumnDefinitions().size(); j++) + { + ByteBuffer b1 = getBytesUnsafe(j); + ByteBuffer b2 = o.getBytesUnsafe(j); + + if (b1 != null && b2 != null && !b1.equals(b2)) + { + return false; + } + if (b1 == null && b2 != null || b2 == null && b1 != null) + { + return false; + } + } + return true; + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + List<ResultHandler.ComparableDefinition> colDefs = getColumnDefinitions().asList(); + for (int i = 0; i < getColumnDefinitions().size(); i++) + { + ByteBuffer bb = getBytesUnsafe(i); + String row = bb != null ? ByteBufferUtil.bytesToHex(bb) : "NULL"; + sb.append(colDefs.get(i)).append(':').append(row).append(","); + } + return sb.toString(); + } + } + + public static class DriverColumnDefinitions implements ResultHandler.ComparableColumnDefinitions + { + private final ColumnDefinitions columnDefinitions; + private final boolean failed; + + public DriverColumnDefinitions(ColumnDefinitions columnDefinitions) + { + this(columnDefinitions, false); + } + + private DriverColumnDefinitions(ColumnDefinitions columnDefinitions, boolean failed) + { + this.columnDefinitions = columnDefinitions; + this.failed = failed; + } + + public List<ResultHandler.ComparableDefinition> asList() + { + if (wasFailed()) + return Collections.emptyList(); + return columnDefinitions.asList().stream().map(DriverDefinition::new).collect(Collectors.toList()); + } + + public boolean wasFailed() + { + return failed; + } + + public int size() + { + return columnDefinitions.size(); + } + + public Iterator<ResultHandler.ComparableDefinition> iterator() + { + return asList().iterator(); + } + + public boolean equals(Object oo) + { + if (!(oo instanceof ResultHandler.ComparableColumnDefinitions)) + return false; + + ResultHandler.ComparableColumnDefinitions o = (ResultHandler.ComparableColumnDefinitions)oo; + if (wasFailed() && o.wasFailed()) + return true; + + if (size() != o.size()) + return false; + + return asList().equals(o.asList()); + } + } + + public static class DriverDefinition implements ResultHandler.ComparableDefinition + { + private final ColumnDefinitions.Definition def; + + public DriverDefinition(ColumnDefinitions.Definition def) + { + this.def = def; + } + + public String getType() + { + return def.getType().toString(); + } + + public String getName() + { + return def.getName(); + } + + public boolean equals(Object oo) + { + if (!(oo instanceof ResultHandler.ComparableDefinition)) + return false; + + return def.equals(((DriverDefinition)oo).def); + } + + public String toString() + { + return getName() + ':' + getType(); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/Dump.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/Dump.java deleted file mode 100644 index a8e7592..0000000 --- a/src/java/org/apache/cassandra/tools/fqltool/Dump.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.tools.fqltool; - -import java.io.File; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -import io.airlift.airline.Arguments; -import io.airlift.airline.Command; -import io.airlift.airline.Option; -import io.netty.buffer.Unpooled; -import net.openhft.chronicle.bytes.Bytes; -import net.openhft.chronicle.queue.ChronicleQueue; -import net.openhft.chronicle.queue.ChronicleQueueBuilder; -import net.openhft.chronicle.queue.ExcerptTailer; -import net.openhft.chronicle.queue.RollCycles; -import net.openhft.chronicle.threads.Pauser; -import net.openhft.chronicle.wire.ReadMarshallable; -import net.openhft.chronicle.wire.ValueIn; -import net.openhft.chronicle.wire.WireIn; -import org.apache.cassandra.audit.FullQueryLogger; -import org.apache.cassandra.cql3.QueryOptions; -import org.apache.cassandra.transport.ProtocolVersion; - -/** - * Dump the contents of a list of paths containing full query logs - */ -@Command(name = "dump", description = "Dump the contents of a full query log") -public class Dump implements Runnable -{ - static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray(); - - @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true) - private List<String> arguments = new ArrayList<>(); - - @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.") - private String rollCycle = "HOURLY"; - - @Option(title = "follow", name = {"--follow"}, description = "Upon reacahing the end of the log continue indefinitely waiting for more records") - private boolean follow = false; - - @Override - public void run() - { - dump(arguments, rollCycle, follow); - } - - public static void dump(List<String> arguments, String rollCycle, boolean follow) - { - StringBuilder sb = new StringBuilder(); - ReadMarshallable reader = wireIn -> - { - sb.setLength(0); - - int version = wireIn.read(FullQueryLogger.VERSION).int16(); - if (version != FullQueryLogger.CURRENT_VERSION) - throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered"); - - String type = wireIn.read(FullQueryLogger.TYPE).text(); - sb.append("Type: ") - .append(type) - .append(System.lineSeparator()); - - long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64(); - sb.append("Query start time: ") - .append(queryStartTime) - .append(System.lineSeparator()); - - int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32(); - sb.append("Protocol version: ") - .append(protocolVersion) - .append(System.lineSeparator()); - - QueryOptions options = - QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()), - ProtocolVersion.decode(protocolVersion)); - - long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64(); - sb.append("Generated timestamp:") - .append(generatedTimestamp) - .append(System.lineSeparator()); - - int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32(); - sb.append("Generated nowInSeconds:") - .append(generatedNowInSeconds) - .append(System.lineSeparator()); - - switch (type) - { - case (FullQueryLogger.SINGLE_QUERY): - dumpQuery(options, wireIn, sb); - break; - - case (FullQueryLogger.BATCH): - dumpBatch(options, wireIn, sb); - break; - - default: - throw new UnsupportedOperationException("Log entry of unsupported type " + type); - } - - System.out.print(sb.toString()); - System.out.flush(); - }; - - //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency - Pauser pauser = Pauser.millis(100); - List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList()); - List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList()); - boolean hadWork = true; - while (hadWork) - { - hadWork = false; - for (ExcerptTailer tailer : tailers) - { - while (tailer.readDocument(reader)) - { - hadWork = true; - } - } - - if (follow) - { - if (!hadWork) - { - //Chronicle queue doesn't support blocking so use this backoff strategy - pauser.pause(); - } - //Don't terminate the loop even if there wasn't work - hadWork = true; - } - } - } - - private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb) - { - sb.append("Query: ") - .append(wireIn.read(FullQueryLogger.QUERY).text()) - .append(System.lineSeparator()); - - List<ByteBuffer> values = options.getValues() != null - ? options.getValues() - : Collections.emptyList(); - - sb.append("Values: ") - .append(System.lineSeparator()); - appendValuesToStringBuilder(values, sb); - sb.append(System.lineSeparator()); - } - - private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb) - { - sb.append("Batch type: ") - .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text()) - .append(System.lineSeparator()); - - ValueIn in = wireIn.read(FullQueryLogger.QUERIES); - int numQueries = in.int32(); - List<String> queries = new ArrayList<>(numQueries); - for (int i = 0; i < numQueries; i++) - queries.add(in.text()); - - in = wireIn.read(FullQueryLogger.VALUES); - int numValues = in.int32(); - - for (int i = 0; i < numValues; i++) - { - int numSubValues = in.int32(); - List<ByteBuffer> subValues = new ArrayList<>(numSubValues); - for (int j = 0; j < numSubValues; j++) - subValues.add(ByteBuffer.wrap(in.bytes())); - - sb.append("Query: ") - .append(queries.get(i)) - .append(System.lineSeparator()); - - sb.append("Values: ") - .append(System.lineSeparator()); - appendValuesToStringBuilder(subValues, sb); - } - - sb.append(System.lineSeparator()); - } - - private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb) - { - boolean first = true; - for (ByteBuffer value : values) - { - Bytes bytes = Bytes.wrapForRead(value); - long maxLength2 = Math.min(1024, bytes.readLimit() - bytes.readPosition()); - toHexString(bytes, bytes.readPosition(), maxLength2, sb); - if (maxLength2 < bytes.readLimit() - bytes.readPosition()) - { - sb.append("... truncated").append(System.lineSeparator()); - } - - if (first) - { - first = false; - } - else - { - sb.append("-----").append(System.lineSeparator()); - } - } - } - - //This is from net.openhft.chronicle.bytes, need to pass in the StringBuilder so had to copy - /* - * Copyright 2016 higherfrequencytrading.com - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - /** - * display the hex data of {@link Bytes} from the position() to the limit() - * - * @param bytes the buffer you wish to toString() - * @return hex representation of the buffer, from example [0D ,OA, FF] - */ - public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder) - throws BufferUnderflowException - { - if (len == 0) - return ""; - - int width = 16; - int[] lastLine = new int[width]; - String sep = ""; - long position = bytes.readPosition(); - long limit = bytes.readLimit(); - - try { - bytes.readPositionRemaining(offset, len); - - long start = offset / width * width; - long end = (offset + len + width - 1) / width * width; - for (long i = start; i < end; i += width) { - // check for duplicate rows - if (i + width < end) { - boolean same = true; - - for (int j = 0; j < width && i + j < offset + len; j++) { - int ch = bytes.readUnsignedByte(i + j); - same &= (ch == lastLine[j]); - lastLine[j] = ch; - } - if (i > start && same) { - sep = "........\n"; - continue; - } - } - builder.append(sep); - sep = ""; - String str = Long.toHexString(i); - for (int j = str.length(); j < 8; j++) - builder.append('0'); - builder.append(str); - for (int j = 0; j < width; j++) { - if (j == width / 2) - builder.append(' '); - if (i + j < offset || i + j >= offset + len) { - builder.append(" "); - - } else { - builder.append(' '); - int ch = bytes.readUnsignedByte(i + j); - builder.append(HEXI_DECIMAL[ch >> 4]); - builder.append(HEXI_DECIMAL[ch & 15]); - } - } - builder.append(' '); - for (int j = 0; j < width; j++) { - if (j == width / 2) - builder.append(' '); - if (i + j < offset || i + j >= offset + len) { - builder.append(' '); - - } else { - int ch = bytes.readUnsignedByte(i + j); - if (ch < ' ' || ch > 126) - ch = '\u00B7'; - builder.append((char) ch); - } - } - builder.append("\n"); - } - return builder.toString(); - } finally { - bytes.readLimit(limit); - bytes.readPosition(position); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java new file mode 100644 index 0000000..6c0a6b9 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.google.common.primitives.Longs; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import org.apache.cassandra.audit.FullQueryLogger; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.binlog.BinLog; + +public abstract class FQLQuery implements Comparable<FQLQuery> +{ + public final long queryStartTime; + public final QueryOptions queryOptions; + public final int protocolVersion; + public final String keyspace; + public final long generatedTimestamp; + private final int generatedNowInSeconds; + + public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds) + { + this.queryStartTime = queryStartTime; + this.queryOptions = queryOptions; + this.protocolVersion = protocolVersion; + this.keyspace = keyspace; + this.generatedTimestamp = generatedTimestamp; + this.generatedNowInSeconds = generatedNowInSeconds; + } + + public abstract Statement toStatement(); + + /** + * used when storing the queries executed + */ + public abstract BinLog.ReleaseableWriteMarshallable toMarshallable(); + + public QueryState queryState() + { + ClientState clientState = keyspace != null ? ClientState.forInternalCalls(keyspace) : ClientState.forInternalCalls(); + + return new QueryState(clientState, generatedTimestamp, generatedNowInSeconds); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof FQLQuery)) return false; + FQLQuery fqlQuery = (FQLQuery) o; + return queryStartTime == fqlQuery.queryStartTime && + protocolVersion == fqlQuery.protocolVersion && + generatedTimestamp == fqlQuery.generatedTimestamp && + generatedNowInSeconds == fqlQuery.generatedNowInSeconds && + Objects.equals(queryOptions.getValues(), fqlQuery.queryOptions.getValues()) && + Objects.equals(keyspace, fqlQuery.keyspace); + } + + public int hashCode() + { + return Objects.hash(queryStartTime, queryOptions, protocolVersion, keyspace, generatedTimestamp, generatedNowInSeconds); + } + + public int compareTo(FQLQuery other) + { + int cmp = Longs.compare(queryStartTime, other.queryStartTime); + if (cmp != 0) + return cmp; + cmp = Longs.compare(generatedTimestamp, other.generatedTimestamp); + if (cmp != 0) + return cmp; + + return Longs.compare(generatedNowInSeconds, other.generatedNowInSeconds); + } + + public String toString() + { + return "FQLQuery{" + + "queryStartTime=" + queryStartTime + + ", protocolVersion=" + protocolVersion + + ", keyspace='" + keyspace + '\'' + + ", generatedTimestamp=" + generatedTimestamp + + ", generatedNowInSeconds=" + generatedNowInSeconds + + '}'; + } + + public static class Single extends FQLQuery + { + public final String query; + public final List<ByteBuffer> values; + + public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, String queryString, List<ByteBuffer> values) + { + super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds); + this.query = queryString; + this.values = values; + } + + @Override + public String toString() + { + return String.format("%s%nQuery = %s, Values = %s", + super.toString(), + query, + values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(","))); + } + + public Statement toStatement() + { + SimpleStatement ss = new SimpleStatement(query, values.toArray()); + ss.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name())); + ss.setDefaultTimestamp(generatedTimestamp); + return ss; + } + + public BinLog.ReleaseableWriteMarshallable toMarshallable() + { + + return new FullQueryLogger.Query(query, queryOptions, queryState(), queryStartTime); + } + + public int compareTo(FQLQuery other) + { + int cmp = super.compareTo(other); + + if (cmp == 0) + { + if (other instanceof Batch) + return -1; + + Single singleQuery = (Single) other; + + cmp = query.compareTo(singleQuery.query); + if (cmp == 0) + { + if (values.size() != singleQuery.values.size()) + return values.size() - singleQuery.values.size(); + for (int i = 0; i < values.size(); i++) + { + cmp = values.get(i).compareTo(singleQuery.values.get(i)); + if (cmp != 0) + return cmp; + } + } + } + return cmp; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof Single)) return false; + if (!super.equals(o)) return false; + Single single = (Single) o; + return Objects.equals(query, single.query) && + Objects.equals(values, single.values); + } + + public int hashCode() + { + return Objects.hash(super.hashCode(), query, values); + } + } + + public static class Batch extends FQLQuery + { + public final BatchStatement.Type batchType; + public final List<Single> queries; + + public Batch(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, BatchStatement.Type batchType, List<String> queries, List<List<ByteBuffer>> values) + { + super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds); + this.batchType = batchType; + this.queries = new ArrayList<>(queries.size()); + for (int i = 0; i < queries.size(); i++) + this.queries.add(new Single(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds, queries.get(i), values.get(i))); + } + + public Statement toStatement() + { + BatchStatement bs = new BatchStatement(batchType); + + for (Single query : queries) + { + bs.add(new SimpleStatement(query.query, query.values.toArray())); + } + bs.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name())); + bs.setDefaultTimestamp(generatedTimestamp); // todo: set actual server side generated time + return bs; + } + + public int compareTo(FQLQuery other) + { + int cmp = super.compareTo(other); + + if (cmp == 0) + { + if (other instanceof Single) + return 1; + + Batch otherBatch = (Batch) other; + if (queries.size() != otherBatch.queries.size()) + return queries.size() - otherBatch.queries.size(); + for (int i = 0; i < queries.size(); i++) + { + cmp = queries.get(i).compareTo(otherBatch.queries.get(i)); + if (cmp != 0) + return cmp; + } + } + return cmp; + } + + public BinLog.ReleaseableWriteMarshallable toMarshallable() + { + List<String> queryStrings = new ArrayList<>(); + List<List<ByteBuffer>> values = new ArrayList<>(); + for (Single q : queries) + { + queryStrings.add(q.query); + values.add(q.values); + } + return new FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()), queryStrings, values, queryOptions, queryState(), queryStartTime); + } + + public String toString() + { + StringBuilder sb = new StringBuilder(super.toString()).append("\nbatch: ").append(batchType).append('\n'); + for (Single q : queries) + sb.append(q.toString()).append('\n'); + sb.append("end batch"); + return sb.toString(); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof Batch)) return false; + if (!super.equals(o)) return false; + Batch batch = (Batch) o; + return batchType == batch.batchType && + Objects.equals(queries, batch.queries); + } + + public int hashCode() + { + return Objects.hash(super.hashCode(), batchType, queries); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java new file mode 100644 index 0000000..390a52e --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.util.PriorityQueue; + +import net.openhft.chronicle.queue.ExcerptTailer; +import org.apache.cassandra.utils.AbstractIterator; + +public class FQLQueryIterator extends AbstractIterator<FQLQuery> +{ + // use a priority queue to be able to sort the head of the query logs in memory + private final PriorityQueue<FQLQuery> pq; + private final ExcerptTailer tailer; + private final FQLQueryReader reader; + + /** + * Create an iterator over the FQLQueries in tailer + * + * Reads up to readAhead queries in to memory to be able to sort them (the files are mostly sorted already) + */ + public FQLQueryIterator(ExcerptTailer tailer, int readAhead) + { + assert readAhead > 0 : "readAhead needs to be > 0"; + reader = new FQLQueryReader(); + this.tailer = tailer; + pq = new PriorityQueue<>(readAhead); + for (int i = 0; i < readAhead; i++) + { + FQLQuery next = readNext(); + if (next != null) + pq.add(next); + else + break; + } + } + + protected FQLQuery computeNext() + { + FQLQuery q = pq.poll(); + if (q == null) + return endOfData(); + FQLQuery next = readNext(); + if (next != null) + pq.add(next); + return q; + } + + private FQLQuery readNext() + { + if (tailer.readDocument(reader)) + return reader.getQuery(); + return null; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java new file mode 100644 index 0000000..af77c59 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import com.datastax.driver.core.BatchStatement; +import io.netty.buffer.Unpooled; +import net.openhft.chronicle.core.io.IORuntimeException; +import net.openhft.chronicle.wire.ReadMarshallable; +import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.WireIn; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.transport.ProtocolVersion; + +import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS; +import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP; +import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE; +import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME; +import static org.apache.cassandra.audit.FullQueryLogger.TYPE; +import static org.apache.cassandra.audit.FullQueryLogger.VERSION; +import static org.apache.cassandra.audit.FullQueryLogger.BATCH; +import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE; +import static org.apache.cassandra.audit.FullQueryLogger.QUERIES; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY; +import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY; +import static org.apache.cassandra.audit.FullQueryLogger.VALUES; + +public class FQLQueryReader implements ReadMarshallable +{ + private FQLQuery query; + + public void readMarshallable(WireIn wireIn) throws IORuntimeException + { + int currentVersion = wireIn.read(VERSION).int16(); + String type = wireIn.read(TYPE).text(); + long queryStartTime = wireIn.read(QUERY_START_TIME).int64(); + int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32(); + QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()), ProtocolVersion.decode(protocolVersion)); + long generatedTimestamp = wireIn.read(GENERATED_TIMESTAMP).int64(); + int generatedNowInSeconds = wireIn.read(GENERATED_NOW_IN_SECONDS).int32(); + String keyspace = wireIn.read(KEYSPACE).text(); + + switch (type) + { + case SINGLE_QUERY: + String queryString = wireIn.read(QUERY).text(); + query = new FQLQuery.Single(keyspace, + protocolVersion, + queryOptions, + queryStartTime, + generatedTimestamp, + generatedNowInSeconds, + queryString, + queryOptions.getValues()); + break; + case BATCH: + BatchStatement.Type batchType = BatchStatement.Type.valueOf(wireIn.read(BATCH_TYPE).text()); + ValueIn in = wireIn.read(QUERIES); + int queryCount = in.int32(); + + List<String> queries = new ArrayList<>(queryCount); + for (int i = 0; i < queryCount; i++) + queries.add(in.text()); + in = wireIn.read(VALUES); + int valueCount = in.int32(); + List<List<ByteBuffer>> values = new ArrayList<>(valueCount); + for (int ii = 0; ii < valueCount; ii++) + { + List<ByteBuffer> subValues = new ArrayList<>(); + values.add(subValues); + int numSubValues = in.int32(); + for (int zz = 0; zz < numSubValues; zz++) + subValues.add(ByteBuffer.wrap(in.bytes())); + } + query = new FQLQuery.Batch(keyspace, + protocolVersion, + queryOptions, + queryStartTime, + generatedTimestamp, + generatedNowInSeconds, + batchType, + queries, + values); + break; + default: + throw new RuntimeException("Unknown type: " + type); + } + } + + public FQLQuery getQuery() + { + return query; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java b/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java new file mode 100644 index 0000000..0c8382f --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.io.Closeable; +import java.io.File; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import org.apache.cassandra.utils.FBUtilities; + +public class QueryReplayer implements Closeable +{ + private static final int PRINT_RATE = 5000; + private final ExecutorService es = Executors.newFixedThreadPool(1); + private final Iterator<List<FQLQuery>> queryIterator; + private final List<Cluster> targetClusters; + private final List<Predicate<FQLQuery>> filters; + private final List<Session> sessions; + private final ResultHandler resultHandler; + private final MetricRegistry metrics = new MetricRegistry(); + private final boolean debug; + private final PrintStream out; + + public QueryReplayer(Iterator<List<FQLQuery>> queryIterator, + List<String> targetHosts, + List<File> resultPaths, + List<Predicate<FQLQuery>> filters, + PrintStream out, + String queryFilePathString, + boolean debug) + { + this.queryIterator = queryIterator; + targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList()); + this.filters = filters; + sessions = targetClusters.stream().map(Cluster::connect).collect(Collectors.toList()); + File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null; + resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath); + this.debug = debug; + this.out = out; + } + + public void replay() + { + while (queryIterator.hasNext()) + { + List<FQLQuery> queries = queryIterator.next(); + for (FQLQuery query : queries) + { + if (filters.stream().anyMatch(f -> !f.test(query))) + continue; + try (Timer.Context ctx = metrics.timer("queries").time()) + { + List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size()); + Statement statement = query.toStatement(); + for (Session session : sessions) + { + try + { + if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace())) + { + if (debug) + out.printf("Switching keyspace from %s to %s%n", session.getLoggedKeyspace(), query.keyspace); + session.execute("USE " + query.keyspace); + } + } + catch (Throwable t) + { + out.printf("USE %s failed: %s%n", query.keyspace, t.getMessage()); + } + if (debug) + { + out.println("Executing query:"); + out.println(query); + } + ListenableFuture<ResultSet> future = session.executeAsync(statement); + results.add(handleErrors(future)); + } + + ListenableFuture<List<ResultHandler.ComparableResultSet>> resultList = Futures.allAsList(results); + + Futures.addCallback(resultList, new FutureCallback<List<ResultHandler.ComparableResultSet>>() + { + public void onSuccess(@Nullable List<ResultHandler.ComparableResultSet> resultSets) + { + // note that the order of resultSets is signifcant here - resultSets.get(x) should + // be the result from a query against targetHosts.get(x) + resultHandler.handleResults(query, resultSets); + } + + public void onFailure(Throwable throwable) + { + throw new AssertionError("Errors should be handled in FQLQuery.execute", throwable); + } + }, es); + + FBUtilities.waitOnFuture(resultList); + } + catch (Throwable t) + { + out.printf("QUERY %s got exception: %s", query, t.getMessage()); + } + + Timer timer = metrics.timer("queries"); + if (timer.getCount() % PRINT_RATE == 0) + out.printf("%d queries, rate = %.2f%n", timer.getCount(), timer.getOneMinuteRate()); + } + } + } + + /** + * Make sure we catch any query errors + * + * On error, this creates a failed ComparableResultSet with the exception set to be able to store + * this fact in the result file and handle comparison of failed result sets. + */ + private static ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result) + { + FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = FluentFuture.from(result) + .transform(DriverResultSet::new, MoreExecutors.directExecutor()); + return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor()); + } + + public void close() + { + sessions.forEach(Session::close); + targetClusters.forEach(Cluster::close); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java b/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java new file mode 100644 index 0000000..4bbaf7a --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.collect.Streams; + +public class ResultComparator +{ + /** + * Compares the rows in rows + * the row at position x in rows will have come from host at position x in targetHosts + */ + public boolean compareRows(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows) + { + if (rows.size() < 2 || rows.stream().allMatch(Objects::isNull)) + return true; + + if (rows.stream().anyMatch(Objects::isNull)) + { + handleMismatch(targetHosts, query, rows); + return false; + } + + ResultHandler.ComparableRow ref = rows.get(0); + boolean equal = true; + for (int i = 1; i < rows.size(); i++) + { + ResultHandler.ComparableRow compare = rows.get(i); + if (!ref.equals(compare)) + equal = false; + } + if (!equal) + handleMismatch(targetHosts, query, rows); + return equal; + } + + /** + * Compares the column definitions + * + * the column definitions at position x in cds will have come from host at position x in targetHosts + */ + public boolean compareColumnDefinitions(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds) + { + if (cds.size() < 2) + return true; + + boolean equal = true; + List<ResultHandler.ComparableDefinition> refDefs = cds.get(0).asList(); + for (int i = 1; i < cds.size(); i++) + { + List<ResultHandler.ComparableDefinition> toCompare = cds.get(i).asList(); + if (!refDefs.equals(toCompare)) + equal = false; + } + if (!equal) + handleColumnDefMismatch(targetHosts, query, cds); + return equal; + } + + private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows) + { + System.out.println("MISMATCH:"); + System.out.println("Query = " + query); + System.out.println("Results:"); + System.out.println(Streams.zip(rows.stream(), targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? "null" : r)).collect(Collectors.joining())); + } + + private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds) + { + System.out.println("COLUMN DEFINITION MISMATCH:"); + System.out.println("Query = " + query); + System.out.println("Results: "); + System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd))).collect(Collectors.joining())); + } + + private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd) + { + StringBuilder sb = new StringBuilder(); + if (cd == null) + sb.append("NULL"); + else if (cd.wasFailed()) + sb.append("FAILED"); + else + { + for (ResultHandler.ComparableDefinition def : cd) + { + sb.append(def.toString()); + } + } + return sb.toString(); + } + + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java b/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java new file mode 100644 index 0000000..c769231 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +public class ResultHandler +{ + private final ResultStore resultStore; + private final ResultComparator resultComparator; + private final List<String> targetHosts; + + public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath) + { + this.targetHosts = targetHosts; + resultStore = resultPaths != null ? new ResultStore(resultPaths, queryFilePath) : null; + resultComparator = new ResultComparator(); + } + + /** + * Since we can't iterate a ResultSet more than once, and we don't want to keep the entire result set in memory + * we feed the rows one-by-one to resultComparator and resultStore. + * + * results.get(x) should be the results from executing query against targetHosts.get(x) + */ + public void handleResults(FQLQuery query, List<ComparableResultSet> results) + { + for (int i = 0; i < targetHosts.size(); i++) + { + if (results.get(i).wasFailed()) + { + System.out.println("Query against "+targetHosts.get(i)+" failure:"); + System.out.println(query); + System.out.println("Message: "+results.get(i).getFailureException().getMessage()); + } + } + + List<ComparableColumnDefinitions> columnDefinitions = results.stream().map(ComparableResultSet::getColumnDefinitions).collect(Collectors.toList()); + resultComparator.compareColumnDefinitions(targetHosts, query, columnDefinitions); + if (resultStore != null) + resultStore.storeColumnDefinitions(query, columnDefinitions); + List<Iterator<ComparableRow>> iters = results.stream().map(Iterable::iterator).collect(Collectors.toList()); + + while (true) + { + List<ComparableRow> rows = rows(iters); + resultComparator.compareRows(targetHosts, query, rows); + if (resultStore != null) + resultStore.storeRows(rows); + // all rows being null marks end of all resultsets, we need to call compareRows + // and storeRows once with everything null to mark that fact + if (rows.stream().allMatch(Objects::isNull)) + return; + } + } + + /** + * Get the first row from each of the iterators, if the iterator has run out, null will mark that in the list + */ + @VisibleForTesting + public static List<ComparableRow> rows(List<Iterator<ComparableRow>> iters) + { + List<ComparableRow> rows = new ArrayList<>(iters.size()); + for (Iterator<ComparableRow> iter : iters) + { + if (iter.hasNext()) + rows.add(iter.next()); + else + rows.add(null); + } + return rows; + } + + public interface ComparableResultSet extends Iterable<ComparableRow> + { + public ComparableColumnDefinitions getColumnDefinitions(); + public boolean wasFailed(); + public Throwable getFailureException(); + } + + public interface ComparableColumnDefinitions extends Iterable<ComparableDefinition> + { + public List<ComparableDefinition> asList(); + public boolean wasFailed(); + public int size(); + } + + public interface ComparableDefinition + { + public String getType(); + public String getName(); + } + + public interface ComparableRow + { + public ByteBuffer getBytesUnsafe(int i); + public ComparableColumnDefinitions getColumnDefinitions(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java b/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java new file mode 100644 index 0000000..6d6aaac --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/ResultStore.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import net.openhft.chronicle.bytes.BytesStore; +import net.openhft.chronicle.core.io.Closeable; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptAppender; +import net.openhft.chronicle.wire.ValueOut; +import org.apache.cassandra.utils.binlog.BinLog; + +/** + * see FQLReplayTest#readResultFile for how to read files produced by this class + */ +public class ResultStore +{ + private final List<ChronicleQueue> queues; + private final List<ExcerptAppender> appenders; + private final ChronicleQueue queryStoreQueue; + private final ExcerptAppender queryStoreAppender; + private final Set<Integer> finishedHosts = new HashSet<>(); + + public ResultStore(List<File> resultPaths, File queryFilePath) + { + queues = resultPaths.stream().map(path -> ChronicleQueueBuilder.single(path).build()).collect(Collectors.toList()); + appenders = queues.stream().map(ChronicleQueue::acquireAppender).collect(Collectors.toList()); + queryStoreQueue = queryFilePath != null ? ChronicleQueueBuilder.single(queryFilePath).build() : null; + queryStoreAppender = queryStoreQueue != null ? queryStoreQueue.acquireAppender() : null; + } + + /** + * Store the column definitions in cds + * + * the ColumnDefinitions at position x will get stored by the appender at position x + * + * Calling this method indicates that we are starting a new result set from a query, it must be called before + * calling storeRows. + * + */ + public void storeColumnDefinitions(FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds) + { + finishedHosts.clear(); + if (queryStoreAppender != null) + { + BinLog.ReleaseableWriteMarshallable writeMarshallableQuery = query.toMarshallable(); + queryStoreAppender.writeDocument(writeMarshallableQuery); + writeMarshallableQuery.release(); + } + for (int i = 0; i < cds.size(); i++) + { + ResultHandler.ComparableColumnDefinitions cd = cds.get(i); + appenders.get(i).writeDocument(wire -> + { + if (!cd.wasFailed()) + { + wire.write("type").text("column_definitions"); + wire.write("column_count").int32(cd.size()); + for (ResultHandler.ComparableDefinition d : cd.asList()) + { + ValueOut vo = wire.write("column_definition"); + vo.text(d.getName()); + vo.text(d.getType()); + } + } + else + { + wire.write("type").text("query_failed"); + } + }); + } + } + + /** + * Store rows + * + * the row at position x will get stored by appender at position x + * + * Before calling this for a new result set, storeColumnDefinitions must be called. + */ + public void storeRows(List<ResultHandler.ComparableRow> rows) + { + for (int i = 0; i < rows.size(); i++) + { + ResultHandler.ComparableRow row = rows.get(i); + if (row == null && !finishedHosts.contains(i)) + { + appenders.get(i).writeDocument(wire -> wire.write("type").text("end_resultset")); + finishedHosts.add(i); + } + else if (row != null) + { + appenders.get(i).writeDocument(wire -> + { + { + wire.write("type").text("row"); + wire.write("row_column_count").int32(row.getColumnDefinitions().size()); + for (int jj = 0; jj < row.getColumnDefinitions().size(); jj++) + { + ByteBuffer bb = row.getBytesUnsafe(jj); + if (bb != null) + wire.write("column").bytes(BytesStore.wrap(bb)); + else + wire.write("column").bytes("NULL".getBytes()); + } + } + }); + } + } + } + + public void close() + { + queues.forEach(Closeable::close); + if (queryStoreQueue != null) + queryStoreQueue.close(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/62ffb772/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java new file mode 100644 index 0000000..5c23d3e --- /dev/null +++ b/src/java/org/apache/cassandra/tools/fqltool/commands/Dump.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool.commands; + +import java.io.File; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import io.netty.buffer.Unpooled; +import net.openhft.chronicle.bytes.Bytes; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycles; +import net.openhft.chronicle.threads.Pauser; +import net.openhft.chronicle.wire.ReadMarshallable; +import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.WireIn; +import org.apache.cassandra.audit.FullQueryLogger; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.transport.ProtocolVersion; + +/** + * Dump the contents of a list of paths containing full query logs + */ +@Command(name = "dump", description = "Dump the contents of a full query log") +public class Dump implements Runnable +{ + static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray(); + + @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true) + private List<String> arguments = new ArrayList<>(); + + @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.") + private String rollCycle = "HOURLY"; + + @Option(title = "follow", name = {"--follow"}, description = "Upon reacahing the end of the log continue indefinitely waiting for more records") + private boolean follow = false; + + @Override + public void run() + { + dump(arguments, rollCycle, follow); + } + + public static void dump(List<String> arguments, String rollCycle, boolean follow) + { + StringBuilder sb = new StringBuilder(); + ReadMarshallable reader = wireIn -> + { + sb.setLength(0); + + int version = wireIn.read(FullQueryLogger.VERSION).int16(); + if (version != FullQueryLogger.CURRENT_VERSION) + throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered"); + + String type = wireIn.read(FullQueryLogger.TYPE).text(); + sb.append("Type: ") + .append(type) + .append(System.lineSeparator()); + + long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64(); + sb.append("Query start time: ") + .append(queryStartTime) + .append(System.lineSeparator()); + + int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32(); + sb.append("Protocol version: ") + .append(protocolVersion) + .append(System.lineSeparator()); + + QueryOptions options = + QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()), + ProtocolVersion.decode(protocolVersion)); + + long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64(); + sb.append("Generated timestamp:") + .append(generatedTimestamp) + .append(System.lineSeparator()); + + int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32(); + sb.append("Generated nowInSeconds:") + .append(generatedNowInSeconds) + .append(System.lineSeparator()); + + switch (type) + { + case (FullQueryLogger.SINGLE_QUERY): + dumpQuery(options, wireIn, sb); + break; + + case (FullQueryLogger.BATCH): + dumpBatch(options, wireIn, sb); + break; + + default: + throw new UnsupportedOperationException("Log entry of unsupported type " + type); + } + + System.out.print(sb.toString()); + System.out.flush(); + }; + + //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency + Pauser pauser = Pauser.millis(100); + List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList()); + List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList()); + boolean hadWork = true; + while (hadWork) + { + hadWork = false; + for (ExcerptTailer tailer : tailers) + { + while (tailer.readDocument(reader)) + { + hadWork = true; + } + } + + if (follow) + { + if (!hadWork) + { + //Chronicle queue doesn't support blocking so use this backoff strategy + pauser.pause(); + } + //Don't terminate the loop even if there wasn't work + hadWork = true; + } + } + } + + private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb) + { + sb.append("Query: ") + .append(wireIn.read(FullQueryLogger.QUERY).text()) + .append(System.lineSeparator()); + + List<ByteBuffer> values = options.getValues() != null + ? options.getValues() + : Collections.emptyList(); + + sb.append("Values: ") + .append(System.lineSeparator()); + appendValuesToStringBuilder(values, sb); + sb.append(System.lineSeparator()); + } + + private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb) + { + sb.append("Batch type: ") + .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text()) + .append(System.lineSeparator()); + + ValueIn in = wireIn.read(FullQueryLogger.QUERIES); + int numQueries = in.int32(); + List<String> queries = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) + queries.add(in.text()); + + in = wireIn.read(FullQueryLogger.VALUES); + int numValues = in.int32(); + + for (int i = 0; i < numValues; i++) + { + int numSubValues = in.int32(); + List<ByteBuffer> subValues = new ArrayList<>(numSubValues); + for (int j = 0; j < numSubValues; j++) + subValues.add(ByteBuffer.wrap(in.bytes())); + + sb.append("Query: ") + .append(queries.get(i)) + .append(System.lineSeparator()); + + sb.append("Values: ") + .append(System.lineSeparator()); + appendValuesToStringBuilder(subValues, sb); + } + + sb.append(System.lineSeparator()); + } + + private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb) + { + boolean first = true; + for (ByteBuffer value : values) + { + Bytes bytes = Bytes.wrapForRead(value); + long maxLength2 = Math.min(1024, bytes.readLimit() - bytes.readPosition()); + toHexString(bytes, bytes.readPosition(), maxLength2, sb); + if (maxLength2 < bytes.readLimit() - bytes.readPosition()) + { + sb.append("... truncated").append(System.lineSeparator()); + } + + if (first) + { + first = false; + } + else + { + sb.append("-----").append(System.lineSeparator()); + } + } + } + + //This is from net.openhft.chronicle.bytes, need to pass in the StringBuilder so had to copy + /* + * Copyright 2016 higherfrequencytrading.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /** + * display the hex data of {@link Bytes} from the position() to the limit() + * + * @param bytes the buffer you wish to toString() + * @return hex representation of the buffer, from example [0D ,OA, FF] + */ + public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder) + throws BufferUnderflowException + { + if (len == 0) + return ""; + + int width = 16; + int[] lastLine = new int[width]; + String sep = ""; + long position = bytes.readPosition(); + long limit = bytes.readLimit(); + + try { + bytes.readPositionRemaining(offset, len); + + long start = offset / width * width; + long end = (offset + len + width - 1) / width * width; + for (long i = start; i < end; i += width) { + // check for duplicate rows + if (i + width < end) { + boolean same = true; + + for (int j = 0; j < width && i + j < offset + len; j++) { + int ch = bytes.readUnsignedByte(i + j); + same &= (ch == lastLine[j]); + lastLine[j] = ch; + } + if (i > start && same) { + sep = "........\n"; + continue; + } + } + builder.append(sep); + sep = ""; + String str = Long.toHexString(i); + for (int j = str.length(); j < 8; j++) + builder.append('0'); + builder.append(str); + for (int j = 0; j < width; j++) { + if (j == width / 2) + builder.append(' '); + if (i + j < offset || i + j >= offset + len) { + builder.append(" "); + + } else { + builder.append(' '); + int ch = bytes.readUnsignedByte(i + j); + builder.append(HEXI_DECIMAL[ch >> 4]); + builder.append(HEXI_DECIMAL[ch & 15]); + } + } + builder.append(' '); + for (int j = 0; j < width; j++) { + if (j == width / 2) + builder.append(' '); + if (i + j < offset || i + j >= offset + len) { + builder.append(' '); + + } else { + int ch = bytes.readUnsignedByte(i + j); + if (ch < ' ' || ch > 126) + ch = '\u00B7'; + builder.append((char) ch); + } + } + builder.append("\n"); + } + return builder.toString(); + } finally { + bytes.readLimit(limit); + bytes.readPosition(position); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
