http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java b/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java deleted file mode 100644 index 043ead8..0000000 --- a/src/java/org/apache/cassandra/tools/fqltool/commands/Replay.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.tools.fqltool.commands; - - -import java.io.File; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import com.google.common.annotations.VisibleForTesting; - -import io.airlift.airline.Arguments; -import io.airlift.airline.Command; -import io.airlift.airline.Option; -import net.openhft.chronicle.core.io.Closeable; -import net.openhft.chronicle.queue.ChronicleQueue; -import net.openhft.chronicle.queue.ChronicleQueueBuilder; - -import org.apache.cassandra.tools.fqltool.FQLQuery; -import org.apache.cassandra.tools.fqltool.FQLQueryIterator; -import org.apache.cassandra.tools.fqltool.QueryReplayer; -import org.apache.cassandra.utils.AbstractIterator; -import org.apache.cassandra.utils.MergeIterator; - -/** - * replay the contents of a list of paths containing full query logs - */ -@Command(name = "replay", description = "Replay full query logs") -public class Replay implements Runnable -{ - @Arguments(usage = "<path1> [<path2>...<pathN>]", description = "Paths containing the full query logs to replay.", required = true) - private List<String> arguments = new ArrayList<>(); - - @Option(title = "target", name = {"--target"}, description = "Hosts to replay the logs to, can be repeated to replay to more hosts.") - private List<String> targetHosts; - - @Option(title = "results", name = { "--results"}, description = "Where to store the results of the queries, this should be a directory. 
Leave this option out to avoid storing results.") - private String resultPath; - - @Option(title = "keyspace", name = { "--keyspace"}, description = "Only replay queries against this keyspace and queries without keyspace set.") - private String keyspace; - - @Option(title = "debug", name = {"--debug"}, description = "Debug mode, print all queries executed.") - private boolean debug; - - @Option(title = "store_queries", name = {"--store-queries"}, description = "Path to store the queries executed. Stores queries in the same order as the result sets are in the result files. Requires --results") - private String queryStorePath; - - @Override - public void run() - { - try - { - List<File> resultPaths = null; - if (resultPath != null) - { - File basePath = new File(resultPath); - if (!basePath.exists() || !basePath.isDirectory()) - { - System.err.println("The results path (" + basePath + ") should be an existing directory"); - System.exit(1); - } - resultPaths = targetHosts.stream().map(target -> new File(basePath, target)).collect(Collectors.toList()); - resultPaths.forEach(File::mkdir); - } - if (targetHosts.size() < 1) - { - System.err.println("You need to state at least one --target host to replay the query against"); - System.exit(1); - } - replay(keyspace, arguments, targetHosts, resultPaths, queryStorePath, debug); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - public static void replay(String keyspace, List<String> arguments, List<String> targetHosts, List<File> resultPaths, String queryStorePath, boolean debug) - { - int readAhead = 200; // how many fql queries should we read in to memory to be able to sort them? 
- List<ChronicleQueue> readQueues = null; - List<FQLQueryIterator> iterators = null; - List<Predicate<FQLQuery>> filters = new ArrayList<>(); - - if (keyspace != null) - filters.add(fqlQuery -> fqlQuery.keyspace == null || fqlQuery.keyspace.equals(keyspace)); - - try - { - readQueues = arguments.stream().map(s -> ChronicleQueueBuilder.single(s).readOnly(true).build()).collect(Collectors.toList()); - iterators = readQueues.stream().map(ChronicleQueue::createTailer).map(tailer -> new FQLQueryIterator(tailer, readAhead)).collect(Collectors.toList()); - try (MergeIterator<FQLQuery, List<FQLQuery>> iter = MergeIterator.get(iterators, FQLQuery::compareTo, new Reducer()); - QueryReplayer replayer = new QueryReplayer(iter, targetHosts, resultPaths, filters, System.out, queryStorePath, debug)) - { - replayer.replay(); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } - finally - { - if (iterators != null) - iterators.forEach(AbstractIterator::close); - if (readQueues != null) - readQueues.forEach(Closeable::close); - } - } - - @VisibleForTesting - public static class Reducer extends MergeIterator.Reducer<FQLQuery, List<FQLQuery>> - { - List<FQLQuery> queries = new ArrayList<>(); - public void reduce(int idx, FQLQuery current) - { - queries.add(current); - } - - protected List<FQLQuery> getReduced() - { - return queries; - } - protected void onKeyChange() - { - queries.clear(); - } - } -}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java b/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java deleted file mode 100644 index a662699..0000000 --- a/test/unit/org/apache/cassandra/tools/fqltool/FQLReplayTest.java +++ /dev/null @@ -1,760 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.tools.fqltool; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; -import java.util.Random; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import org.junit.Test; - -import net.openhft.chronicle.queue.ChronicleQueue; -import net.openhft.chronicle.queue.ChronicleQueueBuilder; -import net.openhft.chronicle.queue.ExcerptAppender; -import net.openhft.chronicle.queue.ExcerptTailer; -import net.openhft.chronicle.wire.ValueIn; -import org.apache.cassandra.audit.FullQueryLogger; -import org.apache.cassandra.cql3.QueryOptions; -import org.apache.cassandra.cql3.statements.BatchStatement; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.QueryState; -import org.apache.cassandra.tools.Util; -import org.apache.cassandra.tools.fqltool.commands.Replay; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.MergeIterator; -import org.apache.cassandra.utils.Pair; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -public class FQLReplayTest -{ - public FQLReplayTest() - { - Util.initDatabaseDescriptor(); - } - - @Test - public void testOrderedReplay() throws IOException - { - File f = generateQueries(100, true); - int queryCount = 0; - try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build(); - FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 101)) - { - long last = -1; - while (iter.hasNext()) - 
{ - FQLQuery q = iter.next(); - assertTrue(q.queryStartTime >= last); - last = q.queryStartTime; - queryCount++; - } - } - assertEquals(100, queryCount); - } - @Test - public void testMergingIterator() throws IOException - { - File f = generateQueries(100, false); - File f2 = generateQueries(100, false); - int queryCount = 0; - try (ChronicleQueue queue = ChronicleQueueBuilder.single(f).build(); - ChronicleQueue queue2 = ChronicleQueueBuilder.single(f2).build(); - FQLQueryIterator iter = new FQLQueryIterator(queue.createTailer(), 101); - FQLQueryIterator iter2 = new FQLQueryIterator(queue2.createTailer(), 101); - MergeIterator<FQLQuery, List<FQLQuery>> merger = MergeIterator.get(Lists.newArrayList(iter, iter2), FQLQuery::compareTo, new Replay.Reducer())) - { - long last = -1; - - while (merger.hasNext()) - { - List<FQLQuery> qs = merger.next(); - assertEquals(2, qs.size()); - assertEquals(0, qs.get(0).compareTo(qs.get(1))); - assertTrue(qs.get(0).queryStartTime >= last); - last = qs.get(0).queryStartTime; - queryCount++; - } - } - assertEquals(100, queryCount); - } - - @Test - public void testFQLQueryReader() throws IOException - { - FQLQueryReader reader = new FQLQueryReader(); - - try (ChronicleQueue queue = ChronicleQueueBuilder.single(generateQueries(1000, true)).build()) - { - ExcerptTailer tailer = queue.createTailer(); - int queryCount = 0; - while (tailer.readDocument(reader)) - { - assertNotNull(reader.getQuery()); - if (reader.getQuery() instanceof FQLQuery.Single) - { - assertTrue(reader.getQuery().keyspace == null || reader.getQuery().keyspace.equals("querykeyspace")); - } - else - { - assertEquals("someks", reader.getQuery().keyspace); - } - queryCount++; - } - assertEquals(1000, queryCount); - } - } - - @Test - public void testStoringResults() throws Throwable - { - File tmpDir = Files.createTempDirectory("results").toFile(); - File queryDir = Files.createTempDirectory("queries").toFile(); - - ResultHandler.ComparableResultSet res = 
createResultSet(10, 10, true); - ResultStore rs = new ResultStore(Collections.singletonList(tmpDir), queryDir); - try - { - FQLQuery query = new FQLQuery.Single("abc", 3, QueryOptions.DEFAULT, 12345, 11111, 22, "select * from abc", Collections.emptyList()); - rs.storeColumnDefinitions(query, Collections.singletonList(res.getColumnDefinitions())); - Iterator<ResultHandler.ComparableRow> it = res.iterator(); - while (it.hasNext()) - { - List<ResultHandler.ComparableRow> row = Collections.singletonList(it.next()); - rs.storeRows(row); - } - // this marks the end of the result set: - rs.storeRows(Collections.singletonList(null)); - } - finally - { - rs.close(); - } - - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = readResultFile(tmpDir, queryDir); - assertEquals(1, resultSets.size()); - assertEquals(res, resultSets.get(0).right); - - } - - @Test - public void testCompareColumnDefinitions() - { - ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); - ResultComparator rc = new ResultComparator(); - - List<ResultHandler.ComparableColumnDefinitions> colDefs = new ArrayList<>(100); - List<String> targetHosts = new ArrayList<>(100); - for (int i = 0; i < 100; i++) - { - targetHosts.add("host"+i); - colDefs.add(res.getColumnDefinitions()); - } - assertTrue(rc.compareColumnDefinitions(targetHosts, null, colDefs)); - colDefs.set(50, createResultSet(9, 9, false).getColumnDefinitions()); - assertFalse(rc.compareColumnDefinitions(targetHosts, null, colDefs)); - } - - @Test - public void testCompareEqualRows() - { - ResultComparator rc = new ResultComparator(); - - ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); - ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); - List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2); - List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList()); - - while (true) - { - 
List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters); - assertTrue(rc.compareRows(Lists.newArrayList("eq1", "eq2"), null, rows)); - if (rows.stream().allMatch(Objects::isNull)) - break; - } - } - - @Test - public void testCompareRowsDifferentCount() - { - ResultComparator rc = new ResultComparator(); - ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); - ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); - List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(10, 11, false)); - List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList()); - boolean foundMismatch = false; - while (true) - { - List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters); - if (rows.stream().allMatch(Objects::isNull)) - break; - if (!rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows)) - { - foundMismatch = true; - } - } - assertTrue(foundMismatch); - } - - @Test - public void testCompareRowsDifferentContent() - { - ResultComparator rc = new ResultComparator(); - ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); - ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); - List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(10, 10, true)); - List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList()); - while (true) - { - List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters); - if (rows.stream().allMatch(Objects::isNull)) - break; - assertFalse(rows.toString(), rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows)); - } - } - - @Test - public void testCompareRowsDifferentColumnCount() - { - ResultComparator rc = new ResultComparator(); - ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); - 
ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); - List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, createResultSet(11, 10, false)); - List<Iterator<ResultHandler.ComparableRow>> iters = toCompare.stream().map(Iterable::iterator).collect(Collectors.toList()); - while (true) - { - List<ResultHandler.ComparableRow> rows = ResultHandler.rows(iters); - if (rows.stream().allMatch(Objects::isNull)) - break; - assertFalse(rows.toString(), rc.compareRows(Lists.newArrayList("eq1", "eq2", "diff"), null, rows)); - } - } - - @Test - public void testResultHandler() throws IOException - { - List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc"); - File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); - File queryDir = Files.createTempDirectory("queries").toFile(); - List<File> resultPaths = new ArrayList<>(); - targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);}); - ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir); - ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); - ResultHandler.ComparableResultSet res2 = createResultSet(10, 10, false); - ResultHandler.ComparableResultSet res3 = createResultSet(10, 10, false); - List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, res3); - rh.handleResults(new FQLQuery.Single("abcabc", 3, QueryOptions.DEFAULT, 1111, 2222, 3333, "select * from xyz", Collections.emptyList()), toCompare); - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(resultPaths.get(0), queryDir); - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results2 = readResultFile(resultPaths.get(1), queryDir); - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results3 = readResultFile(resultPaths.get(2), queryDir); - assertEquals(results1, results2); - assertEquals(results1, results3); - 
assertEquals(Iterables.getOnlyElement(results3).right, res); - } - - @Test - public void testResultHandlerWithDifference() throws IOException - { - List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc"); - File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); - File queryDir = Files.createTempDirectory("queries").toFile(); - List<File> resultPaths = new ArrayList<>(); - targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);}); - ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir); - ResultHandler.ComparableResultSet res = createResultSet(10, 10, false); - ResultHandler.ComparableResultSet res2 = createResultSet(10, 5, false); - ResultHandler.ComparableResultSet res3 = createResultSet(10, 10, false); - List<ResultHandler.ComparableResultSet> toCompare = Lists.newArrayList(res, res2, res3); - rh.handleResults(new FQLQuery.Single("aaa", 3, QueryOptions.DEFAULT, 123123, 11111, 22222, "select * from abcabc", Collections.emptyList()), toCompare); - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(resultPaths.get(0), queryDir); - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results2 = readResultFile(resultPaths.get(1), queryDir); - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results3 = readResultFile(resultPaths.get(2), queryDir); - assertEquals(results1, results3); - assertEquals(results2.get(0).right, res2); - } - - @Test - public void testResultHandlerMultipleResultSets() throws IOException - { - List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc"); - File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); - File queryDir = Files.createTempDirectory("queries").toFile(); - List<File> resultPaths = new ArrayList<>(); - targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);}); - ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir); 
- List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>(); - Random random = new Random(); - for (int i = 0; i < 10; i++) - { - List<ResultHandler.ComparableResultSet> results = new ArrayList<>(); - List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50)); - for (int jj = 0; jj < targetHosts.size(); jj++) - { - results.add(createResultSet(5, 1 + random.nextInt(10), true)); - } - FQLQuery q = new FQLQuery.Single("abc"+i, - 3, - QueryOptions.forInternalCalls(values), - i * 1000, - 12345, - 54321, - "select * from xyz where id = "+i, - values); - resultSets.add(Pair.create(q, results)); - } - for (int i = 0; i < resultSets.size(); i++) - rh.handleResults(resultSets.get(i).left, resultSets.get(i).right); - - for (int i = 0; i < targetHosts.size(); i++) - compareWithFile(resultPaths, queryDir, resultSets, i); - } - - @Test - public void testResultHandlerFailedQuery() throws IOException - { - List<String> targetHosts = Lists.newArrayList("hosta", "hostb", "hostc", "hostd"); - File tmpDir = Files.createTempDirectory("testresulthandler").toFile(); - File queryDir = Files.createTempDirectory("queries").toFile(); - List<File> resultPaths = new ArrayList<>(); - targetHosts.forEach(host -> { File f = new File(tmpDir, host); f.mkdir(); resultPaths.add(f);}); - ResultHandler rh = new ResultHandler(targetHosts, resultPaths, queryDir); - List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets = new ArrayList<>(); - Random random = new Random(); - for (int i = 0; i < 10; i++) - { - List<ResultHandler.ComparableResultSet> results = new ArrayList<>(); - List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(i * 50)); - for (int jj = 0; jj < targetHosts.size(); jj++) - { - results.add(createResultSet(5, 1 + random.nextInt(10), true)); - } - results.set(0, FakeResultSet.failed(new RuntimeException("testing abc"))); - results.set(3, FakeResultSet.failed(new RuntimeException("testing abc"))); 
- FQLQuery q = new FQLQuery.Single("abc"+i, - 3, - QueryOptions.forInternalCalls(values), - i * 1000, - i * 12345, - i * 54321, - "select * from xyz where id = "+i, - values); - resultSets.add(Pair.create(q, results)); - } - for (int i = 0; i < resultSets.size(); i++) - rh.handleResults(resultSets.get(i).left, resultSets.get(i).right); - - for (int i = 0; i < targetHosts.size(); i++) - compareWithFile(resultPaths, queryDir, resultSets, i); - } - - - @Test - public void testCompare() - { - FQLQuery q1 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList()); - FQLQuery q2 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222,"aaaa", Collections.emptyList()); - - assertEquals(0, q1.compareTo(q2)); - assertEquals(0, q2.compareTo(q1)); - - FQLQuery q3 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList()); - // single queries before batch queries - assertTrue(q1.compareTo(q3) < 0); - assertTrue(q3.compareTo(q1) > 0); - - // check that smaller query time - FQLQuery q4 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, "aaaa", Collections.emptyList()); - assertTrue(q1.compareTo(q4) < 0); - assertTrue(q4.compareTo(q1) > 0); - - FQLQuery q5 = new FQLQuery.Batch("abc", 0, QueryOptions.DEFAULT, 124, 111, 222, com.datastax.driver.core.BatchStatement.Type.UNLOGGED, Collections.emptyList(), Collections.emptyList()); - assertTrue(q1.compareTo(q5) < 0); - assertTrue(q5.compareTo(q1) > 0); - - FQLQuery q6 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes(10))); - FQLQuery q7 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.emptyList()); - assertTrue(q6.compareTo(q7) > 0); - assertTrue(q7.compareTo(q6) < 0); - - FQLQuery q8 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 
123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("a"))); - FQLQuery q9 = new FQLQuery.Single("abc", 0, QueryOptions.DEFAULT, 123, 111, 222, "aaaa", Collections.singletonList(ByteBufferUtil.bytes("b"))); - assertTrue(q8.compareTo(q9) < 0); - assertTrue(q9.compareTo(q8) > 0); - } - - private File generateQueries(int count, boolean random) throws IOException - { - Random r = new Random(); - File dir = Files.createTempDirectory("chronicle").toFile(); - try (ChronicleQueue readQueue = ChronicleQueueBuilder.single(dir).build()) - { - ExcerptAppender appender = readQueue.acquireAppender(); - - for (int i = 0; i < count; i++) - { - long timestamp = random ? Math.abs(r.nextLong() % 10000) : i; - if (random ? r.nextBoolean() : i % 2 == 0) - { - String query = "abcdefghijklm " + i; - QueryState qs = r.nextBoolean() ? queryState() : queryState("querykeyspace"); - FullQueryLogger.Query q = new FullQueryLogger.Query(query, QueryOptions.DEFAULT, qs, timestamp); - appender.writeDocument(q); - q.release(); - } - else - { - int batchSize = random ? r.nextInt(99) + 1 : i + 1; - List<String> queries = new ArrayList<>(batchSize); - List<List<ByteBuffer>> values = new ArrayList<>(batchSize); - for (int jj = 0; jj < (random ? 
r.nextInt(batchSize) : 10); jj++) - { - queries.add("aaaaaa batch "+i+":"+jj); - values.add(Collections.emptyList()); - } - FullQueryLogger.Batch batch = new FullQueryLogger.Batch(BatchStatement.Type.UNLOGGED, - queries, - values, - QueryOptions.DEFAULT, - queryState("someks"), - timestamp); - appender.writeDocument(batch); - batch.release(); - } - } - } - return dir; - } - - private QueryState queryState() - { - return QueryState.forInternalCalls(); - } - - private QueryState queryState(String keyspace) - { - ClientState clientState = ClientState.forInternalCalls(keyspace); - return new QueryState(clientState); - } - - private static ResultHandler.ComparableResultSet createResultSet(int columnCount, int rowCount, boolean random) - { - List<Pair<String, String>> columnDefs = new ArrayList<>(columnCount); - Random r = new Random(); - for (int i = 0; i < columnCount; i++) - { - columnDefs.add(Pair.create("a" + i, "int")); - } - List<List<String>> rows = new ArrayList<>(); - for (int i = 0; i < rowCount; i++) - { - List<String> row = new ArrayList<>(columnCount); - for (int jj = 0; jj < columnCount; jj++) - row.add(i + " col " + jj + (random ? 
r.nextInt() : "")); - rows.add(row); - } - return new FakeResultSet(columnDefs, rows); - } - - private static void compareWithFile(List<File> dirs, File resultDir, List<Pair<FQLQuery, List<ResultHandler.ComparableResultSet>>> resultSets, int idx) - { - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> results1 = readResultFile(dirs.get(idx), resultDir); - for (int i = 0; i < results1.size(); i++) - { - assertEquals(results1.get(i).left, resultSets.get(i).left); - assertEquals(results1.get(i).right, resultSets.get(i).right.get(idx)); - } - } - - private static List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> readResultFile(File dir, File queryDir) - { - List<Pair<FQLQuery, ResultHandler.ComparableResultSet>> resultSets = new ArrayList<>(); - try (ChronicleQueue q = ChronicleQueueBuilder.single(dir).build(); - ChronicleQueue queryQ = ChronicleQueueBuilder.single(queryDir).build()) - { - ExcerptTailer tailer = q.createTailer(); - ExcerptTailer queryTailer = queryQ.createTailer(); - List<Pair<String, String>> columnDefinitions = new ArrayList<>(); - List<List<String>> rowColumns = new ArrayList<>(); - AtomicBoolean allRowsRead = new AtomicBoolean(false); - AtomicBoolean failedQuery = new AtomicBoolean(false); - while (tailer.readDocument(wire -> { - String type = wire.read("type").text(); - if (type.equals("column_definitions")) - { - int columnCount = wire.read("column_count").int32(); - for (int i = 0; i < columnCount; i++) - { - ValueIn vi = wire.read("column_definition"); - String name = vi.text(); - String dataType = vi.text(); - columnDefinitions.add(Pair.create(name, dataType)); - } - } - else if (type.equals("row")) - { - int rowColumnCount = wire.read("row_column_count").int32(); - List<String> r = new ArrayList<>(rowColumnCount); - for (int i = 0; i < rowColumnCount; i++) - { - byte[] b = wire.read("column").bytes(); - r.add(new String(b)); - } - rowColumns.add(r); - } - else if (type.equals("end_resultset")) - { - allRowsRead.set(true); - } - 
else if (type.equals("query_failed")) - { - failedQuery.set(true); - } - })) - { - if (allRowsRead.get()) - { - FQLQueryReader reader = new FQLQueryReader(); - queryTailer.readDocument(reader); - resultSets.add(Pair.create(reader.getQuery(), failedQuery.get() ? FakeResultSet.failed(new RuntimeException("failure")) - : new FakeResultSet(ImmutableList.copyOf(columnDefinitions), ImmutableList.copyOf(rowColumns)))); - allRowsRead.set(false); - failedQuery.set(false); - columnDefinitions.clear(); - rowColumns.clear(); - } - } - } - return resultSets; - } - - private static class FakeResultSet implements ResultHandler.ComparableResultSet - { - private final List<Pair<String, String>> cdStrings; - private final List<List<String>> rows; - private final Throwable ex; - - public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows) - { - this(cdStrings, rows, null); - } - - public FakeResultSet(List<Pair<String, String>> cdStrings, List<List<String>> rows, Throwable ex) - { - this.cdStrings = cdStrings; - this.rows = rows; - this.ex = ex; - } - - public static FakeResultSet failed(Throwable ex) - { - return new FakeResultSet(null, null, ex); - } - - public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() - { - return new FakeComparableColumnDefinitions(cdStrings, wasFailed()); - } - - public boolean wasFailed() - { - return getFailureException() != null; - } - - public Throwable getFailureException() - { - return ex; - } - - public Iterator<ResultHandler.ComparableRow> iterator() - { - if (wasFailed()) - return Collections.emptyListIterator(); - return new AbstractIterator<ResultHandler.ComparableRow>() - { - Iterator<List<String>> iter = rows.iterator(); - protected ResultHandler.ComparableRow computeNext() - { - if (iter.hasNext()) - return new FakeComparableRow(iter.next(), cdStrings); - return endOfData(); - } - }; - } - - public boolean equals(Object o) - { - if (this == o) return true; - if (!(o instanceof FakeResultSet)) return 
false; - FakeResultSet that = (FakeResultSet) o; - if (wasFailed() && that.wasFailed()) - return true; - return Objects.equals(cdStrings, that.cdStrings) && - Objects.equals(rows, that.rows); - } - - public int hashCode() - { - return Objects.hash(cdStrings, rows); - } - - public String toString() - { - return "FakeResultSet{" + - "cdStrings=" + cdStrings + - ", rows=" + rows + - '}'; - } - } - - private static class FakeComparableRow implements ResultHandler.ComparableRow - { - private final List<String> row; - private final List<Pair<String, String>> cds; - - public FakeComparableRow(List<String> row, List<Pair<String,String>> cds) - { - this.row = row; - this.cds = cds; - } - - public ByteBuffer getBytesUnsafe(int i) - { - return ByteBufferUtil.bytes(row.get(i)); - } - - public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() - { - return new FakeComparableColumnDefinitions(cds, false); - } - - public boolean equals(Object other) - { - if (!(other instanceof FakeComparableRow)) - return false; - return row.equals(((FakeComparableRow)other).row); - } - - public String toString() - { - return row.toString(); - } - } - - private static class FakeComparableColumnDefinitions implements ResultHandler.ComparableColumnDefinitions - { - private final List<ResultHandler.ComparableDefinition> defs; - private final boolean failed; - public FakeComparableColumnDefinitions(List<Pair<String, String>> cds, boolean failed) - { - defs = cds != null ? 
cds.stream().map(FakeComparableDefinition::new).collect(Collectors.toList()) : null; - this.failed = failed; - } - - public List<ResultHandler.ComparableDefinition> asList() - { - if (wasFailed()) - return Collections.emptyList(); - return defs; - } - - public boolean wasFailed() - { - return failed; - } - - public int size() - { - return defs.size(); - } - - public Iterator<ResultHandler.ComparableDefinition> iterator() - { - if (wasFailed()) - return Collections.emptyListIterator(); - return new AbstractIterator<ResultHandler.ComparableDefinition>() - { - Iterator<ResultHandler.ComparableDefinition> iter = defs.iterator(); - protected ResultHandler.ComparableDefinition computeNext() - { - if (iter.hasNext()) - return iter.next(); - return endOfData(); - } - }; - } - public boolean equals(Object other) - { - if (!(other instanceof FakeComparableColumnDefinitions)) - return false; - return defs.equals(((FakeComparableColumnDefinitions)other).defs); - } - - public String toString() - { - return defs.toString(); - } - } - - private static class FakeComparableDefinition implements ResultHandler.ComparableDefinition - { - private final Pair<String, String> p; - - public FakeComparableDefinition(Pair<String, String> p) - { - this.p = p; - } - public String getType() - { - return p.right; - } - - public String getName() - { - return p.left; - } - - public boolean equals(Object other) - { - if (!(other instanceof FakeComparableDefinition)) - return false; - return p.equals(((FakeComparableDefinition)other).p); - } - - public String toString() - { - return getName() + ':' + getType(); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/cassandra.in.bat ---------------------------------------------------------------------- diff --git a/tools/bin/cassandra.in.bat b/tools/bin/cassandra.in.bat index 8804921..4d395e8 100644 --- a/tools/bin/cassandra.in.bat +++ b/tools/bin/cassandra.in.bat @@ -39,7 +39,7 @@ goto :eof :okClasspath REM Include the 
build\classes\main directory so it works in development -set CASSANDRA_CLASSPATH=%CLASSPATH%;%CASSANDRA_CONF%;"%CASSANDRA_HOME%\build\classes\main";"%CASSANDRA_HOME%\build\classes\stress" +set CASSANDRA_CLASSPATH=%CLASSPATH%;%CASSANDRA_CONF%;"%CASSANDRA_HOME%\build\classes\main";"%CASSANDRA_HOME%\build\classes\stress";"%CASSANDRA_HOME%\build\classes\fqltool" REM Add the default storage location. Can be overridden in conf\cassandra.yaml set CASSANDRA_PARAMS=%CASSANDRA_PARAMS% "-Dcassandra.storagedir=%CASSANDRA_HOME%\data" http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/cassandra.in.sh ---------------------------------------------------------------------- diff --git a/tools/bin/cassandra.in.sh b/tools/bin/cassandra.in.sh index 869eb22..bf1ecc4 100644 --- a/tools/bin/cassandra.in.sh +++ b/tools/bin/cassandra.in.sh @@ -32,7 +32,7 @@ CLASSPATH="$CASSANDRA_CONF" if [ -d $CASSANDRA_HOME/build ] ; then #cassandra_bin="$CASSANDRA_HOME/build/classes/main" cassandra_bin=`ls -1 $CASSANDRA_HOME/build/apache-cassandra*.jar` - cassandra_bin="$cassandra_bin:$CASSANDRA_HOME/build/classes/stress" + cassandra_bin="$cassandra_bin:$CASSANDRA_HOME/build/classes/stress:$CASSANDRA_HOME/build/classes/fqltool" CLASSPATH="$CLASSPATH:$cassandra_bin" fi http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/fqltool ---------------------------------------------------------------------- diff --git a/tools/bin/fqltool b/tools/bin/fqltool new file mode 100755 index 0000000..a34128e --- /dev/null +++ b/tools/bin/fqltool @@ -0,0 +1,76 @@ +#!/bin/sh + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +if [ "x$CASSANDRA_INCLUDE" = "x" ]; then + # Locations (in order) to use when searching for an include file. + for include in "`dirname "$0"`/cassandra.in.sh" \ + "$HOME/.cassandra.in.sh" \ + /usr/share/cassandra/cassandra.in.sh \ + /usr/local/share/cassandra/cassandra.in.sh \ + /opt/cassandra/cassandra.in.sh; do + if [ -r "$include" ]; then + . "$include" + break + fi + done +elif [ -r "$CASSANDRA_INCLUDE" ]; then + . "$CASSANDRA_INCLUDE" +fi + +if [ -z "$CASSANDRA_CONF" -o -z "$CLASSPATH" ]; then + echo "You must set the CASSANDRA_CONF and CLASSPATH vars" >&2 + exit 1 +fi + +# Run cassandra-env.sh to pick up JMX_PORT +if [ -f "$CASSANDRA_CONF/cassandra-env.sh" ]; then + JVM_OPTS_SAVE=$JVM_OPTS + MAX_HEAP_SIZE_SAVE=$MAX_HEAP_SIZE + . "$CASSANDRA_CONF/cassandra-env.sh" + MAX_HEAP_SIZE=$MAX_HEAP_SIZE_SAVE + JVM_OPTS=$JVM_OPTS_SAVE +fi + +# Split the command line: -D* options are handed to the JVM (JVM_ARGS), +# every other argument is handed to the tool itself (ARGS). +ARGS="" +JVM_ARGS="" +while true +do + # Stop on the argument count, not on the value of $1, so that an empty + # argument or one containing spaces cannot end or break the loop early. + if [ $# -eq 0 ]; then break; fi + case $1 in + -D*) + JVM_ARGS="$JVM_ARGS $1" + ;; + *) + ARGS="$ARGS $1" + ;; + esac + shift +done + +if [ "x$MAX_HEAP_SIZE" = "x" ]; then + MAX_HEAP_SIZE="512m" +fi + +"$JAVA" $JAVA_AGENT -ea -da:net.openhft... 
-cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \ + -Dlog4j.configurationFile=log4j2-tools.xml \ + $JVM_ARGS \ + org.apache.cassandra.fqltool.FullQueryLogTool $ARGS + +# vi:ai sw=4 ts=4 tw=0 et http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/bin/fqltool.bat ---------------------------------------------------------------------- diff --git a/tools/bin/fqltool.bat b/tools/bin/fqltool.bat new file mode 100644 index 0000000..acb6d1c --- /dev/null +++ b/tools/bin/fqltool.bat @@ -0,0 +1,36 @@ +@REM +@REM Licensed to the Apache Software Foundation (ASF) under one or more +@REM contributor license agreements. See the NOTICE file distributed with +@REM this work for additional information regarding copyright ownership. +@REM The ASF licenses this file to You under the Apache License, Version 2.0 +@REM (the "License"); you may not use this file except in compliance with +@REM the License. You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, software +@REM distributed under the License is distributed on an "AS IS" BASIS, +@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@REM See the License for the specific language governing permissions and +@REM limitations under the License. + +@echo off +if "%OS%" == "Windows_NT" setlocal + +pushd "%~dp0" +call cassandra.in.bat + +if NOT DEFINED JAVA_HOME goto :err + +set CASSANDRA_PARAMS=%CASSANDRA_PARAMS% -Dcassandra.logdir="%CASSANDRA_HOME%\logs" + +"%JAVA_HOME%\bin\java" -cp %CASSANDRA_CLASSPATH% %CASSANDRA_PARAMS% -Dlog4j.configurationFile=log4j2-tools.xml org.apache.cassandra.fqltool.FullQueryLogTool %* +goto finally + +:err +echo The JAVA_HOME environment variable must be set to run this program! 
+pause + +:finally +ENDLOCAL & set RC=%ERRORLEVEL% +exit /B %RC% http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java b/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java new file mode 100644 index 0000000..ccff370 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/DriverResultSet.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.collect.AbstractIterator; + +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import org.apache.cassandra.utils.ByteBufferUtil; + + +/** + * Wraps a result set from the driver so that we can reuse the compare code when reading + * up a result set produced by ResultStore. 
+ */ +public class DriverResultSet implements ResultHandler.ComparableResultSet +{ + private final ResultSet resultSet; + private final Throwable failureException; + + public DriverResultSet(ResultSet resultSet) + { + this(resultSet, null); + } + + private DriverResultSet(ResultSet res, Throwable failureException) + { + resultSet = res; + this.failureException = failureException; + } + + public static DriverResultSet failed(Throwable ex) + { + return new DriverResultSet(null, ex); + } + + public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() + { + if (wasFailed()) + return new DriverColumnDefinitions(null, true, failureException); + + return new DriverColumnDefinitions(resultSet.getColumnDefinitions()); + } + + public boolean wasFailed() + { + return failureException != null; + } + + public Throwable getFailureException() + { + return failureException; + } + + public Iterator<ResultHandler.ComparableRow> iterator() + { + if (wasFailed()) + return Collections.emptyListIterator(); + return new AbstractIterator<ResultHandler.ComparableRow>() + { + Iterator<Row> iter = resultSet.iterator(); + protected ResultHandler.ComparableRow computeNext() + { + if (iter.hasNext()) + return new DriverRow(iter.next()); + return endOfData(); + } + }; + } + + public static class DriverRow implements ResultHandler.ComparableRow + { + private final Row row; + + public DriverRow(Row row) + { + this.row = row; + } + + public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() + { + return new DriverColumnDefinitions(row.getColumnDefinitions()); + } + + public ByteBuffer getBytesUnsafe(int i) + { + return row.getBytesUnsafe(i); + } + + @Override + public boolean equals(Object oo) + { + if (!(oo instanceof ResultHandler.ComparableRow)) + return false; + + ResultHandler.ComparableRow o = (ResultHandler.ComparableRow)oo; + if (getColumnDefinitions().size() != o.getColumnDefinitions().size()) + return false; + + for (int j = 0; j < getColumnDefinitions().size(); 
j++) + { + ByteBuffer b1 = getBytesUnsafe(j); + ByteBuffer b2 = o.getBytesUnsafe(j); + + if (b1 != null && b2 != null && !b1.equals(b2)) + { + return false; + } + if (b1 == null && b2 != null || b2 == null && b1 != null) + { + return false; + } + } + return true; + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + List<ResultHandler.ComparableDefinition> colDefs = getColumnDefinitions().asList(); + for (int i = 0; i < getColumnDefinitions().size(); i++) + { + ByteBuffer bb = getBytesUnsafe(i); + String row = bb != null ? ByteBufferUtil.bytesToHex(bb) : "NULL"; + sb.append(colDefs.get(i)).append(':').append(row).append(","); + } + return sb.toString(); + } + } + + public static class DriverColumnDefinitions implements ResultHandler.ComparableColumnDefinitions + { + private final ColumnDefinitions columnDefinitions; + private final boolean failed; + private final Throwable failureException; + + public DriverColumnDefinitions(ColumnDefinitions columnDefinitions) + { + this(columnDefinitions, false, null); + } + + private DriverColumnDefinitions(ColumnDefinitions columnDefinitions, boolean failed, Throwable failureException) + { + this.columnDefinitions = columnDefinitions; + this.failed = failed; + this.failureException = failureException; + } + + public List<ResultHandler.ComparableDefinition> asList() + { + if (wasFailed()) + return Collections.emptyList(); + return columnDefinitions.asList().stream().map(DriverDefinition::new).collect(Collectors.toList()); + } + + public boolean wasFailed() + { + return failed; + } + + public Throwable getFailureException() + { + return failureException; + } + + /** + * Number of columns. A failed query is constructed with a null columnDefinitions, so it + * reports 0 (matching the empty list returned by asList()) instead of dereferencing null. + */ + public int size() + { + if (wasFailed()) + return 0; + return columnDefinitions.size(); + } + + public Iterator<ResultHandler.ComparableDefinition> iterator() + { + return asList().iterator(); + } + + public boolean equals(Object oo) + { + if (!(oo instanceof ResultHandler.ComparableColumnDefinitions)) + return false; + + ResultHandler.ComparableColumnDefinitions o = 
(ResultHandler.ComparableColumnDefinitions)oo; + if (wasFailed() && o.wasFailed()) + return true; + + if (size() != o.size()) + return false; + + return asList().equals(o.asList()); + } + } + + public static class DriverDefinition implements ResultHandler.ComparableDefinition + { + private final ColumnDefinitions.Definition def; + + public DriverDefinition(ColumnDefinitions.Definition def) + { + this.def = def; + } + + public String getType() + { + return def.getType().toString(); + } + + public String getName() + { + return def.getName(); + } + + /** + * Compares against any ComparableDefinition. Two driver-backed definitions are compared through + * the wrapped driver Definition; any other implementation is compared by name and type rather + * than being cast to DriverDefinition, which would throw ClassCastException. + */ + public boolean equals(Object oo) + { + if (!(oo instanceof ResultHandler.ComparableDefinition)) + return false; + + if (oo instanceof DriverDefinition) + return def.equals(((DriverDefinition)oo).def); + + ResultHandler.ComparableDefinition o = (ResultHandler.ComparableDefinition)oo; + return getName().equals(o.getName()) && getType().equals(o.getType()); + } + + public String toString() + { + return getName() + ':' + getType(); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java new file mode 100644 index 0000000..2862e0f --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQuery.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.google.common.primitives.Longs; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import org.apache.cassandra.audit.FullQueryLogger; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.binlog.BinLog; + +public abstract class FQLQuery implements Comparable<FQLQuery> +{ + public final long queryStartTime; + public final QueryOptions queryOptions; + public final int protocolVersion; + public final QueryState queryState; + + public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds) + { + this.queryStartTime = queryStartTime; + this.queryOptions = queryOptions; + this.protocolVersion = protocolVersion; + this.queryState = queryState(keyspace, generatedTimestamp, generatedNowInSeconds); + } + + public abstract Statement toStatement(); + + /** + * used when storing the queries executed + */ + public abstract BinLog.ReleaseableWriteMarshallable toMarshallable(); + + public String keyspace() + { + return queryState.getClientState().getRawKeyspace(); + } + + private QueryState queryState(String keyspace, long generatedTimestamp, int generatedNowInSeconds) + { + ClientState clientState = keyspace != null ? 
ClientState.forInternalCalls(keyspace) : ClientState.forInternalCalls(); + return new QueryState(clientState, generatedTimestamp, generatedNowInSeconds); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof FQLQuery)) return false; + FQLQuery fqlQuery = (FQLQuery) o; + return queryStartTime == fqlQuery.queryStartTime && + protocolVersion == fqlQuery.protocolVersion && + queryState.getTimestamp() == fqlQuery.queryState.getTimestamp() && + Objects.equals(queryState.getClientState().getRawKeyspace(), fqlQuery.queryState.getClientState().getRawKeyspace()) && + Objects.equals(queryOptions.getValues(), fqlQuery.queryOptions.getValues()); + } + + /** + * Hashes only values that equals() also compares - the bound values from + * queryOptions.getValues() rather than the QueryOptions object itself - so that + * equal queries always produce equal hash codes. + */ + public int hashCode() + { + return Objects.hash(queryStartTime, queryOptions.getValues(), protocolVersion, queryState.getClientState().getRawKeyspace()); + } + + public int compareTo(FQLQuery other) + { + return Longs.compare(queryStartTime, other.queryStartTime); + } + + public String toString() + { + return "FQLQuery{" + + "queryStartTime=" + queryStartTime + + ", protocolVersion=" + protocolVersion + + ", queryState='" + queryState + '\'' + + '}'; + } + + public static class Single extends FQLQuery + { + public final String query; + public final List<ByteBuffer> values; + + public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, String queryString, List<ByteBuffer> values) + { + super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds); + this.query = queryString; + this.values = values; + } + + @Override + public String toString() + { + return String.format("%s%nQuery = %s, Values = %s", + super.toString(), + query, + values.stream().map(ByteBufferUtil::bytesToHex).collect(Collectors.joining(","))); + } + + public Statement toStatement() + { + SimpleStatement ss = new SimpleStatement(query, values.toArray()); + 
ss.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name())); + ss.setDefaultTimestamp(queryOptions.getTimestamp(queryState)); + return ss; + } + + public BinLog.ReleaseableWriteMarshallable toMarshallable() + { + + return new FullQueryLogger.Query(query, queryOptions, queryState, queryStartTime); + } + + public int compareTo(FQLQuery other) + { + int cmp = super.compareTo(other); + + if (cmp == 0) + { + if (other instanceof Batch) + return -1; + + Single singleQuery = (Single) other; + + cmp = query.compareTo(singleQuery.query); + if (cmp == 0) + { + if (values.size() != singleQuery.values.size()) + return values.size() - singleQuery.values.size(); + for (int i = 0; i < values.size(); i++) + { + cmp = values.get(i).compareTo(singleQuery.values.get(i)); + if (cmp != 0) + return cmp; + } + } + } + return cmp; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof Single)) return false; + if (!super.equals(o)) return false; + Single single = (Single) o; + return Objects.equals(query, single.query) && + Objects.equals(values, single.values); + } + + public int hashCode() + { + return Objects.hash(super.hashCode(), query, values); + } + } + + public static class Batch extends FQLQuery + { + public final BatchStatement.Type batchType; + public final List<Single> queries; + + public Batch(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryStartTime, long generatedTimestamp, int generatedNowInSeconds, BatchStatement.Type batchType, List<String> queries, List<List<ByteBuffer>> values) + { + super(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds); + this.batchType = batchType; + this.queries = new ArrayList<>(queries.size()); + for (int i = 0; i < queries.size(); i++) + this.queries.add(new Single(keyspace, protocolVersion, queryOptions, queryStartTime, generatedTimestamp, generatedNowInSeconds, queries.get(i), values.get(i))); + } + + 
public Statement toStatement() + { + BatchStatement bs = new BatchStatement(batchType); + for (Single query : queries) + bs.add(query.toStatement()); + bs.setConsistencyLevel(ConsistencyLevel.valueOf(queryOptions.getConsistency().name())); + bs.setDefaultTimestamp(queryOptions.getTimestamp(queryState)); + return bs; + } + + public int compareTo(FQLQuery other) + { + int cmp = super.compareTo(other); + + if (cmp == 0) + { + if (other instanceof Single) + return 1; + + Batch otherBatch = (Batch) other; + if (queries.size() != otherBatch.queries.size()) + return queries.size() - otherBatch.queries.size(); + for (int i = 0; i < queries.size(); i++) + { + cmp = queries.get(i).compareTo(otherBatch.queries.get(i)); + if (cmp != 0) + return cmp; + } + } + return cmp; + } + + public BinLog.ReleaseableWriteMarshallable toMarshallable() + { + List<String> queryStrings = new ArrayList<>(); + List<List<ByteBuffer>> values = new ArrayList<>(); + for (Single q : queries) + { + queryStrings.add(q.query); + values.add(q.values); + } + return new FullQueryLogger.Batch(org.apache.cassandra.cql3.statements.BatchStatement.Type.valueOf(batchType.name()), queryStrings, values, queryOptions, queryState, queryStartTime); + } + + public String toString() + { + StringBuilder sb = new StringBuilder(super.toString()).append("\nbatch: ").append(batchType).append('\n'); + for (Single q : queries) + sb.append(q.toString()).append('\n'); + sb.append("end batch"); + return sb.toString(); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof Batch)) return false; + if (!super.equals(o)) return false; + Batch batch = (Batch) o; + return batchType == batch.batchType && + Objects.equals(queries, batch.queries); + } + + public int hashCode() + { + return Objects.hash(super.hashCode(), batchType, queries); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java 
---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java new file mode 100644 index 0000000..ccbb200 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryIterator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.fqltool; + +import java.util.PriorityQueue; + +import net.openhft.chronicle.queue.ExcerptTailer; +import org.apache.cassandra.utils.AbstractIterator; + +public class FQLQueryIterator extends AbstractIterator<FQLQuery> +{ + // use a priority queue to be able to sort the head of the query logs in memory + private final PriorityQueue<FQLQuery> pq; + private final ExcerptTailer tailer; + private final FQLQueryReader reader; + + /** + * Create an iterator over the FQLQueries in tailer + * + * Reads up to readAhead queries in to memory to be able to sort them (the files are mostly sorted already) + */ + public FQLQueryIterator(ExcerptTailer tailer, int readAhead) + { + assert readAhead > 0 : "readAhead needs to be > 0"; + reader = new FQLQueryReader(); + this.tailer = tailer; + pq = new PriorityQueue<>(readAhead); + for (int i = 0; i < readAhead; i++) + { + FQLQuery next = readNext(); + if (next != null) + pq.add(next); + else + break; + } + } + + protected FQLQuery computeNext() + { + FQLQuery q = pq.poll(); + if (q == null) + return endOfData(); + FQLQuery next = readNext(); + if (next != null) + pq.add(next); + return q; + } + + private FQLQuery readNext() + { + if (tailer.readDocument(reader)) + return reader.getQuery(); + return null; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java new file mode 100644 index 0000000..fd5073c --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FQLQueryReader.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import com.datastax.driver.core.BatchStatement; +import io.netty.buffer.Unpooled; +import net.openhft.chronicle.core.io.IORuntimeException; +import net.openhft.chronicle.wire.ReadMarshallable; +import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.WireIn; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.transport.ProtocolVersion; + +import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_NOW_IN_SECONDS; +import static org.apache.cassandra.audit.FullQueryLogger.GENERATED_TIMESTAMP; +import static org.apache.cassandra.audit.FullQueryLogger.KEYSPACE; +import static org.apache.cassandra.audit.FullQueryLogger.PROTOCOL_VERSION; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY_OPTIONS; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY_START_TIME; +import static org.apache.cassandra.audit.FullQueryLogger.TYPE; +import static org.apache.cassandra.audit.FullQueryLogger.VERSION; +import static org.apache.cassandra.audit.FullQueryLogger.BATCH; +import static org.apache.cassandra.audit.FullQueryLogger.BATCH_TYPE; +import static 
org.apache.cassandra.audit.FullQueryLogger.QUERIES; +import static org.apache.cassandra.audit.FullQueryLogger.QUERY; +import static org.apache.cassandra.audit.FullQueryLogger.SINGLE_QUERY; +import static org.apache.cassandra.audit.FullQueryLogger.VALUES; + +public class FQLQueryReader implements ReadMarshallable +{ + private FQLQuery query; + + public void readMarshallable(WireIn wireIn) throws IORuntimeException + { + int currentVersion = wireIn.read(VERSION).int16(); + String type = wireIn.read(TYPE).text(); + long queryStartTime = wireIn.read(QUERY_START_TIME).int64(); + int protocolVersion = wireIn.read(PROTOCOL_VERSION).int32(); + QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read(QUERY_OPTIONS).bytes()), ProtocolVersion.decode(protocolVersion)); + long generatedTimestamp = wireIn.read(GENERATED_TIMESTAMP).int64(); + int generatedNowInSeconds = wireIn.read(GENERATED_NOW_IN_SECONDS).int32(); + String keyspace = wireIn.read(KEYSPACE).text(); + + switch (type) + { + case SINGLE_QUERY: + String queryString = wireIn.read(QUERY).text(); + query = new FQLQuery.Single(keyspace, + protocolVersion, + queryOptions, + queryStartTime, + generatedTimestamp, + generatedNowInSeconds, + queryString, + queryOptions.getValues()); + break; + case BATCH: + BatchStatement.Type batchType = BatchStatement.Type.valueOf(wireIn.read(BATCH_TYPE).text()); + ValueIn in = wireIn.read(QUERIES); + int queryCount = in.int32(); + + List<String> queries = new ArrayList<>(queryCount); + for (int i = 0; i < queryCount; i++) + queries.add(in.text()); + in = wireIn.read(VALUES); + int valueCount = in.int32(); + List<List<ByteBuffer>> values = new ArrayList<>(valueCount); + for (int ii = 0; ii < valueCount; ii++) + { + List<ByteBuffer> subValues = new ArrayList<>(); + values.add(subValues); + int numSubValues = in.int32(); + for (int zz = 0; zz < numSubValues; zz++) + subValues.add(ByteBuffer.wrap(in.bytes())); + } + query = new FQLQuery.Batch(keyspace, + 
protocolVersion, + queryOptions, + queryStartTime, + generatedTimestamp, + generatedNowInSeconds, + batchType, + queries, + values); + break; + default: + throw new RuntimeException("Unknown type: " + type); + } + } + + public FQLQuery getQuery() + { + return query; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java b/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java new file mode 100644 index 0000000..97e7487 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/FullQueryLogTool.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.fqltool; + +import java.util.List; + +import com.google.common.base.Throwables; + +import io.airlift.airline.Cli; +import io.airlift.airline.Help; +import io.airlift.airline.ParseArgumentsMissingException; +import io.airlift.airline.ParseArgumentsUnexpectedException; +import io.airlift.airline.ParseCommandMissingException; +import io.airlift.airline.ParseCommandUnrecognizedException; +import io.airlift.airline.ParseOptionConversionException; +import io.airlift.airline.ParseOptionMissingException; +import io.airlift.airline.ParseOptionMissingValueException; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.fqltool.commands.Compare; +import org.apache.cassandra.fqltool.commands.Dump; +import org.apache.cassandra.fqltool.commands.Replay; + +import static com.google.common.base.Throwables.getStackTraceAsString; +import static com.google.common.collect.Lists.newArrayList; + +public class FullQueryLogTool +{ + public static void main(String... args) + { + DatabaseDescriptor.clientInitialization(); + List<Class<? 
extends Runnable>> commands = newArrayList( + Help.class, + Dump.class, + Replay.class, + Compare.class + ); + + Cli.CliBuilder<Runnable> builder = Cli.builder("fqltool"); + + builder.withDescription("Manipulate the contents of full query log files") + .withDefaultCommand(Help.class) + .withCommands(commands); + + Cli<Runnable> parser = builder.build(); + + int status = 0; + try + { + parser.parse(args).run(); + } catch (IllegalArgumentException | + IllegalStateException | + ParseArgumentsMissingException | + ParseArgumentsUnexpectedException | + ParseOptionConversionException | + ParseOptionMissingException | + ParseOptionMissingValueException | + ParseCommandMissingException | + ParseCommandUnrecognizedException e) + { + badUse(e); + status = 1; + } catch (Throwable throwable) + { + err(Throwables.getRootCause(throwable)); + status = 2; + } + + System.exit(status); + } + + private static void badUse(Exception e) + { + System.out.println("fqltool: " + e.getMessage()); + System.out.println("See 'fqltool help' or 'fqltool help <command>'."); + } + + private static void err(Throwable e) + { + System.err.println("error: " + e.getMessage()); + System.err.println("-- StackTrace --"); + System.err.println(getStackTraceAsString(e)); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f83bd5ac/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java ---------------------------------------------------------------------- diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java new file mode 100644 index 0000000..d8653e5 --- /dev/null +++ b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fqltool; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +//import javax.annotation.Nullable; + +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import org.apache.cassandra.utils.FBUtilities; + +public class QueryReplayer implements Closeable +{ + private static final int PRINT_RATE = 5000; + private final ExecutorService es = Executors.newFixedThreadPool(1); + private final Iterator<List<FQLQuery>> queryIterator; + private final List<Cluster> targetClusters; + private final List<Predicate<FQLQuery>> filters; + private final List<Session> sessions; + private final 
ResultHandler resultHandler; + private final MetricRegistry metrics = new MetricRegistry(); + private final boolean debug; + private final PrintStream out; + + public QueryReplayer(Iterator<List<FQLQuery>> queryIterator, + List<String> targetHosts, + List<File> resultPaths, + List<Predicate<FQLQuery>> filters, + PrintStream out, + String queryFilePathString, + boolean debug) + { + this.queryIterator = queryIterator; + targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList()); + this.filters = filters; + sessions = targetClusters.stream().map(Cluster::connect).collect(Collectors.toList()); + File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null; + resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath); + this.debug = debug; + this.out = out; + } + + public void replay() + { + while (queryIterator.hasNext()) + { + List<FQLQuery> queries = queryIterator.next(); + for (FQLQuery query : queries) + { + if (filters.stream().anyMatch(f -> !f.test(query))) + continue; + try (Timer.Context ctx = metrics.timer("queries").time()) + { + List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size()); + Statement statement = query.toStatement(); + for (Session session : sessions) + { + maybeSetKeyspace(session, query); + if (debug) + { + out.println("Executing query:"); + out.println(query); + } + ListenableFuture<ResultSet> future = session.executeAsync(statement); + results.add(handleErrors(future)); + } + + ListenableFuture<List<ResultHandler.ComparableResultSet>> resultList = Futures.allAsList(results); + + Futures.addCallback(resultList, new FutureCallback<List<ResultHandler.ComparableResultSet>>() + { + public void onSuccess(/*@Nullable */List<ResultHandler.ComparableResultSet> resultSets) + { + // note that the order of resultSets is signifcant here - resultSets.get(x) should + // be the result from a query against 
targetHosts.get(x) + resultHandler.handleResults(query, resultSets); + } + + public void onFailure(Throwable throwable) + { + throw new AssertionError("Errors should be handled in FQLQuery.execute", throwable); + } + }, es); + + FBUtilities.waitOnFuture(resultList); + } + catch (Throwable t) + { + out.printf("QUERY %s got exception: %s", query, t.getMessage()); + } + + Timer timer = metrics.timer("queries"); + if (timer.getCount() % PRINT_RATE == 0) + out.printf("%d queries, rate = %.2f%n", timer.getCount(), timer.getOneMinuteRate()); + } + } + } + + private void maybeSetKeyspace(Session session, FQLQuery query) + { + try + { + if (query.keyspace() != null && !query.keyspace().equals(session.getLoggedKeyspace())) + { + if (debug) + out.printf("Switching keyspace from %s to %s%n", session.getLoggedKeyspace(), query.keyspace()); + session.execute("USE " + query.keyspace()); + } + } + catch (Throwable t) + { + out.printf("USE %s failed: %s%n", query.keyspace(), t.getMessage()); + } + } + + /** + * Make sure we catch any query errors + * + * On error, this creates a failed ComparableResultSet with the exception set to be able to store + * this fact in the result file and handle comparison of failed result sets. + */ + private static ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result) + { + FluentFuture<ResultHandler.ComparableResultSet> fluentFuture = FluentFuture.from(result) + .transform(DriverResultSet::new, MoreExecutors.directExecutor()); + return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor()); + } + + public void close() throws IOException + { + sessions.forEach(Session::close); + targetClusters.forEach(Cluster::close); + resultHandler.close(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
