http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java new file mode 100644 index 0000000..d8d419a --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultComparator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.collect.Streams; + +public class ResultComparator +{ + /** + * Compares the rows in rows + * the row at position x in rows will have come from host at position x in targetHosts + */ + public boolean compareRows(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows) + { + if (rows.size() < 2 || rows.stream().allMatch(Objects::isNull)) + return true; + + if (rows.stream().anyMatch(Objects::isNull)) + { + handleMismatch(targetHosts, query, rows); + return false; + } + + ResultHandler.ComparableRow ref = rows.get(0); + boolean equal = true; + for (int i = 1; i < rows.size(); i++) + { + ResultHandler.ComparableRow compare = rows.get(i); + if (!ref.equals(compare)) + equal = false; + } + if (!equal) + handleMismatch(targetHosts, query, rows); + return equal; + } + + /** + * Compares the column definitions + * + * the column definitions at position x in cds will have come from host at position x in targetHosts + */ + public boolean compareColumnDefinitions(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds) + { + if (cds.size() < 2) + return true; + + boolean equal = true; + List<ResultHandler.ComparableDefinition> refDefs = cds.get(0).asList(); + for (int i = 1; i < cds.size(); i++) + { + List<ResultHandler.ComparableDefinition> toCompare = cds.get(i).asList(); + if (!refDefs.equals(toCompare)) + equal = false; + } + if (!equal) + handleColumnDefMismatch(targetHosts, query, cds); + return equal; + } + + private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows) + { + System.out.println("MISMATCH:"); + System.out.println("Query = " + query); + System.out.println("Results:"); + System.out.println(Streams.zip(rows.stream(), targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? "null" : r)).collect(Collectors.joining())); + } + + private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds) + { + System.out.println("COLUMN DEFINITION MISMATCH:"); + System.out.println("Query = " + query); + System.out.println("Results: "); + System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd))).collect(Collectors.joining())); + } + + private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd) + { + StringBuilder sb = new StringBuilder(); + if (cd == null) + sb.append("NULL"); + else if (cd.wasFailed()) + sb.append("FAILED"); + else + { + for (ResultHandler.ComparableDefinition def : cd) + { + sb.append(def.toString()); + } + } + return sb.toString(); + } + + + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java new file mode 100644 index 0000000..8c4c018 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultHandler.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +public class ResultHandler implements Closeable +{ + private final ResultStore resultStore; + private final ResultComparator resultComparator; + private final List<String> targetHosts; + + public ResultHandler(List<String> targetHosts, List<File> resultPaths, File queryFilePath) + { + this.targetHosts = targetHosts; + resultStore = resultPaths != null ? new ResultStore(resultPaths, queryFilePath) : null; + resultComparator = new ResultComparator(); + } + + /** + * Since we can't iterate a ResultSet more than once, and we don't want to keep the entire result set in memory + * we feed the rows one-by-one to resultComparator and resultStore. + * + * results.get(x) should be the results from executing query against targetHosts.get(x) + */ + public void handleResults(FQLQuery query, List<ComparableResultSet> results) + { + for (int i = 0; i < targetHosts.size(); i++) + { + if (results.get(i).wasFailed()) + { + System.out.println("Query against "+targetHosts.get(i)+" failure:"); + System.out.println(query); + System.out.println("Message: "+results.get(i).getFailureException().getMessage()); + } + } + + List<ComparableColumnDefinitions> columnDefinitions = results.stream().map(ComparableResultSet::getColumnDefinitions).collect(Collectors.toList()); + resultComparator.compareColumnDefinitions(targetHosts, query, columnDefinitions); + if (resultStore != null) + resultStore.storeColumnDefinitions(query, columnDefinitions); + List<Iterator<ComparableRow>> iters = results.stream().map(Iterable::iterator).collect(Collectors.toList()); + + while (true) + { + List<ComparableRow> rows = rows(iters); + resultComparator.compareRows(targetHosts, query, rows); + if (resultStore != null) + resultStore.storeRows(rows); + // all rows being null marks end of all resultsets, we need to call compareRows + // and storeRows once with everything null to mark that fact + if (rows.stream().allMatch(Objects::isNull)) + return; + } + } + + /** + * Get the first row from each of the iterators, if the iterator has run out, null will mark that in the list + */ + @VisibleForTesting + public static List<ComparableRow> rows(List<Iterator<ComparableRow>> iters) + { + List<ComparableRow> rows = new ArrayList<>(iters.size()); + for (Iterator<ComparableRow> iter : iters) + { + if (iter.hasNext()) + rows.add(iter.next()); + else + rows.add(null); + } + return rows; + } + + public void close() throws IOException + { + if (resultStore != null) + resultStore.close(); + } + + public interface ComparableResultSet extends Iterable<ComparableRow> + { + public ComparableColumnDefinitions getColumnDefinitions(); + public boolean wasFailed(); + public Throwable getFailureException(); + } + + public interface ComparableColumnDefinitions extends Iterable<ComparableDefinition> + { + public List<ComparableDefinition> asList(); + public boolean wasFailed(); + public Throwable getFailureException(); + public int size(); + } + + public interface ComparableDefinition + { + public String getType(); + public String getName(); + } + + public interface ComparableRow + { + public ByteBuffer getBytesUnsafe(int i); + public ComparableColumnDefinitions getColumnDefinitions(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java new file mode 100644 index 0000000..d128717 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/ResultStore.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import net.openhft.chronicle.bytes.BytesStore; +import net.openhft.chronicle.core.io.Closeable; +import net.openhft.chronicle.core.io.IORuntimeException; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptAppender; +import net.openhft.chronicle.wire.ReadMarshallable; +import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.ValueOut; +import net.openhft.chronicle.wire.WireIn; +import net.openhft.chronicle.wire.WireOut; +import net.openhft.chronicle.wire.WriteMarshallable; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.binlog.BinLog; + +/** + * note that we store each row as a separate chronicle document to be able to + * avoid reading up the entire result set in memory when comparing + * + * document formats: + * to mark the start of a new result set: + * ------------------- + * version: int16 + * type: column_definitions + * column_count: int32; + * column_definition: text, text + * column_definition: text, text + * .... + * -------------------- + * + * to mark a failed query: + * --------------------- + * version: int16 + * type: query_failed + * message: text + * --------------------- + * + * row: + * -------------------- + * version: int16 + * type: row + * row_column_count: int32 + * column: bytes + * --------------------- + * + * to mark the end of a result set: + * ------------------- + * version: int16 + * type: end_resultset + * ------------------- + * + */ +public class ResultStore +{ + private static final String VERSION = "version"; + private static final String TYPE = "type"; + // types: + private static final String ROW = "row"; + private static final String END = "end_resultset"; + private static final String FAILURE = "query_failed"; + private static final String COLUMN_DEFINITIONS = "column_definitions"; + // fields: + private static final String COLUMN_DEFINITION = "column_definition"; + private static final String COLUMN_COUNT = "column_count"; + private static final String MESSAGE = "message"; + private static final String ROW_COLUMN_COUNT = "row_column_count"; + private static final String COLUMN = "column"; + + private static final int CURRENT_VERSION = 0; + + private final List<ChronicleQueue> queues; + private final List<ExcerptAppender> appenders; + private final ChronicleQueue queryStoreQueue; + private final ExcerptAppender queryStoreAppender; + private final Set<Integer> finishedHosts = new HashSet<>(); + + public ResultStore(List<File> resultPaths, File queryFilePath) + { + queues = resultPaths.stream().map(path -> ChronicleQueueBuilder.single(path).build()).collect(Collectors.toList()); + appenders = queues.stream().map(ChronicleQueue::acquireAppender).collect(Collectors.toList()); + queryStoreQueue = queryFilePath != null ? ChronicleQueueBuilder.single(queryFilePath).build() : null; + queryStoreAppender = queryStoreQueue != null ? queryStoreQueue.acquireAppender() : null; + } + + /** + * Store the column definitions in cds + * + * the ColumnDefinitions at position x will get stored by the appender at position x + * + * Calling this method indicates that we are starting a new result set from a query, it must be called before + * calling storeRows. + * + */ + public void storeColumnDefinitions(FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds) + { + finishedHosts.clear(); + if (queryStoreAppender != null) + { + BinLog.ReleaseableWriteMarshallable writeMarshallableQuery = query.toMarshallable(); + queryStoreAppender.writeDocument(writeMarshallableQuery); + writeMarshallableQuery.release(); + } + for (int i = 0; i < cds.size(); i++) + { + ResultHandler.ComparableColumnDefinitions cd = cds.get(i); + appenders.get(i).writeDocument(new ColumnDefsWriter(cd)); + } + } + + /** + * Store rows + * + * the row at position x will get stored by appender at position x + * + * Before calling this for a new result set, storeColumnDefinitions must be called. + */ + public void storeRows(List<ResultHandler.ComparableRow> rows) + { + for (int i = 0; i < rows.size(); i++) + { + ResultHandler.ComparableRow row = rows.get(i); + if (row == null && !finishedHosts.contains(i)) + { + appenders.get(i).writeDocument(wire -> { + wire.write(VERSION).int16(CURRENT_VERSION); + wire.write(TYPE).text(END); + }); + finishedHosts.add(i); + } + else if (row != null) + { + appenders.get(i).writeDocument(new RowWriter(row)); + } + } + } + + public void close() + { + queues.forEach(Closeable::close); + if (queryStoreQueue != null) + queryStoreQueue.close(); + } + + static class ColumnDefsWriter implements WriteMarshallable + { + private final ResultHandler.ComparableColumnDefinitions defs; + + ColumnDefsWriter(ResultHandler.ComparableColumnDefinitions defs) + { + this.defs = defs; + } + + public void writeMarshallable(WireOut wire) + { + wire.write(VERSION).int16(CURRENT_VERSION); + if (!defs.wasFailed()) + { + wire.write(TYPE).text(COLUMN_DEFINITIONS); + wire.write(COLUMN_COUNT).int32(defs.size()); + for (ResultHandler.ComparableDefinition d : defs.asList()) + { + ValueOut vo = wire.write(COLUMN_DEFINITION); + vo.text(d.getName()); + vo.text(d.getType()); + } + } + else + { + wire.write(TYPE).text(FAILURE); + wire.write(MESSAGE).text(defs.getFailureException().getMessage()); + } + } + } + + static class ColumnDefsReader implements ReadMarshallable + { + boolean wasFailed; + String failureMessage; + List<Pair<String, String>> columnDefinitions = new ArrayList<>(); + + public void readMarshallable(WireIn wire) throws IORuntimeException + { + int version = wire.read(VERSION).int16(); + String type = wire.read(TYPE).text(); + if (type.equals(FAILURE)) + { + wasFailed = true; + failureMessage = wire.read(MESSAGE).text(); + } + else if (type.equals(COLUMN_DEFINITION)) + { + int columnCount = wire.read(COLUMN_COUNT).int32(); + for (int i = 0; i < columnCount; i++) + { + ValueIn vi = wire.read(COLUMN_DEFINITION); + String name = vi.text(); + String dataType = vi.text(); + columnDefinitions.add(Pair.create(name, dataType)); + } + } + } + } + + /** + * read a single row from the wire, or, marks itself finished if we read "end_resultset" + */ + static class RowReader implements ReadMarshallable + { + boolean isFinished; + List<ByteBuffer> rows = new ArrayList<>(); + + public void readMarshallable(WireIn wire) throws IORuntimeException + { + int version = wire.read(VERSION).int32(); + String type = wire.read(TYPE).text(); + if (!type.equals(END)) + { + isFinished = false; + int rowColumnCount = wire.read(ROW_COLUMN_COUNT).int32(); + + for (int i = 0; i < rowColumnCount; i++) + { + byte[] b = wire.read(COLUMN).bytes(); + rows.add(ByteBuffer.wrap(b)); + } + } + else + { + isFinished = true; + } + } + } + + /** + * Writes a single row to the given wire + */ + static class RowWriter implements WriteMarshallable + { + private final ResultHandler.ComparableRow row; + + RowWriter(ResultHandler.ComparableRow row) + { + this.row = row; + } + + public void writeMarshallable(WireOut wire) + { + wire.write(VERSION).int16(CURRENT_VERSION); + wire.write(TYPE).text(ROW); + wire.write(ROW_COLUMN_COUNT).int32(row.getColumnDefinitions().size()); + for (int jj = 0; jj < row.getColumnDefinitions().size(); jj++) + { + ByteBuffer bb = row.getBytesUnsafe(jj); + if (bb != null) + wire.write(COLUMN).bytes(BytesStore.wrap(bb)); + else + wire.write(COLUMN).bytes("NULL".getBytes()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java b/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java new file mode 100644 index 0000000..b08861d --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/StoredResultSet.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import com.google.common.collect.AbstractIterator; + +import net.openhft.chronicle.queue.ExcerptTailer; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; + +/** + * represents a resultset defined by the format in ResultStore on disk + * + * todo: Currently all iterators need to be consumed fully while iterating over result sets + * if this is created from a tailer. This can probably be improved, but for all current uses it is fine. + */ +public class StoredResultSet implements ResultHandler.ComparableResultSet +{ + private final ResultHandler.ComparableColumnDefinitions defs; + public final boolean hasMoreResultSets; + private final Supplier<Iterator<ResultHandler.ComparableRow>> rowIteratorSupplier; + private final boolean wasFailed; + private final Throwable failureException; + + /** + * create a new StoredResultSet + * + * note that we use an iteratorSupplier to be able to iterate over the same in-memory rows several times *in tests* + */ + public StoredResultSet(ResultHandler.ComparableColumnDefinitions defs, + boolean hasMoreResultSets, + boolean wasFailed, + Throwable failure, + Supplier<Iterator<ResultHandler.ComparableRow>> iteratorSupplier) + { + this.defs = defs; + this.hasMoreResultSets = hasMoreResultSets; + this.wasFailed = wasFailed; + this.failureException = failure; + this.rowIteratorSupplier = iteratorSupplier; + } + + /** + * creates a ComparableResultSet based on the data in tailer + */ + public static StoredResultSet fromTailer(ExcerptTailer tailer) + { + ResultStore.ColumnDefsReader reader = new ResultStore.ColumnDefsReader(); + boolean hasMoreResultSets = tailer.readDocument(reader); + ResultHandler.ComparableColumnDefinitions defs = new StoredComparableColumnDefinitions(reader.columnDefinitions, + reader.wasFailed, + new RuntimeException(reader.failureMessage)); + + + Iterator<ResultHandler.ComparableRow> rowIterator = new AbstractIterator<ResultHandler.ComparableRow>() + { + protected ResultHandler.ComparableRow computeNext() + { + ResultStore.RowReader rowReader = new ResultStore.RowReader(); + tailer.readDocument(rowReader); + if (rowReader.isFinished) + return endOfData(); + return new StoredComparableRow(rowReader.rows, defs); + } + }; + + return new StoredResultSet(defs, + hasMoreResultSets, + reader.wasFailed, + new RuntimeException(reader.failureMessage), + () -> rowIterator); + } + + public static ResultHandler.ComparableResultSet failed(String failureMessage) + { + return new FailedComparableResultSet(new RuntimeException(failureMessage)); + } + + public Iterator<ResultHandler.ComparableRow> iterator() + { + return rowIteratorSupplier.get(); + } + + public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() + { + return defs; + } + + public boolean wasFailed() + { + return wasFailed; + } + + public Throwable getFailureException() + { + return failureException; + } + + static class StoredComparableRow implements ResultHandler.ComparableRow + { + private final List<ByteBuffer> row; + private final ResultHandler.ComparableColumnDefinitions cds; + + public StoredComparableRow(List<ByteBuffer> row, ResultHandler.ComparableColumnDefinitions cds) + { + this.row = row; + this.cds = cds; + } + + public ByteBuffer getBytesUnsafe(int i) + { + return row.get(i); + } + + public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() + { + return cds; + } + + public boolean equals(Object other) + { + if (!(other instanceof StoredComparableRow)) + return false; + return row.equals(((StoredComparableRow)other).row); + } + + public String toString() + { + return row.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(",")); + } + } + + static class StoredComparableColumnDefinitions implements ResultHandler.ComparableColumnDefinitions + { + private final List<ResultHandler.ComparableDefinition> defs; + private final boolean wasFailed; + private final Throwable failureException; + + public StoredComparableColumnDefinitions(List<Pair<String, String>> cds, boolean wasFailed, Throwable failureException) + { + defs = cds != null ? cds.stream().map(StoredComparableDefinition::new).collect(Collectors.toList()) : Collections.emptyList(); + this.wasFailed = wasFailed; + this.failureException = failureException; + } + public List<ResultHandler.ComparableDefinition> asList() + { + return wasFailed() ? Collections.emptyList() : defs; + } + + public boolean wasFailed() + { + return wasFailed; + } + + public Throwable getFailureException() + { + return failureException; + } + + public int size() + { + return asList().size(); + } + + public Iterator<ResultHandler.ComparableDefinition> iterator() + { + return defs.iterator(); + } + + public boolean equals(Object other) + { + if (!(other instanceof StoredComparableColumnDefinitions)) + return false; + return defs.equals(((StoredComparableColumnDefinitions)other).defs); + } + + public String toString() + { + return defs.toString(); + } + } + + private static class StoredComparableDefinition implements ResultHandler.ComparableDefinition + { + private final Pair<String, String> p; + + public StoredComparableDefinition(Pair<String, String> p) + { + this.p = p; + } + public String getType() + { + return p.right; + } + + public String getName() + { + return p.left; + } + + public boolean equals(Object other) + { + if (!(other instanceof StoredComparableDefinition)) + return false; + return p.equals(((StoredComparableDefinition)other).p); + } + + public String toString() + { + return getName() + ':' + getType(); + } + } + + private static class FailedComparableResultSet implements ResultHandler.ComparableResultSet + { + private final Throwable exception; + + public FailedComparableResultSet(Throwable exception) + { + this.exception = exception; + } + public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() + { + return new ResultHandler.ComparableColumnDefinitions() + { + public List<ResultHandler.ComparableDefinition> asList() + { + return Collections.emptyList(); + } + + public boolean wasFailed() + { + return true; + } + + public Throwable getFailureException() + { + return exception; + } + + public int size() + { + return 0; + } + + public Iterator<ResultHandler.ComparableDefinition> iterator() + { + return asList().iterator(); + } + }; + } + + public boolean wasFailed() + { + return true; + } + + public Throwable getFailureException() + { + return new RuntimeException(); + } + + public Iterator<ResultHandler.ComparableRow> iterator() + { + return Collections.emptyListIterator(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java new file mode 100644 index 0000000..2375296 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Compare.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool.commands; + + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.AbstractIterator; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import net.openhft.chronicle.core.io.Closeable; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptTailer; +import org.apache.cassandra.fqltool.FQLQueryIterator; +import org.apache.cassandra.fqltool.ResultHandler; +import org.apache.cassandra.fqltool.StoredResultSet; + +/** + */ +@Command(name = "compare", description = "Compare result files generated by fqltool replay") +public class Compare implements Runnable +{ + @Arguments(usage = "<path1> [<path2>...<pathN>]", + description = "Directories containing result files to compare.", + required = true) + private List<String> arguments = new ArrayList<>(); + + @Option(title = "queries", + name = { "--queries"}, + description = "Directory to read the queries from. It is produced by the fqltool replay --store-queries option. ", + required = true) + private String querylog; + + @Override + public void run() + { + compare(querylog, arguments); + } + + public static void compare(String querylog, List<String> arguments) + { + List<ChronicleQueue> readQueues = null; + try (ResultHandler rh = new ResultHandler(arguments, null, null); + ChronicleQueue queryQ = ChronicleQueueBuilder.single(querylog).readOnly(true).build(); + FQLQueryIterator queries = new FQLQueryIterator(queryQ.createTailer(), 1)) + { + readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList()); + List<Iterator<ResultHandler.ComparableResultSet>> its = readQueues.stream().map(q -> new StoredResultSetIterator(q.createTailer())).collect(Collectors.toList()); + while (queries.hasNext()) + rh.handleResults(queries.next(), resultSets(its)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + finally + { + if (readQueues != null) + readQueues.forEach(Closeable::close); + } + } + + @VisibleForTesting + public static List<ResultHandler.ComparableResultSet> resultSets(List<Iterator<ResultHandler.ComparableResultSet>> its) + { + List<ResultHandler.ComparableResultSet> resultSets = new ArrayList<>(its.size()); + for (Iterator<ResultHandler.ComparableResultSet> it : its) + { + if (it.hasNext()) + resultSets.add(it.next()); + else + resultSets.add(null); + } + return resultSets; + } + + @VisibleForTesting + public static class StoredResultSetIterator extends AbstractIterator<ResultHandler.ComparableResultSet> + { + private final ExcerptTailer tailer; + + public StoredResultSetIterator(ExcerptTailer tailer) + { + this.tailer = tailer; + } + + protected ResultHandler.ComparableResultSet computeNext() + { + StoredResultSet srs = StoredResultSet.fromTailer(tailer); + if (srs.hasMoreResultSets) + return srs; + return endOfData(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java new file mode 100644 index 0000000..1263a11 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Dump.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool.commands; + +import java.io.File; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import io.netty.buffer.Unpooled; +import net.openhft.chronicle.bytes.Bytes; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycles; +import net.openhft.chronicle.threads.Pauser; +import net.openhft.chronicle.wire.ReadMarshallable; +import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.WireIn; +import org.apache.cassandra.audit.FullQueryLogger; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.transport.ProtocolVersion; + +/** + * Dump the contents of a list of paths containing full query logs + */ +@Command(name = "dump", description = "Dump the contents of a full query log") +public class Dump implements Runnable +{ + static final char[] HEXI_DECIMAL = "0123456789ABCDEF".toCharArray(); + + @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Path containing the full query logs to dump.", required = true) + private List<String> arguments = new ArrayList<>(); + + @Option(title = "roll_cycle", name = {"--roll-cycle"}, description = "How often to roll the log file was rolled. May be necessary for Chronicle to correctly parse file names. (MINUTELY, HOURLY, DAILY). Default HOURLY.") + private String rollCycle = "HOURLY"; + + @Option(title = "follow", name = {"--follow"}, description = "Upon reacahing the end of the log continue indefinitely waiting for more records") + private boolean follow = false; + + @Override + public void run() + { + dump(arguments, rollCycle, follow); + } + + public static void dump(List<String> arguments, String rollCycle, boolean follow) + { + StringBuilder sb = new StringBuilder(); + ReadMarshallable reader = wireIn -> + { + sb.setLength(0); + + int version = wireIn.read(FullQueryLogger.VERSION).int16(); + if (version != FullQueryLogger.CURRENT_VERSION) + throw new UnsupportedOperationException("Full query log of unexpected version " + version + " encountered"); + + String type = wireIn.read(FullQueryLogger.TYPE).text(); + sb.append("Type: ") + .append(type) + .append(System.lineSeparator()); + + long queryStartTime = wireIn.read(FullQueryLogger.QUERY_START_TIME).int64(); + sb.append("Query start time: ") + .append(queryStartTime) + .append(System.lineSeparator()); + + int protocolVersion = wireIn.read(FullQueryLogger.PROTOCOL_VERSION).int32(); + sb.append("Protocol version: ") + .append(protocolVersion) + .append(System.lineSeparator()); + + QueryOptions options = + QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(FullQueryLogger.QUERY_OPTIONS).bytes()), + ProtocolVersion.decode(protocolVersion)); + + long generatedTimestamp = wireIn.read(FullQueryLogger.GENERATED_TIMESTAMP).int64(); + sb.append("Generated timestamp:") + .append(generatedTimestamp) + .append(System.lineSeparator()); + + int generatedNowInSeconds = wireIn.read(FullQueryLogger.GENERATED_NOW_IN_SECONDS).int32(); + sb.append("Generated nowInSeconds:") + .append(generatedNowInSeconds) + .append(System.lineSeparator()); + + switch (type) + { + case (FullQueryLogger.SINGLE_QUERY): + dumpQuery(options, wireIn, sb); + break; + + case (FullQueryLogger.BATCH): + dumpBatch(options, wireIn, sb); + break; + + default: + throw new UnsupportedOperationException("Log entry of unsupported type " + type); + } + + System.out.print(sb.toString()); + System.out.flush(); + }; + + //Backoff strategy for spinning on the queue, not aggressive at all as this doesn't need to be low latency + Pauser pauser = Pauser.millis(100); + List<ChronicleQueue> queues = arguments.stream().distinct().map(path -> ChronicleQueueBuilder.single(new File(path)).readOnly(true).rollCycle(RollCycles.valueOf(rollCycle)).build()).collect(Collectors.toList()); + List<ExcerptTailer> tailers = queues.stream().map(ChronicleQueue::createTailer).collect(Collectors.toList()); + boolean hadWork = true; + while (hadWork) + { + hadWork = false; + for (ExcerptTailer tailer : tailers) + { + while (tailer.readDocument(reader)) + { + hadWork = true; + } + } + + if (follow) + { + if (!hadWork) + { + //Chronicle queue doesn't support blocking so use this backoff strategy + pauser.pause(); + } + //Don't terminate the loop even if there wasn't work + hadWork = true; + } + } + } + + private static void dumpQuery(QueryOptions options, WireIn wireIn, StringBuilder sb) + { + sb.append("Query: ") + .append(wireIn.read(FullQueryLogger.QUERY).text()) + .append(System.lineSeparator()); + + List<ByteBuffer> values = options.getValues() != null + ? options.getValues() + : Collections.emptyList(); + + sb.append("Values: ") + .append(System.lineSeparator()); + appendValuesToStringBuilder(values, sb); + sb.append(System.lineSeparator()); + } + + private static void dumpBatch(QueryOptions options, WireIn wireIn, StringBuilder sb) + { + sb.append("Batch type: ") + .append(wireIn.read(FullQueryLogger.BATCH_TYPE).text()) + .append(System.lineSeparator()); + + ValueIn in = wireIn.read(FullQueryLogger.QUERIES); + int numQueries = in.int32(); + List<String> queries = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) + queries.add(in.text()); + + in = wireIn.read(FullQueryLogger.VALUES); + int numValues = in.int32(); + + for (int i = 0; i < numValues; i++) + { + int numSubValues = in.int32(); + List<ByteBuffer> subValues = new ArrayList<>(numSubValues); + for (int j = 0; j < numSubValues; j++) + subValues.add(ByteBuffer.wrap(in.bytes())); + + sb.append("Query: ") + .append(queries.get(i)) + .append(System.lineSeparator()); + + sb.append("Values: ") + .append(System.lineSeparator()); + appendValuesToStringBuilder(subValues, sb); + } + + sb.append(System.lineSeparator()); + } + + private static void appendValuesToStringBuilder(List<ByteBuffer> values, StringBuilder sb) + { + boolean first = true; + for (ByteBuffer value : values) + { + Bytes bytes = Bytes.wrapForRead(value); + long maxLength2 = Math.min(1024, bytes.readLimit() - bytes.readPosition()); + toHexString(bytes, bytes.readPosition(), maxLength2, sb); + if (maxLength2 < bytes.readLimit() - bytes.readPosition()) + { + sb.append("... truncated").append(System.lineSeparator()); + } + + if (first) + { + first = false; + } + else + { + sb.append("-----").append(System.lineSeparator()); + } + } + } + + //This is from net.openhft.chronicle.bytes, need to pass in the StringBuilder so had to copy + /* + * Copyright 2016 higherfrequencytrading.com + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + /** + * display the hex data of {@link Bytes} from the position() to the limit() + * + * @param bytes the buffer you wish to toString() + * @return hex representation of the buffer, from example [0D ,OA, FF] + */ + public static String toHexString(final Bytes bytes, long offset, long len, StringBuilder builder) + throws BufferUnderflowException + { + if (len == 0) + return ""; + + int width = 16; + int[] lastLine = new int[width]; + String sep = ""; + long position = bytes.readPosition(); + long limit = bytes.readLimit(); + + try { + bytes.readPositionRemaining(offset, len); + + long start = offset / width * width; + long end = (offset + len + width - 1) / width * width; + for (long i = start; i < end; i += width) { + // check for duplicate rows + if (i + width < end) { + boolean same = true; + + for (int j = 0; j < width && i + j < offset + len; j++) { + int ch = bytes.readUnsignedByte(i + j); + same &= (ch == lastLine[j]); + lastLine[j] = ch; + } + if (i > start && same) { + sep = "........\n"; + continue; + } + } + builder.append(sep); + sep = ""; + String str = Long.toHexString(i); + for (int j = str.length(); j < 8; j++) + builder.append('0'); + builder.append(str); + for (int j = 0; j < width; j++) { + if (j == width / 2) + builder.append(' '); + if (i + j < offset || i + j >= offset + len) { + builder.append(" "); + + } else { + builder.append(' '); + int ch = bytes.readUnsignedByte(i + j); + builder.append(HEXI_DECIMAL[ch >> 4]); + builder.append(HEXI_DECIMAL[ch & 15]); + } + } + builder.append(' '); + for (int j = 0; j < width; j++) { + if (j == width / 2) + builder.append(' '); + if (i + j < offset || i + j >= offset + len) { + builder.append(' '); + + } else { + int ch = bytes.readUnsignedByte(i + j); + if (ch < ' ' || ch > 126) + ch = '\u00B7'; + builder.append((char) ch); + } + } + builder.append("\n"); + } + return builder.toString(); + } finally { + bytes.readLimit(limit); + bytes.readPosition(position); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java new file mode 100644 index 0000000..adea742 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/commands/Replay.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool.commands; + + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import net.openhft.chronicle.core.io.Closeable; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; + +import org.apache.cassandra.fqltool.FQLQuery; +import org.apache.cassandra.fqltool.FQLQueryIterator; +import org.apache.cassandra.fqltool.QueryReplayer; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.MergeIterator; + +/** + * replay the contents of a list of paths containing full query logs + */ +@Command(name = "replay", description = "Replay full query logs") +public class Replay implements Runnable +{ + @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true) + private List<String> arguments = new ArrayList<>(); + + @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.") + private List<String> targetHosts; + + @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. Leave this option out to avoid storing results.") + private String resultPath; + + @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.") + private String keyspace; + + @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.") + private boolean debug; + + @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results") + private String queryStorePath; + + @Override + public void run() + { + try + { + List<File> resultPaths = null; + if (resultPath != null) + { + File basePath = new File(resultPath); + if (!basePath.exists() || !basePath.isDirectory()) + { + System.err.println("The results path (" + basePath + ") should be an existing directory"); + System.exit(1); + } + resultPaths = targetHosts.stream().map(target -> new File(basePath, target)).collect(Collectors.toList()); + resultPaths.forEach(File::mkdir); + } + if (targetHosts.size() < 1) + { + System.err.println("You need to state at least one --target host to replay the query against"); + System.exit(1); + } + replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug) + { + int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them? + List<ChronicleQueue> readQueues = null; + List<FQLQueryIterator> iterators = null; + List<Predicate<FQLQuery>> filters = new ArrayList<>(); + + if (keyspace != null) + filters.add(fqlQuery -> fqlQuery.keyspace() == null || fqlQuery.keyspace().equals(keyspace)); + + try + { + readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList()); + iterators = readQueues.stream().map(ChronicleQueue::createTailer).map(tailer -> new FQLQueryIterator(tailer, readAhead)).collect(Collectors.toList()); + try (MergeIterator<FQLQuery, List<FQLQuery>> iter = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer()); + QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, System.out, queryStorePath, debug)) + { + replayer.replay(); + } + } + catch (Exception e) + { + throw new RuntimeException(e); + } + finally + { + if (iterators != null) + iterators.forEach(AbstractIterator::close); + if (readQueues != null) + readQueues.forEach(Closeable::close); + } + } + + @VisibleForTesting + public static class Reducer extends MergeIterator.Reducer<FQLQuery, List<FQLQuery>> + { + List<FQLQuery> queries = new ArrayList<>(); + public void reduce(int idx, FQLQuery current) + { + queries.add(current); + } + + protected List<FQLQuery> getReduced() + { + return queries; + } + protected void onKeyChange() + { + queries.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java new file mode 100644 index 0000000..7990b7e --- /dev/null +++ b/tools/fqltool/test/unit/org/apache/cassandra/fqltool/FQLCompareTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; +import org.junit.Test; + +import net.openhft.chronicle.core.io.Closeable; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.fqltool.commands.Compare; +import org.apache.cassandra.tools.Util; + + +import static org.psjava.util.AssertStatus.assertTrue; + +public class FQLCompareTest +{ + public FQLCompareTest() + { + Util.initDatabaseDescriptor(); + } + + @Test + public void endToEnd() throws IOException + { + List<String> targetHosts = Lists.newArrayList("hosta", "hostb"); + File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); + File queryDir = Files.createTempDirectory("queries").toFile(); + List<File> resultPaths = generateResultSets(targetHosts, tmpDir, queryDir, true, false); + Compare.compare(queryDir.toString(), resultPaths.stream().map(File::toString).collect(Collectors.toList())); + } + + @Test + public void endToEndQueryFailures() throws IOException + { + List<String> targetHosts = Lists.newArrayList("hosta", "hostb"); + File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); + File queryDir = Files.createTempDirectory("queries").toFile(); + List<File> resultPaths = generateResultSets(targetHosts, tmpDir, queryDir, true,true); + Compare.compare(queryDir.toString(), resultPaths.stream().map(File::toString).collect(Collectors.toList())); + } + + @Test + public void compareEqual() throws IOException + { + List<String> targetHosts = Lists.newArrayList("hosta", "hostb"); + File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); + File queryDir = Files.createTempDirectory("queries").toFile(); + List<File> resultPaths = generateResultSets(targetHosts, tmpDir, queryDir, false,false); + + ResultComparator comparator = new ResultComparator(); + List<ChronicleQueue> readQueues = null; + try + { + readQueues = resultPaths.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList()); + List<Iterator<ResultHandler.ComparableResultSet>> its = readQueues.stream().map(q -> new Compare.StoredResultSetIterator(q.createTailer())).collect(Collectors.toList()); + List<ResultHandler.ComparableResultSet> resultSets = Compare.resultSets(its); + while(resultSets.stream().allMatch(Objects::nonNull)) + { + assertTrue(comparator.compareColumnDefinitions(targetHosts, query(), resultSets.stream().map(ResultHandler.ComparableResultSet::getColumnDefinitions).collect(Collectors.toList()))); + List<Iterator<ResultHandler.ComparableRow>> rows = resultSets.stream().map(Iterable::iterator).collect(Collectors.toList()); + + List<ResultHandler.ComparableRow> toCompare = ResultHandler.rows(rows); + + while (toCompare.stream().allMatch(Objects::nonNull)) + { + assertTrue(comparator.compareRows(targetHosts, query(), ResultHandler.rows(rows))); + toCompare = ResultHandler.rows(rows); + } + resultSets = Compare.resultSets(its); + } + } + finally + { + if (readQueues != null) + readQueues.forEach(Closeable::close); + } + } + + private List<File> generateResultSets(List<String> targetHosts, File resultDir, File queryDir, boolean random, boolean includeFailures) throws IOException + { + List<File> resultPaths = new ArrayList<>(); + targetHosts.forEach(host -> { File f = new File(resultDir, host); f.mkdir(); resultPaths.add(f);}); + + try (ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir)) + { + for (int i = 0; i < 100; i++) + { + ResultHandler.ComparableResultSet resultSet1 = includeFailures && (i % 10 == 0) + ? StoredResultSet.failed("test failure!") + : FQLReplayTest.createResultSet(10, 10, random); + ResultHandler.ComparableResultSet resultSet2 = FQLReplayTest.createResultSet(10, 10, random); + rh.handleResults(query(), Lists.newArrayList(resultSet1, resultSet2)); + } + } + return resultPaths; + } + + private FQLQuery.Single query() + { + return new FQLQuery.Single("abc", QueryOptions.DEFAULT.getProtocolVersion().asInt(), QueryOptions.DEFAULT, 12345, 5555, 6666, "select * from xyz", Collections.emptyList()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
