Created jia & AwaitsFix'ed the test ...
https://issues.apache.org/jira/browse/SOLR-16630 : Date: Fri, 20 Jan 2023 13:36:38 -0700 (MST) : From: Chris Hostetter <[email protected]> : To: [email protected] : Cc: "[email protected]" <[email protected]> : Subject: Re: [solr] branch branch_9x updated: test case added for coordinator : role : : : Noble: TestCoordinatorRole.testNRTRestart is breaking on jenkins on 9x a : ridiculous number of times since you added it a week ago. : : IIUC this test has *NEVER* passed on a jenkins 9x build (only on the main : builds) : : -Hoss : : : : Date: Thu, 12 Jan 2023 07:54:33 +0000 : : From: [email protected] : : Reply-To: [email protected] : : To: "[email protected]" <[email protected]> : : Subject: [solr] branch branch_9x updated: test case added for coordinator role : : : : This is an automated email from the ASF dual-hosted git repository. : : : : noble pushed a commit to branch branch_9x : : in repository https://gitbox.apache.org/repos/asf/solr.git : : : : : : The following commit(s) were added to refs/heads/branch_9x by this push: : : new ec9b152c31f test case added for coordinator role : : ec9b152c31f is described below : : : : commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2 : : Author: Noble Paul <[email protected]> : : AuthorDate: Thu Jan 12 18:54:15 2023 +1100 : : : : test case added for coordinator role : : --- : : .../apache/solr/search/TestCoordinatorRole.java | 412 +++++++++++++++++++-- : : 1 file changed, 375 insertions(+), 37 deletions(-) : : : : diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java : : index 5e2dcfb70a8..6c4e845cf5a 100644 : : --- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java : : +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java : : @@ -17,69 +17,407 @@ : : : : package org.apache.solr.search; : : : : +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER; : : +import static org.apache.solr.common.params.CommonParams.TRUE; : : + : : +import java.lang.invoke.MethodHandles; : : +import java.util.Date; : : +import java.util.EnumSet; : : import java.util.HashSet; : : import java.util.List; : : +import java.util.Random; : : import java.util.Set; : : +import java.util.concurrent.ExecutorService; : : +import java.util.concurrent.Future; : : +import java.util.concurrent.TimeUnit; : : +import java.util.concurrent.atomic.AtomicBoolean; : : +import java.util.function.Consumer; : : import org.apache.solr.client.solrj.SolrQuery; : : import org.apache.solr.client.solrj.impl.CloudSolrClient; : : +import org.apache.solr.client.solrj.impl.HttpSolrClient; : : import org.apache.solr.client.solrj.request.CollectionAdminRequest; : : import org.apache.solr.client.solrj.request.QueryRequest; : : import org.apache.solr.client.solrj.request.UpdateRequest; : : import org.apache.solr.client.solrj.response.QueryResponse; : : +import org.apache.solr.cloud.MiniSolrCloudCluster; : : import org.apache.solr.cloud.SolrCloudTestCase; : : +import org.apache.solr.common.SolrDocumentList; : : +import org.apache.solr.common.SolrException; : : import org.apache.solr.common.SolrInputDocument; : : import org.apache.solr.common.cloud.DocCollection; : : +import org.apache.solr.common.cloud.Replica; : : +import org.apache.solr.common.params.CommonParams; : : +import org.apache.solr.common.util.ExecutorUtil; : : +import org.apache.solr.common.util.SimpleOrderedMap; : : +import org.apache.solr.common.util.SolrNamedThreadFactory; : : +import org.apache.solr.common.util.Utils; : : import org.apache.solr.core.NodeRoles; : : import org.apache.solr.embedded.JettySolrRunner; : : import org.apache.solr.servlet.CoordinatorHttpSolrCall; : : -import org.junit.BeforeClass; : : +import org.slf4j.Logger; : : +import org.slf4j.LoggerFactory; : : : : public class TestCoordinatorRole extends SolrCloudTestCase { : : - : : - @BeforeClass : : - public static void setupCluster() throws Exception { : : - configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure(); : : - } : : + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); : : : : public void testSimple() throws Exception { : : - CloudSolrClient client = cluster.getSolrClient(); : : - String COLLECTION_NAME = "test_coll"; : : - String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf"; : : - CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2) : : - .process(cluster.getSolrClient()); : : - cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4); : : - UpdateRequest ur = new UpdateRequest(); : : - for (int i = 0; i < 10; i++) { : : - SolrInputDocument doc2 = new SolrInputDocument(); : : - doc2.addField("id", "" + i); : : - ur.add(doc2); : : - } : : + MiniSolrCloudCluster cluster = : : + configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure(); : : + try { : : + CloudSolrClient client = cluster.getSolrClient(); : : + String COLLECTION_NAME = "test_coll"; : : + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf"; : : + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2) : : + .process(cluster.getSolrClient()); : : + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4); : : + UpdateRequest ur = new UpdateRequest(); : : + for (int i = 0; i < 10; i++) { : : + SolrInputDocument doc2 = new SolrInputDocument(); : : + doc2.addField("id", "" + i); : : + ur.add(doc2); : : + } : : : : - ur.commit(client, COLLECTION_NAME); : : - QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*")); : : - assertEquals(10, rsp.getResults().getNumFound()); : : + ur.commit(client, COLLECTION_NAME); : : + QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*")); : : + assertEquals(10, rsp.getResults().getNumFound()); : : : : + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); : : + JettySolrRunner coordinatorJetty = null; : : + try { : : + coordinatorJetty = cluster.startJettySolrRunner(); : : + } finally { : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP); : : + } : : + QueryResponse rslt = : : + new QueryRequest(new SolrQuery("*:*")) : : + .setPreferredNodes(List.of(coordinatorJetty.getNodeName())) : : + .process(client, COLLECTION_NAME); : : + : : + assertEquals(10, rslt.getResults().size()); : : + : : + DocCollection collection = : : + cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION); : : + assertNotNull(collection); : : + : : + Set<String> expectedNodes = new HashSet<>(); : : + expectedNodes.add(coordinatorJetty.getNodeName()); : : + collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName())); : : + assertTrue(expectedNodes.isEmpty()); : : + } finally { : : + cluster.shutdown(); : : + } : : + } : : + : : + public void testNRTRestart() throws Exception { : : + // we restart jetty and expect to find on disk data - need a local fs directory : : + useFactory(null); : : + String COLL = "coordinator_test_coll"; : : + MiniSolrCloudCluster cluster = : : + configureCluster(3) : : + .withJettyConfig(jetty -> jetty.enableV2(true)) : : + .addConfig("conf", configset("conf2")) : : + .configure(); : : System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); : : - JettySolrRunner coordinatorJetty = null; : : + JettySolrRunner qaJetty = cluster.startJettySolrRunner(); : : + String qaJettyBase = qaJetty.getBaseUrl().toString(); : : + System.clearProperty(NodeRoles.NODE_ROLES_PROP); : : + ExecutorService executor = : : + ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty")); : : try { : : - coordinatorJetty = cluster.startJettySolrRunner(); : : + CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1) : : + .process(cluster.getSolrClient()); : : + cluster.waitForActiveCollection(COLL, 1, 2); : : + DocCollection docColl = : : + cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL); : : + Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0); : : + assertNotNull(nrtReplica); : : + String nrtCore = nrtReplica.getCoreName(); : : + Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0); : : + assertNotNull(pullReplica); : : + String pullCore = pullReplica.getCoreName(); : : + : : + SolrInputDocument sid = new SolrInputDocument(); : : + sid.addField("id", "123"); : : + sid.addField("desc_s", "A Document"); : : + JettySolrRunner nrtJetty = null; : : + JettySolrRunner pullJetty = null; : : + for (JettySolrRunner j : cluster.getJettySolrRunners()) { : : + String nodeName = j.getNodeName(); : : + if (nodeName.equals(nrtReplica.getNodeName())) { : : + nrtJetty = j; : : + } else if (nodeName.equals(pullReplica.getNodeName())) { : : + pullJetty = j; : : + } : : + } : : + assertNotNull(nrtJetty); : : + assertNotNull(pullJetty); : : + try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) { : : + client.add(COLL, sid); : : + client.commit(COLL); : : + assertEquals( : : + nrtCore, : : + getHostCoreName( : : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT"))); : : + assertEquals( : : + pullCore, : : + getHostCoreName( : : + COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL"))); : : + // Now , kill NRT jetty : : + JettySolrRunner nrtJettyF = nrtJetty; : : + JettySolrRunner pullJettyF = pullJetty; : : + Random r = random(); : : + final long establishBaselineMs = r.nextInt(1000); : : + final long nrtDowntimeMs = r.nextInt(10000); : : + // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our : : + // indexing code, : : + // based on the fact that our indexing is based on a PULL-node client. : : + final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000); : : + Future<?> jettyManipulationFuture = : : + executor.submit( : : + () -> { : : + // we manipulate the jetty instances in a separate thread to more closely mimic : : + // the behavior we'd : : + // see irl. : : + try { : : + Thread.sleep(establishBaselineMs); : : + log.info("stopping NRT jetty ..."); : : + nrtJettyF.stop(); : : + log.info("NRT jetty stopped."); : : + Thread.sleep(nrtDowntimeMs); // let NRT be down for a while : : + log.info("restarting NRT jetty ..."); : : + nrtJettyF.start(true); : : + log.info("NRT jetty restarted."); : : + // once NRT is back up, we expect PULL to continue serving until the TTL on ZK : : + // state : : + // used for query request routing has expired (60s). But here we force a return : : + // to NRT : : + // by stopping the PULL replica after a brief delay ... : : + Thread.sleep(pullServiceTimeMs); : : + log.info("stopping PULL jetty ..."); : : + pullJettyF.stop(); : : + log.info("PULL jetty stopped."); : : + } catch (Exception e) { : : + throw new RuntimeException(e); : : + } : : + }); : : + String hostCore; : : + long start = new Date().getTime(); : : + long individualRequestStart = start; : : + int count = 0; : : + while (nrtCore.equals( : : + hostCore = : : + getHostCoreName( : : + COLL, : : + qaJettyBase, : : + client, : : + p -> p.add("shards.preference", "replica.type:NRT")))) { : : + count++; : : + individualRequestStart = new Date().getTime(); : : + } : : + long now = new Date().getTime(); : : + log.info( : : + "phase1 NRT queries count={}, overall_duration={}, baseline_expected_overall_duration={}, switch-to-pull_duration={}", : : + count, : : + now - start, : : + establishBaselineMs, : : + now - individualRequestStart); : : + // default tolerance of 500ms below should suffice. Failover to PULL for this case should be : : + // very fast, : : + // because our QA-based client already knows both replicas are active, the index is stable, : : + // so the moment : : + // the client finds NRT is down it should be able to failover immediately and transparently : : + // to PULL. : : + assertEquals( : : + "when we break out of the NRT query loop, should be b/c routed to PULL", : : + pullCore, : : + hostCore); : : + SolrInputDocument d = new SolrInputDocument(); : : + d.addField("id", "345"); : : + d.addField("desc_s", "Another Document"); : : + // attempts to add another doc while NRT is down should fail, then eventually succeed when : : + // NRT comes back up : : + count = 0; : : + start = new Date().getTime(); : : + individualRequestStart = start; : : + for (; ; ) { : : + try { : : + client.add(COLL, d); : : + client.commit(COLL); : : + break; : : + } catch (SolrException ex) { : : + // we expect these until nrtJetty is back up. : : + count++; : : + Thread.sleep(100); : : + } : : + individualRequestStart = new Date().getTime(); : : + } : : + now = new Date().getTime(); : : + log.info( : : + "successfully added another doc; duration: {}, overall_duration={}, baseline_expected_overall_duration={}, exception_count={}", : : + now - individualRequestStart, : : + now - start, : : + nrtDowntimeMs, : : + count); : : + // NRT replica is back up, registered as available with Zk, and availability info has been : : + // pulled down by : : + // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit : : + // completed. All of this : : + // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if : : + // it causes failures : : + // regularly we should feel free to increase the tolerance; but it's meant to provide a : : + // stable baseline from : : + // which to detect regressions. : : + count = 0; : : + start = new Date().getTime(); : : + individualRequestStart = start; : : + while (pullCore.equals( : : + hostCore = : : + getHostCoreName( : : + COLL, : : + qaJettyBase, : : + client, : : + p -> { : : + p.set(CommonParams.Q, "id:345"); : : + p.add("shards.preference", "replica.type:NRT"); : : + }))) { : : + count++; : : + Thread.sleep(100); : : + individualRequestStart = new Date().getTime(); : : + } : : + now = new Date().getTime(); : : + log.info( : : + "query retries between NRT index-ready and query-ready: {}; overall_duration={}; baseline_expected_overall_duration={}; failover-request_duration={}", : : + count, : : + now - start, : : + pullServiceTimeMs, : : + now - individualRequestStart); : : + assertEquals(nrtCore, hostCore); : : + // allow any exceptions to propagate : : + jettyManipulationFuture.get(); : : + if (true) return; : : + : : + // next phase: just toggle a bunch : : + // TODO: could separate this out into a different test method, but this should suffice for : : + // now : : + pullJetty.start(true); : : + AtomicBoolean done = new AtomicBoolean(); : : + long runMinutes = 1; : : + long finishTimeMs = : : + new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES); : : + JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF}; : : + Random threadRandom = new Random(r.nextInt()); : : + Future<Integer> f = : : + executor.submit( : : + () -> { : : + int iteration = 0; : : + while (new Date().getTime() < finishTimeMs && !done.get()) { : : + int idx = iteration++ % jettys.length; : : + JettySolrRunner toManipulate = jettys[idx]; : : + try { : : + int serveTogetherTime = threadRandom.nextInt(7000); : : + int downTime = threadRandom.nextInt(7000); : : + log.info("serving together for {}ms", serveTogetherTime); : : + Thread.sleep(serveTogetherTime); : : + log.info("stopping {} ...", idx); : : + toManipulate.stop(); : : + log.info("stopped {}.", idx); : : + Thread.sleep(downTime); : : + log.info("restarting {} ...", idx); : : + toManipulate.start(true); : : + log.info("restarted {}.", idx); : : + } catch (Exception e) { : : + throw new RuntimeException(e); : : + } : : + } : : + done.set(true); : : + return iteration; : : + }); : : + count = 0; : : + start = new Date().getTime(); : : + try { : : + do { : : + pullCore.equals( : : + hostCore = : : + getHostCoreName( : : + COLL, : : + qaJettyBase, : : + client, : : + p -> { : : + p.set(CommonParams.Q, "id:345"); : : + p.add("shards.preference", "replica.type:NRT"); : : + })); : : + count++; : : + Thread.sleep(100); : : + } while (!done.get()); : : + } finally { : : + final String result; : : + if (done.getAndSet(true)) { : : + result = "Success"; : : + } else { : : + // not yet set to done, completed abnormally (exception will be thrown beyond `finally` : : + // block) : : + result = "Failure"; : : + } : : + Integer toggleCount = f.get(); : : + long secondsDuration = : : + TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS); : : + log.info( : : + "{}! {} seconds, {} toggles, {} requests served", : : + result, : : + secondsDuration, : : + toggleCount, : : + count); : : + } : : + } : : } finally { : : - System.clearProperty(NodeRoles.NODE_ROLES_PROP); : : + try { : : + ExecutorUtil.shutdownAndAwaitTermination(executor); : : + } finally { : : + cluster.shutdown(); : : + } : : } : : - QueryResponse rslt = : : - new QueryRequest(new SolrQuery("*:*")) : : - .setPreferredNodes(List.of(coordinatorJetty.getNodeName())) : : - .process(client, COLLECTION_NAME); : : - : : - assertEquals(10, rslt.getResults().size()); : : + } : : : : - DocCollection collection = : : - cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION); : : - assertNotNull(collection); : : + @SuppressWarnings("rawtypes") : : + private String getHostCoreName( : : + String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p) : : + throws Exception { : : : : - Set<String> expectedNodes = new HashSet<>(); : : - expectedNodes.add(coordinatorJetty.getNodeName()); : : - collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName())); : : - assertTrue(expectedNodes.isEmpty()); : : + boolean found = false; : : + SolrQuery q = new SolrQuery("*:*"); : : + q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE); : : + p.accept(q); : : + StringBuilder sb = : : + new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin"); : : + q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0])); : : + SolrDocumentList docs = null; : : + for (int i = 0; i < 100; i++) { : : + try { : : + SimpleOrderedMap rsp = : : + (SimpleOrderedMap) : : + Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER); : : + docs = (SolrDocumentList) rsp.get("response"); : : + if (docs.size() > 0) { : : + found = true; : : + break; : : + } : : + } catch (SolrException ex) { : : + // we know we're doing tricky things that might cause transient errors : : + // TODO: all these query requests go to the QA node -- should QA propagate internal request : : + // errors : : + // to the external client (and the external client retry?) or should QA attempt to failover : : + // transparently : : + // in the event of an error? : : + if (i < 5) { : : + log.info("swallowing transient error", ex); : : + } else { : : + log.error("only expect actual _errors_ within a small window (e.g. 500ms)", ex); : : + fail("initial error time threshold exceeded"); : : + } : : + } : : + Thread.sleep(100); : : + } : : + assertTrue(found); : : + return (String) docs.get(0).getFieldValue("_core_"); : : } : : } : : : : : : -Hoss : http://www.lucidworks.com/ : -Hoss http://www.lucidworks.com/ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
