This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1fbd329 Release session from cache when closing QueryReplayer
1fbd329 is described below
commit 1fbd3297a9c8303ca7aa2ff30d182e5ca568de4c
Author: yifan-c <[email protected]>
AuthorDate: Fri Jan 17 22:13:50 2020 -0800
Release session from cache when closing QueryReplayer
Patch by Yifan Cai; reviewed by marcuse for CASSANDRA-15514
---
build.xml | 1 +
.../test/QueryReplayerEndToEndTest.java | 92 ++++++++++++++++++++++
.../apache/cassandra/fqltool/QueryReplayer.java | 10 +--
3 files changed, 98 insertions(+), 5 deletions(-)
diff --git a/build.xml b/build.xml
index 51ba2f5..961af39 100644
--- a/build.xml
+++ b/build.xml
@@ -1313,6 +1313,7 @@
encoding="utf-8">
<classpath>
<path refid="cassandra.classpath"/>
+ <pathelement location="${fqltool.build.classes}"/>
</classpath>
<compilerarg value="-XDignore.symbol.file"/>
<src path="${test.unit.src}"/>
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java
b/test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java
new file mode 100644
index 0000000..9908fcc
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/QueryReplayerEndToEndTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.fqltool.FQLQuery;
+import org.apache.cassandra.fqltool.QueryReplayer;
+
+public class QueryReplayerEndToEndTest extends DistributedTestBase
+{
+ private final AtomicLong queryStartTimeGenerator = new AtomicLong(1000);
+ private final AtomicInteger ckGenerator = new AtomicInteger(1);
+
+ @Test
+ public void testReplayAndCloseMultipleTimes() throws Throwable
+ {
+ try (Cluster cluster = init(Cluster.create(3, conf ->
conf.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP, Feature.NETWORK))))
+ {
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int,
ck int, v int, PRIMARY KEY (pk, ck))");
+ List<String> hosts = cluster.stream()
+ .map(i ->
i.config().broadcastAddressAndPort().address.getHostAddress())
+ .collect(Collectors.toList());
+
+ final int queriesCount = 3;
+ // replay for the first time, it should pass
+
replayAndClose(Collections.singletonList(makeFQLQueries(queriesCount)), hosts);
+ // replay for the second time, it should pass too
+ // however, if the cached sessions are not released, the second
replay will reused the closed sessions from previous replay and fail to insert
+
replayAndClose(Collections.singletonList(makeFQLQueries(queriesCount)), hosts);
+ Object[][] result = cluster.coordinator(1)
+ .execute("SELECT * FROM " + KEYSPACE +
".tbl WHERE pk = ?",
+ ConsistencyLevel.QUORUM, 1);
+ Assert.assertEquals(String.format("Expecting to see %d rows since
it replayed twice and each with %d queries", queriesCount * 2, queriesCount),
+ queriesCount * 2, result.length);
+ }
+ }
+
+ private void replayAndClose(List<List<FQLQuery>> allFqlQueries,
List<String> hosts) throws IOException
+ {
+ List<Predicate<FQLQuery>> allowAll =
Collections.singletonList(fqlQuery -> true);
+ try (QueryReplayer queryReplayer = new
QueryReplayer(allFqlQueries.iterator(), hosts, null, allowAll, null))
+ {
+ queryReplayer.replay();
+ }
+ }
+
+ // generate a new list of FQLQuery for each invocation
+ private List<FQLQuery> makeFQLQueries(int n)
+ {
+ return IntStream.range(0, n)
+ .boxed()
+ .map(i -> new FQLQuery.Single(KEYSPACE,
+
QueryOptions.DEFAULT.getProtocolVersion().asInt(),
+ QueryOptions.DEFAULT,
queryStartTimeGenerator.incrementAndGet(),
+ 2222,
+ 3333,
+ String.format("INSERT
INTO %s.tbl (pk, ck, v) VALUES (1, %d, %d)", KEYSPACE,
ckGenerator.incrementAndGet(), i),
+ Collections.emptyList()))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
index b17ec53..4524e33 100644
--- a/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
+++ b/tools/fqltool/src/org/apache/cassandra/fqltool/QueryReplayer.java
@@ -257,18 +257,18 @@ public class QueryReplayer implements Closeable
public void close()
{
- for (Session s : sessionCache.values())
- {
- try
+ sessionCache.entrySet().removeIf(entry -> {
+ try (Session s = entry.getValue())
{
- s.close();
s.getCluster().close();
+ return true;
}
catch (Throwable t)
{
logger.error("Could not close connection", t);
+ return false;
}
- }
+ });
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]