Noble: TestCoordinatorRole.testNRTRestart has been failing on the Jenkins branch_9x builds a ridiculous number of times since you added it a week ago.
IIUC this test has *NEVER* passed on a jenkins 9x build (only on the main builds) -Hoss : Date: Thu, 12 Jan 2023 07:54:33 +0000 : From: no...@apache.org : Reply-To: dev@solr.apache.org : To: "comm...@solr.apache.org" <comm...@solr.apache.org> : Subject: [solr] branch branch_9x updated: test case added for coordinator role : : This is an automated email from the ASF dual-hosted git repository. : : noble pushed a commit to branch branch_9x : in repository https://gitbox.apache.org/repos/asf/solr.git : : : The following commit(s) were added to refs/heads/branch_9x by this push: : new ec9b152c31f test case added for coordinator role : ec9b152c31f is described below : : commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2 : Author: Noble Paul <noble.p...@gmail.com> : AuthorDate: Thu Jan 12 18:54:15 2023 +1100 : : test case added for coordinator role : --- : .../apache/solr/search/TestCoordinatorRole.java | 412 +++++++++++++++++++-- : 1 file changed, 375 insertions(+), 37 deletions(-) : : diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java : index 5e2dcfb70a8..6c4e845cf5a 100644 : --- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java : +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java : @@ -17,69 +17,407 @@ : : package org.apache.solr.search; : : +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER; : +import static org.apache.solr.common.params.CommonParams.TRUE; : + : +import java.lang.invoke.MethodHandles; : +import java.util.Date; : +import java.util.EnumSet; : import java.util.HashSet; : import java.util.List; : +import java.util.Random; : import java.util.Set; : +import java.util.concurrent.ExecutorService; : +import java.util.concurrent.Future; : +import java.util.concurrent.TimeUnit; : +import java.util.concurrent.atomic.AtomicBoolean; : +import java.util.function.Consumer; : import 
org.apache.solr.client.solrj.SolrQuery; : import org.apache.solr.client.solrj.impl.CloudSolrClient; : +import org.apache.solr.client.solrj.impl.HttpSolrClient; : import org.apache.solr.client.solrj.request.CollectionAdminRequest; : import org.apache.solr.client.solrj.request.QueryRequest; : import org.apache.solr.client.solrj.request.UpdateRequest; : import org.apache.solr.client.solrj.response.QueryResponse; : +import org.apache.solr.cloud.MiniSolrCloudCluster; : import org.apache.solr.cloud.SolrCloudTestCase; : +import org.apache.solr.common.SolrDocumentList; : +import org.apache.solr.common.SolrException; : import org.apache.solr.common.SolrInputDocument; : import org.apache.solr.common.cloud.DocCollection; : +import org.apache.solr.common.cloud.Replica; : +import org.apache.solr.common.params.CommonParams; : +import org.apache.solr.common.util.ExecutorUtil; : +import org.apache.solr.common.util.SimpleOrderedMap; : +import org.apache.solr.common.util.SolrNamedThreadFactory; : +import org.apache.solr.common.util.Utils; : import org.apache.solr.core.NodeRoles; : import org.apache.solr.embedded.JettySolrRunner; : import org.apache.solr.servlet.CoordinatorHttpSolrCall; : -import org.junit.BeforeClass; : +import org.slf4j.Logger; : +import org.slf4j.LoggerFactory; : : public class TestCoordinatorRole extends SolrCloudTestCase { : - : - @BeforeClass : - public static void setupCluster() throws Exception { : - configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure(); : - } : + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); : : public void testSimple() throws Exception { : - CloudSolrClient client = cluster.getSolrClient(); : - String COLLECTION_NAME = "test_coll"; : - String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf"; : - CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2) : - .process(cluster.getSolrClient()); : - 
cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4); : - UpdateRequest ur = new UpdateRequest(); : - for (int i = 0; i < 10; i++) { : - SolrInputDocument doc2 = new SolrInputDocument(); : - doc2.addField("id", "" + i); : - ur.add(doc2); : - } : + MiniSolrCloudCluster cluster = : + configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure(); : + try { : + CloudSolrClient client = cluster.getSolrClient(); : + String COLLECTION_NAME = "test_coll"; : + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf"; : + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2) : + .process(cluster.getSolrClient()); : + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4); : + UpdateRequest ur = new UpdateRequest(); : + for (int i = 0; i < 10; i++) { : + SolrInputDocument doc2 = new SolrInputDocument(); : + doc2.addField("id", "" + i); : + ur.add(doc2); : + } : : - ur.commit(client, COLLECTION_NAME); : - QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*")); : - assertEquals(10, rsp.getResults().getNumFound()); : + ur.commit(client, COLLECTION_NAME); : + QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*")); : + assertEquals(10, rsp.getResults().getNumFound()); : : + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); : + JettySolrRunner coordinatorJetty = null; : + try { : + coordinatorJetty = cluster.startJettySolrRunner(); : + } finally { : + System.clearProperty(NodeRoles.NODE_ROLES_PROP); : + } : + QueryResponse rslt = : + new QueryRequest(new SolrQuery("*:*")) : + .setPreferredNodes(List.of(coordinatorJetty.getNodeName())) : + .process(client, COLLECTION_NAME); : + : + assertEquals(10, rslt.getResults().size()); : + : + DocCollection collection = : + cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION); : + assertNotNull(collection); : + : + Set<String> expectedNodes = new HashSet<>(); : + 
expectedNodes.add(coordinatorJetty.getNodeName()); : + collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName())); : + assertTrue(expectedNodes.isEmpty()); : + } finally { : + cluster.shutdown(); : + } : + } : + : + public void testNRTRestart() throws Exception { : + // we restart jetty and expect to find on disk data - need a local fs directory : + useFactory(null); : + String COLL = "coordinator_test_coll"; : + MiniSolrCloudCluster cluster = : + configureCluster(3) : + .withJettyConfig(jetty -> jetty.enableV2(true)) : + .addConfig("conf", configset("conf2")) : + .configure(); : System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); : - JettySolrRunner coordinatorJetty = null; : + JettySolrRunner qaJetty = cluster.startJettySolrRunner(); : + String qaJettyBase = qaJetty.getBaseUrl().toString(); : + System.clearProperty(NodeRoles.NODE_ROLES_PROP); : + ExecutorService executor = : + ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty")); : try { : - coordinatorJetty = cluster.startJettySolrRunner(); : + CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1) : + .process(cluster.getSolrClient()); : + cluster.waitForActiveCollection(COLL, 1, 2); : + DocCollection docColl = : + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL); : + Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0); : + assertNotNull(nrtReplica); : + String nrtCore = nrtReplica.getCoreName(); : + Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0); : + assertNotNull(pullReplica); : + String pullCore = pullReplica.getCoreName(); : + : + SolrInputDocument sid = new SolrInputDocument(); : + sid.addField("id", "123"); : + sid.addField("desc_s", "A Document"); : + JettySolrRunner nrtJetty = null; : + JettySolrRunner pullJetty = null; : + for (JettySolrRunner j : cluster.getJettySolrRunners()) { : + String nodeName = 
j.getNodeName(); : + if (nodeName.equals(nrtReplica.getNodeName())) { : + nrtJetty = j; : + } else if (nodeName.equals(pullReplica.getNodeName())) { : + pullJetty = j; : + } : + } : + assertNotNull(nrtJetty); : + assertNotNull(pullJetty); : + try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) { : + client.add(COLL, sid); : + client.commit(COLL); : + assertEquals( : + nrtCore, : + getHostCoreName( : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT"))); : + assertEquals( : + pullCore, : + getHostCoreName( : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL"))); : + // Now , kill NRT jetty : + JettySolrRunner nrtJettyF = nrtJetty; : + JettySolrRunner pullJettyF = pullJetty; : + Random r = random(); : + final long establishBaselineMs = r.nextInt(1000); : + final long nrtDowntimeMs = r.nextInt(10000); : + // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our : + // indexing code, : + // based on the fact that our indexing is based on a PULL-node client. : + final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000); : + Future<?> jettyManipulationFuture = : + executor.submit( : + () -> { : + // we manipulate the jetty instances in a separate thread to more closely mimic : + // the behavior we'd : + // see irl. : + try { : + Thread.sleep(establishBaselineMs); : + log.info("stopping NRT jetty ..."); : + nrtJettyF.stop(); : + log.info("NRT jetty stopped."); : + Thread.sleep(nrtDowntimeMs); // let NRT be down for a while : + log.info("restarting NRT jetty ..."); : + nrtJettyF.start(true); : + log.info("NRT jetty restarted."); : + // once NRT is back up, we expect PULL to continue serving until the TTL on ZK : + // state : + // used for query request routing has expired (60s). But here we force a return : + // to NRT : + // by stopping the PULL replica after a brief delay ... 
: + Thread.sleep(pullServiceTimeMs); : + log.info("stopping PULL jetty ..."); : + pullJettyF.stop(); : + log.info("PULL jetty stopped."); : + } catch (Exception e) { : + throw new RuntimeException(e); : + } : + }); : + String hostCore; : + long start = new Date().getTime(); : + long individualRequestStart = start; : + int count = 0; : + while (nrtCore.equals( : + hostCore = : + getHostCoreName( : + COLL, : + qaJettyBase, : + client, : + p -> p.add("shards.preference", "replica.type:NRT")))) { : + count++; : + individualRequestStart = new Date().getTime(); : + } : + long now = new Date().getTime(); : + log.info( : + "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}", : + count, : + now - start, : + establishBaselineMs, : + now - individualRequestStart); : + // default tolerance of 500ms below should suffice. Failover to PULL for this case should be : + // very fast, : + // because our QA-based client already knows both replicas are active, the index is stable, : + // so the moment : + // the client finds NRT is down it should be able to failover immediately and transparently : + // to PULL. : + assertEquals( : + "when we break out of the NRT query loop, should be b/c routed to PULL", : + pullCore, : + hostCore); : + SolrInputDocument d = new SolrInputDocument(); : + d.addField("id", "345"); : + d.addField("desc_s", "Another Document"); : + // attempts to add another doc while NRT is down should fail, then eventually succeed when : + // NRT comes back up : + count = 0; : + start = new Date().getTime(); : + individualRequestStart = start; : + for (; ; ) { : + try { : + client.add(COLL, d); : + client.commit(COLL); : + break; : + } catch (SolrException ex) { : + // we expect these until nrtJetty is back up. 
: + count++; : + Thread.sleep(100); : + } : + individualRequestStart = new Date().getTime(); : + } : + now = new Date().getTime(); : + log.info( : + "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}", : + now - individualRequestStart, : + now - start, : + nrtDowntimeMs, : + count); : + // NRT replica is back up, registered as available with Zk, and availability info has been : + // pulled down by : + // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit : + // completed. All of this : + // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if : + // it causes failures : + // regularly we should feel free to increase the tolerance; but it's meant to provide a : + // stable baseline from : + // which to detect regressions. : + count = 0; : + start = new Date().getTime(); : + individualRequestStart = start; : + while (pullCore.equals( : + hostCore = : + getHostCoreName( : + COLL, : + qaJettyBase, : + client, : + p -> { : + p.set(CommonParams.Q, "id:345"); : + p.add("shards.preference", "replica.type:NRT"); : + }))) { : + count++; : + Thread.sleep(100); : + individualRequestStart = new Date().getTime(); : + } : + now = new Date().getTime(); : + log.info( : + "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}", : + count, : + now - start, : + pullServiceTimeMs, : + now - individualRequestStart); : + assertEquals(nrtCore, hostCore); : + // allow any exceptions to propagate : + jettyManipulationFuture.get(); : + if (true) return; : + : + // next phase: just toggle a bunch : + // TODO: could separate this out into a different test method, but this should suffice for : + // now : + pullJetty.start(true); : + AtomicBoolean done = new AtomicBoolean(); : + long runMinutes = 1; : + long finishTimeMs = : + new Date().getTime() + 
TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES); : + JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF}; : + Random threadRandom = new Random(r.nextInt()); : + Future<Integer> f = : + executor.submit( : + () -> { : + int iteration = 0; : + while (new Date().getTime() < finishTimeMs && !done.get()) { : + int idx = iteration++ % jettys.length; : + JettySolrRunner toManipulate = jettys[idx]; : + try { : + int serveTogetherTime = threadRandom.nextInt(7000); : + int downTime = threadRandom.nextInt(7000); : + log.info("serving together for {}ms", serveTogetherTime); : + Thread.sleep(serveTogetherTime); : + log.info("stopping {} ...", idx); : + toManipulate.stop(); : + log.info("stopped {}.", idx); : + Thread.sleep(downTime); : + log.info("restarting {} ...", idx); : + toManipulate.start(true); : + log.info("restarted {}.", idx); : + } catch (Exception e) { : + throw new RuntimeException(e); : + } : + } : + done.set(true); : + return iteration; : + }); : + count = 0; : + start = new Date().getTime(); : + try { : + do { : + pullCore.equals( : + hostCore = : + getHostCoreName( : + COLL, : + qaJettyBase, : + client, : + p -> { : + p.set(CommonParams.Q, "id:345"); : + p.add("shards.preference", "replica.type:NRT"); : + })); : + count++; : + Thread.sleep(100); : + } while (!done.get()); : + } finally { : + final String result; : + if (done.getAndSet(true)) { : + result = "Success"; : + } else { : + // not yet set to done, completed abnormally (exception will be thrown beyond `finally` : + // block) : + result = "Failure"; : + } : + Integer toggleCount = f.get(); : + long secondsDuration = : + TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS); : + log.info( : + "{}! 
{} seconds, {} toggles, {} requests served", : + result, : + secondsDuration, : + toggleCount, : + count); : + } : + } : } finally { : - System.clearProperty(NodeRoles.NODE_ROLES_PROP); : + try { : + ExecutorUtil.shutdownAndAwaitTermination(executor); : + } finally { : + cluster.shutdown(); : + } : } : - QueryResponse rslt = : - new QueryRequest(new SolrQuery("*:*")) : - .setPreferredNodes(List.of(coordinatorJetty.getNodeName())) : - .process(client, COLLECTION_NAME); : - : - assertEquals(10, rslt.getResults().size()); : + } : : - DocCollection collection = : - cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION); : - assertNotNull(collection); : + @SuppressWarnings("rawtypes") : + private String getHostCoreName( : + String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p) : + throws Exception { : : - Set<String> expectedNodes = new HashSet<>(); : - expectedNodes.add(coordinatorJetty.getNodeName()); : - collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName())); : - assertTrue(expectedNodes.isEmpty()); : + boolean found = false; : + SolrQuery q = new SolrQuery("*:*"); : + q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE); : + p.accept(q); : + StringBuilder sb = : + new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin"); : + q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0])); : + SolrDocumentList docs = null; : + for (int i = 0; i < 100; i++) { : + try { : + SimpleOrderedMap rsp = : + (SimpleOrderedMap) : + Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER); : + docs = (SolrDocumentList) rsp.get("response"); : + if (docs.size() > 0) { : + found = true; : + break; : + } : + } catch (SolrException ex) { : + // we know we're doing tricky things that might cause transient errors : + // TODO: all these query requests go to the QA node -- should QA propagate internal request : + // errors 
: + // to the external client (and the external client retry?) or should QA attempt to failover : + // transparently : + // in the event of an error? : + if (i < 5) { : + log.info("swallowing transient error", ex); : + } else { : + log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex); : + fail("initial error time threshold exceeded"); : + } : + } : + Thread.sleep(100); : + } : + assertTrue(found); : + return (String) docs.get(0).getFieldValue("_core_"); : } : } : : -Hoss http://www.lucidworks.com/ --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@solr.apache.org For additional commands, e-mail: dev-h...@solr.apache.org