This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c15f530 Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
c15f530 is described below
commit c15f530b63a1cd4d5b2835bb418197145beb7bb6
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Fri Nov 19 12:58:02 2021 -0600
Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-17165
---
.../distributed/test/SecondaryIndexTest.java | 58 ++++++++++++++--------
1 file changed, 37 insertions(+), 21 deletions(-)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
index 7b1c4ad..b530dcc 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
@@ -20,14 +20,16 @@ package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -43,9 +45,10 @@ public class SecondaryIndexTest extends TestBaseImpl
private static final int NUM_NODES = 3;
private static final int REPLICATION_FACTOR = 1;
private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int,
PRIMARY KEY (k))";
- private static final String CREATE_INDEX = "CREATE INDEX v_index ON %s(v)";
+ private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON
%s(v)";
private static final AtomicInteger seq = new AtomicInteger();
+
private static String tableName;
private static Cluster cluster;
@@ -65,10 +68,9 @@ public class SecondaryIndexTest extends TestBaseImpl
@Before
public void before()
{
- // create the table
tableName = String.format("%s.t_%d", KEYSPACE, seq.getAndIncrement());
cluster.schemaChange(String.format(CREATE_TABLE, tableName));
- cluster.schemaChange(String.format(CREATE_INDEX, tableName));
+ cluster.schemaChange(String.format(CREATE_INDEX, seq.get(),
tableName));
}
@After
@@ -78,31 +80,45 @@ public class SecondaryIndexTest extends TestBaseImpl
}
@Test
- public void test_only_coordinator_chooses_index_for_query() throws
InterruptedException, UnknownHostException
+ public void test_only_coordinator_chooses_index_for_query()
{
for (int i = 0 ; i < 99 ; ++i)
cluster.coordinator(1).execute(String.format("INSERT INTO %s (k,
v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, i/3);
cluster.forEach(i -> i.flush(KEYSPACE));
- for (int i = 0 ; i < 33 ; ++i)
+ Pattern indexScanningPattern =
+ Pattern.compile(String.format("Index mean cardinalities are
v_index_%d:[0-9]+. Scanning with v_index_%d.", seq.get(), seq.get()));
+
+ for (int i = 0 ; i < 33; ++i)
{
UUID trace = UUID.randomUUID();
Object[][] result =
cluster.coordinator(1).executeWithTracing(trace, String.format("SELECT * FROM
%s WHERE v = ?", tableName), ConsistencyLevel.ALL, i);
- Assert.assertEquals(3, result.length);
- Thread.sleep(100L);
- Object[][] traces =
cluster.coordinator(1).execute(String.format("SELECT source, activity FROM
system_traces.events WHERE session_id = ?", tableName), ConsistencyLevel.ALL,
trace);
- List<InetAddress> scanning = Arrays.stream(traces)
- .filter(t ->
t[1].toString().matches("Index mean cardinalities are v_index:[0-9]+. Scanning
with v_index."))
- .map(t -> (InetAddress) t[0])
-
.distinct().collect(Collectors.toList());
-
- List<InetAddress> executing = Arrays.stream(traces)
- .filter(t ->
t[1].toString().equals("Executing read on " + tableName + " using index
v_index"))
- .map(t -> (InetAddress) t[0])
-
.distinct().collect(Collectors.toList());
-
-
Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()),
scanning);
- Assert.assertEquals(3, executing.size());
+ Assert.assertEquals("Failed on iteration " + i, 3, result.length);
+
+ Awaitility.await("For all events in the tracing session to
persist")
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() ->
+ {
+ Object[][] traces =
cluster.coordinator(1)
+
.execute("SELECT source, activity FROM system_traces.events WHERE session_id =
?",
+
ConsistencyLevel.ALL, trace);
+
+ List<InetAddress> scanning =
+ Arrays.stream(traces)
+ .filter(t ->
indexScanningPattern.matcher(t[1].toString()).matches())
+ .map(t -> (InetAddress)
t[0])
+
.distinct().collect(Collectors.toList());
+
+ List<InetAddress> executing =
+ Arrays.stream(traces)
+ .filter(t ->
t[1].toString().equals(String.format("Executing read on " + tableName + " using
index v_index_%d", seq.get())))
+ .map(t -> (InetAddress)
t[0])
+
.distinct().collect(Collectors.toList());
+
+
Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()),
scanning);
+ Assert.assertEquals(3,
executing.size());
+ });
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]