This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c15f530  Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
c15f530 is described below

commit c15f530b63a1cd4d5b2835bb418197145beb7bb6
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Fri Nov 19 12:58:02 2021 -0600

    Make waiting for session event persistence more reliable in SecondaryIndexTest#test_only_coordinator_chooses_index_for_query
    
    patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-17165
---
 .../distributed/test/SecondaryIndexTest.java       | 58 ++++++++++++++--------
 1 file changed, 37 insertions(+), 21 deletions(-)

diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
index 7b1c4ad..b530dcc 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexTest.java
@@ -20,14 +20,16 @@ package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -43,9 +45,10 @@ public class SecondaryIndexTest extends TestBaseImpl
     private static final int NUM_NODES = 3;
     private static final int REPLICATION_FACTOR = 1;
     private static final String CREATE_TABLE = "CREATE TABLE %s(k int, v int, 
PRIMARY KEY (k))";
-    private static final String CREATE_INDEX = "CREATE INDEX v_index ON %s(v)";
+    private static final String CREATE_INDEX = "CREATE INDEX v_index_%d ON 
%s(v)";
 
     private static final AtomicInteger seq = new AtomicInteger();
+    
     private static String tableName;
     private static Cluster cluster;
 
@@ -65,10 +68,9 @@ public class SecondaryIndexTest extends TestBaseImpl
     @Before
     public void before()
     {
-        // create the table
         tableName = String.format("%s.t_%d", KEYSPACE, seq.getAndIncrement());
         cluster.schemaChange(String.format(CREATE_TABLE, tableName));
-        cluster.schemaChange(String.format(CREATE_INDEX, tableName));
+        cluster.schemaChange(String.format(CREATE_INDEX, seq.get(), 
tableName));
     }
 
     @After
@@ -78,31 +80,45 @@ public class SecondaryIndexTest extends TestBaseImpl
     }
 
     @Test
-    public void test_only_coordinator_chooses_index_for_query() throws 
InterruptedException, UnknownHostException
+    public void test_only_coordinator_chooses_index_for_query()
     {
         for (int i = 0 ; i < 99 ; ++i)
             cluster.coordinator(1).execute(String.format("INSERT INTO %s (k, 
v) VALUES (?, ?)", tableName), ConsistencyLevel.ALL, i, i/3);
         cluster.forEach(i -> i.flush(KEYSPACE));
 
-        for (int i = 0 ; i < 33 ; ++i)
+        Pattern indexScanningPattern =
+                Pattern.compile(String.format("Index mean cardinalities are 
v_index_%d:[0-9]+. Scanning with v_index_%d.", seq.get(), seq.get()));
+
+        for (int i = 0 ; i < 33; ++i)
         {
             UUID trace = UUID.randomUUID();
             Object[][] result = 
cluster.coordinator(1).executeWithTracing(trace, String.format("SELECT * FROM 
%s WHERE v = ?", tableName), ConsistencyLevel.ALL, i);
-            Assert.assertEquals(3, result.length);
-            Thread.sleep(100L);
-            Object[][] traces = 
cluster.coordinator(1).execute(String.format("SELECT source, activity FROM 
system_traces.events WHERE session_id = ?", tableName), ConsistencyLevel.ALL, 
trace);
-            List<InetAddress> scanning = Arrays.stream(traces)
-                                               .filter(t -> 
t[1].toString().matches("Index mean cardinalities are v_index:[0-9]+. Scanning 
with v_index."))
-                                               .map(t -> (InetAddress) t[0])
-                                               
.distinct().collect(Collectors.toList());
-
-            List<InetAddress> executing = Arrays.stream(traces)
-                                                .filter(t -> 
t[1].toString().equals("Executing read on " + tableName + " using index 
v_index"))
-                                                .map(t -> (InetAddress) t[0])
-                                                
.distinct().collect(Collectors.toList());
-
-            
Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()),
 scanning);
-            Assert.assertEquals(3, executing.size());
+            Assert.assertEquals("Failed on iteration " + i, 3, result.length);
+
+            Awaitility.await("For all events in the tracing session to 
persist")
+                    .pollInterval(100, TimeUnit.MILLISECONDS)
+                    .atMost(10, TimeUnit.SECONDS)
+                    .untilAsserted(() -> 
+                                   {
+                                       Object[][] traces = 
cluster.coordinator(1)
+                                                                  
.execute("SELECT source, activity FROM system_traces.events WHERE session_id = 
?", 
+                                                                           
ConsistencyLevel.ALL, trace);
+
+                                       List<InetAddress> scanning =
+                                               Arrays.stream(traces)
+                                                     .filter(t -> 
indexScanningPattern.matcher(t[1].toString()).matches())
+                                                     .map(t -> (InetAddress) 
t[0])
+                                                     
.distinct().collect(Collectors.toList());
+
+                                       List<InetAddress> executing =
+                                               Arrays.stream(traces)
+                                                     .filter(t -> 
t[1].toString().equals(String.format("Executing read on " + tableName + " using 
index v_index_%d", seq.get())))
+                                                     .map(t -> (InetAddress) 
t[0])
+                                                     
.distinct().collect(Collectors.toList());
+
+                                       
Assert.assertEquals(Collections.singletonList(cluster.get(1).broadcastAddress().getAddress()),
 scanning);
+                                       Assert.assertEquals(3, 
executing.size());
+                                   });
         }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to