Noble: TestCoordinatorRole.testNRTRestart has been failing on the jenkins 9x
builds a ridiculous number of times since you added it a week ago.

IIUC this test has *NEVER* passed on a jenkins 9x build (only on the main 
builds)

-Hoss


: Date: Thu, 12 Jan 2023 07:54:33 +0000
: From: no...@apache.org
: Reply-To: dev@solr.apache.org
: To: "comm...@solr.apache.org" <comm...@solr.apache.org>
: Subject: [solr] branch branch_9x updated: test case added for coordinator role
: 
: This is an automated email from the ASF dual-hosted git repository.
: 
: noble pushed a commit to branch branch_9x
: in repository https://gitbox.apache.org/repos/asf/solr.git
: 
: 
: The following commit(s) were added to refs/heads/branch_9x by this push:
:      new ec9b152c31f test case added for coordinator role
: ec9b152c31f is described below
: 
: commit ec9b152c31fac99fe190ccc98e754c1200bd9fd2
: Author: Noble Paul <noble.p...@gmail.com>
: AuthorDate: Thu Jan 12 18:54:15 2023 +1100
: 
:     test case added for coordinator role
: ---
:  .../apache/solr/search/TestCoordinatorRole.java    | 412 +++++++++++++++++++--
:  1 file changed, 375 insertions(+), 37 deletions(-)
: 
: diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: index 5e2dcfb70a8..6c4e845cf5a 100644
: --- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
: @@ -17,69 +17,407 @@
:  
:  package org.apache.solr.search;
:  
: +import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
: +import static org.apache.solr.common.params.CommonParams.TRUE;
: +
: +import java.lang.invoke.MethodHandles;
: +import java.util.Date;
: +import java.util.EnumSet;
:  import java.util.HashSet;
:  import java.util.List;
: +import java.util.Random;
:  import java.util.Set;
: +import java.util.concurrent.ExecutorService;
: +import java.util.concurrent.Future;
: +import java.util.concurrent.TimeUnit;
: +import java.util.concurrent.atomic.AtomicBoolean;
: +import java.util.function.Consumer;
:  import org.apache.solr.client.solrj.SolrQuery;
:  import org.apache.solr.client.solrj.impl.CloudSolrClient;
: +import org.apache.solr.client.solrj.impl.HttpSolrClient;
:  import org.apache.solr.client.solrj.request.CollectionAdminRequest;
:  import org.apache.solr.client.solrj.request.QueryRequest;
:  import org.apache.solr.client.solrj.request.UpdateRequest;
:  import org.apache.solr.client.solrj.response.QueryResponse;
: +import org.apache.solr.cloud.MiniSolrCloudCluster;
:  import org.apache.solr.cloud.SolrCloudTestCase;
: +import org.apache.solr.common.SolrDocumentList;
: +import org.apache.solr.common.SolrException;
:  import org.apache.solr.common.SolrInputDocument;
:  import org.apache.solr.common.cloud.DocCollection;
: +import org.apache.solr.common.cloud.Replica;
: +import org.apache.solr.common.params.CommonParams;
: +import org.apache.solr.common.util.ExecutorUtil;
: +import org.apache.solr.common.util.SimpleOrderedMap;
: +import org.apache.solr.common.util.SolrNamedThreadFactory;
: +import org.apache.solr.common.util.Utils;
:  import org.apache.solr.core.NodeRoles;
:  import org.apache.solr.embedded.JettySolrRunner;
:  import org.apache.solr.servlet.CoordinatorHttpSolrCall;
: -import org.junit.BeforeClass;
: +import org.slf4j.Logger;
: +import org.slf4j.LoggerFactory;
:  
:  public class TestCoordinatorRole extends SolrCloudTestCase {
: -
: -  @BeforeClass
: -  public static void setupCluster() throws Exception {
: -    configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
: -  }
: +  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
:  
:    public void testSimple() throws Exception {
: -    CloudSolrClient client = cluster.getSolrClient();
: -    String COLLECTION_NAME = "test_coll";
: -    String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: -    CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: -        .process(cluster.getSolrClient());
: -    cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: -    UpdateRequest ur = new UpdateRequest();
: -    for (int i = 0; i < 10; i++) {
: -      SolrInputDocument doc2 = new SolrInputDocument();
: -      doc2.addField("id", "" + i);
: -      ur.add(doc2);
: -    }
: +    MiniSolrCloudCluster cluster =
: +        configureCluster(4).addConfig("conf", configset("cloud-minimal")).configure();
: +    try {
: +      CloudSolrClient client = cluster.getSolrClient();
: +      String COLLECTION_NAME = "test_coll";
: +      String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
: +      CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
: +          .process(cluster.getSolrClient());
: +      cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
: +      UpdateRequest ur = new UpdateRequest();
: +      for (int i = 0; i < 10; i++) {
: +        SolrInputDocument doc2 = new SolrInputDocument();
: +        doc2.addField("id", "" + i);
: +        ur.add(doc2);
: +      }
:  
: -    ur.commit(client, COLLECTION_NAME);
: -    QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
: -    assertEquals(10, rsp.getResults().getNumFound());
: +      ur.commit(client, COLLECTION_NAME);
: +      QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*"));
: +      assertEquals(10, rsp.getResults().getNumFound());
:  
: +      System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: +      JettySolrRunner coordinatorJetty = null;
: +      try {
: +        coordinatorJetty = cluster.startJettySolrRunner();
: +      } finally {
: +        System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: +      }
: +      QueryResponse rslt =
: +          new QueryRequest(new SolrQuery("*:*"))
: +              .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: +              .process(client, COLLECTION_NAME);
: +
: +      assertEquals(10, rslt.getResults().size());
: +
: +      DocCollection collection =
: +          cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: +      assertNotNull(collection);
: +
: +      Set<String> expectedNodes = new HashSet<>();
: +      expectedNodes.add(coordinatorJetty.getNodeName());
: +      collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
: +      assertTrue(expectedNodes.isEmpty());
: +    } finally {
: +      cluster.shutdown();
: +    }
: +  }
: +
: +  public void testNRTRestart() throws Exception {
: +    // we restart jetty and expect to find on disk data - need a local fs directory
: +    useFactory(null);
: +    String COLL = "coordinator_test_coll";
: +    MiniSolrCloudCluster cluster =
: +        configureCluster(3)
: +            .withJettyConfig(jetty -> jetty.enableV2(true))
: +            .addConfig("conf", configset("conf2"))
: +            .configure();
:      System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
: -    JettySolrRunner coordinatorJetty = null;
: +    JettySolrRunner qaJetty = cluster.startJettySolrRunner();
: +    String qaJettyBase = qaJetty.getBaseUrl().toString();
: +    System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: +    ExecutorService executor =
: +        ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("manipulateJetty"));
:      try {
: -      coordinatorJetty = cluster.startJettySolrRunner();
: +      CollectionAdminRequest.createCollection(COLL, "conf", 1, 1, 0, 1)
: +          .process(cluster.getSolrClient());
: +      cluster.waitForActiveCollection(COLL, 1, 2);
: +      DocCollection docColl =
: +          cluster.getSolrClient().getClusterStateProvider().getClusterState().getCollection(COLL);
: +      Replica nrtReplica = docColl.getReplicas(EnumSet.of(Replica.Type.NRT)).get(0);
: +      assertNotNull(nrtReplica);
: +      String nrtCore = nrtReplica.getCoreName();
: +      Replica pullReplica = docColl.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
: +      assertNotNull(pullReplica);
: +      String pullCore = pullReplica.getCoreName();
: +
: +      SolrInputDocument sid = new SolrInputDocument();
: +      sid.addField("id", "123");
: +      sid.addField("desc_s", "A Document");
: +      JettySolrRunner nrtJetty = null;
: +      JettySolrRunner pullJetty = null;
: +      for (JettySolrRunner j : cluster.getJettySolrRunners()) {
: +        String nodeName = j.getNodeName();
: +        if (nodeName.equals(nrtReplica.getNodeName())) {
: +          nrtJetty = j;
: +        } else if (nodeName.equals(pullReplica.getNodeName())) {
: +          pullJetty = j;
: +        }
: +      }
: +      assertNotNull(nrtJetty);
: +      assertNotNull(pullJetty);
: +      try (HttpSolrClient client = (HttpSolrClient) pullJetty.newClient()) {
: +        client.add(COLL, sid);
: +        client.commit(COLL);
: +        assertEquals(
: +            nrtCore,
: +            getHostCoreName(
: +                COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:NRT")));
: +        assertEquals(
: +            pullCore,
: +            getHostCoreName(
: +                COLL, qaJettyBase, client, p -> p.add("shards.preference", "replica.type:PULL")));
: +        // Now , kill NRT jetty
: +        JettySolrRunner nrtJettyF = nrtJetty;
: +        JettySolrRunner pullJettyF = pullJetty;
: +        Random r = random();
: +        final long establishBaselineMs = r.nextInt(1000);
: +        final long nrtDowntimeMs = r.nextInt(10000);
: +        // NOTE: for `pullServiceTimeMs`, it can't be super-short. This is just to simplify our
: +        // indexing code,
: +        // based on the fact that our indexing is based on a PULL-node client.
: +        final long pullServiceTimeMs = 1000 + (long) r.nextInt(9000);
: +        Future<?> jettyManipulationFuture =
: +            executor.submit(
: +                () -> {
: +                  // we manipulate the jetty instances in a separate thread to more closely mimic
: +                  // the behavior we'd
: +                  // see irl.
: +                  try {
: +                    Thread.sleep(establishBaselineMs);
: +                    log.info("stopping NRT jetty ...");
: +                    nrtJettyF.stop();
: +                    log.info("NRT jetty stopped.");
: +                    Thread.sleep(nrtDowntimeMs); // let NRT be down for a while
: +                    log.info("restarting NRT jetty ...");
: +                    nrtJettyF.start(true);
: +                    log.info("NRT jetty restarted.");
: +                    // once NRT is back up, we expect PULL to continue serving until the TTL on ZK
: +                    // state
: +                    // used for query request routing has expired (60s). But here we force a return
: +                    // to NRT
: +                    // by stopping the PULL replica after a brief delay ...
: +                    Thread.sleep(pullServiceTimeMs);
: +                    log.info("stopping PULL jetty ...");
: +                    pullJettyF.stop();
: +                    log.info("PULL jetty stopped.");
: +                  } catch (Exception e) {
: +                    throw new RuntimeException(e);
: +                  }
: +                });
: +        String hostCore;
: +        long start = new Date().getTime();
: +        long individualRequestStart = start;
: +        int count = 0;
: +        while (nrtCore.equals(
: +            hostCore =
: +                getHostCoreName(
: +                    COLL,
: +                    qaJettyBase,
: +                    client,
: +                    p -> p.add("shards.preference", "replica.type:NRT")))) {
: +          count++;
: +          individualRequestStart = new Date().getTime();
: +        }
: +        long now = new Date().getTime();
: +        log.info(
: +            "phase1 NRT queries count={}, overall_duration={}, 
baseline_expected_overall_duration={}, switch-to-pull_duration={}",
: +            count,
: +            now - start,
: +            establishBaselineMs,
: +            now - individualRequestStart);
: +        // default tolerance of 500ms below should suffice. Failover to PULL for this case should be
: +        // very fast,
: +        // because our QA-based client already knows both replicas are active, the index is stable,
: +        // so the moment
: +        // the client finds NRT is down it should be able to failover immediately and transparently
: +        // to PULL.
: +        assertEquals(
: +            "when we break out of the NRT query loop, should be b/c routed 
to PULL",
: +            pullCore,
: +            hostCore);
: +        SolrInputDocument d = new SolrInputDocument();
: +        d.addField("id", "345");
: +        d.addField("desc_s", "Another Document");
: +        // attempts to add another doc while NRT is down should fail, then eventually succeed when
: +        // NRT comes back up
: +        count = 0;
: +        start = new Date().getTime();
: +        individualRequestStart = start;
: +        for (; ; ) {
: +          try {
: +            client.add(COLL, d);
: +            client.commit(COLL);
: +            break;
: +          } catch (SolrException ex) {
: +            // we expect these until nrtJetty is back up.
: +            count++;
: +            Thread.sleep(100);
: +          }
: +          individualRequestStart = new Date().getTime();
: +        }
: +        now = new Date().getTime();
: +        log.info(
: +            "successfully added another doc; duration: {}, 
overall_duration={}, baseline_expected_overall_duration={}, exception_count={}",
: +            now - individualRequestStart,
: +            now - start,
: +            nrtDowntimeMs,
: +            count);
: +        // NRT replica is back up, registered as available with Zk, and availability info has been
: +        // pulled down by
: +        // our PULL-replica-based `client`, forwarded indexing command to NRT, index/commit
: +        // completed. All of this
: +        // accounts for the 3000ms tolerance allowed for below. This is not a strict value, and if
: +        // it causes failures
: +        // regularly we should feel free to increase the tolerance; but it's meant to provide a
: +        // stable baseline from
: +        // which to detect regressions.
: +        count = 0;
: +        start = new Date().getTime();
: +        individualRequestStart = start;
: +        while (pullCore.equals(
: +            hostCore =
: +                getHostCoreName(
: +                    COLL,
: +                    qaJettyBase,
: +                    client,
: +                    p -> {
: +                      p.set(CommonParams.Q, "id:345");
: +                      p.add("shards.preference", "replica.type:NRT");
: +                    }))) {
: +          count++;
: +          Thread.sleep(100);
: +          individualRequestStart = new Date().getTime();
: +        }
: +        now = new Date().getTime();
: +        log.info(
: +            "query retries between NRT index-ready and query-ready: {}; 
overall_duration={}; baseline_expected_overall_duration={}; 
failover-request_duration={}",
: +            count,
: +            now - start,
: +            pullServiceTimeMs,
: +            now - individualRequestStart);
: +        assertEquals(nrtCore, hostCore);
: +        // allow any exceptions to propagate
: +        jettyManipulationFuture.get();
: +        if (true) return;
: +
: +        // next phase: just toggle a bunch
: +        // TODO: could separate this out into a different test method, but this should suffice for
: +        // now
: +        pullJetty.start(true);
: +        AtomicBoolean done = new AtomicBoolean();
: +        long runMinutes = 1;
: +        long finishTimeMs =
: +            new Date().getTime() + TimeUnit.MILLISECONDS.convert(runMinutes, TimeUnit.MINUTES);
: +        JettySolrRunner[] jettys = new JettySolrRunner[] {nrtJettyF, pullJettyF};
: +        Random threadRandom = new Random(r.nextInt());
: +        Future<Integer> f =
: +            executor.submit(
: +                () -> {
: +                  int iteration = 0;
: +                  while (new Date().getTime() < finishTimeMs && !done.get()) {
: +                    int idx = iteration++ % jettys.length;
: +                    JettySolrRunner toManipulate = jettys[idx];
: +                    try {
: +                      int serveTogetherTime = threadRandom.nextInt(7000);
: +                      int downTime = threadRandom.nextInt(7000);
: +                      log.info("serving together for {}ms", 
serveTogetherTime);
: +                      Thread.sleep(serveTogetherTime);
: +                      log.info("stopping {} ...", idx);
: +                      toManipulate.stop();
: +                      log.info("stopped {}.", idx);
: +                      Thread.sleep(downTime);
: +                      log.info("restarting {} ...", idx);
: +                      toManipulate.start(true);
: +                      log.info("restarted {}.", idx);
: +                    } catch (Exception e) {
: +                      throw new RuntimeException(e);
: +                    }
: +                  }
: +                  done.set(true);
: +                  return iteration;
: +                });
: +        count = 0;
: +        start = new Date().getTime();
: +        try {
: +          do {
: +            pullCore.equals(
: +                hostCore =
: +                    getHostCoreName(
: +                        COLL,
: +                        qaJettyBase,
: +                        client,
: +                        p -> {
: +                          p.set(CommonParams.Q, "id:345");
: +                          p.add("shards.preference", "replica.type:NRT");
: +                        }));
: +            count++;
: +            Thread.sleep(100);
: +          } while (!done.get());
: +        } finally {
: +          final String result;
: +          if (done.getAndSet(true)) {
: +            result = "Success";
: +          } else {
: +            // not yet set to done, completed abnormally (exception will be thrown beyond `finally`
: +            // block)
: +            result = "Failure";
: +          }
: +          Integer toggleCount = f.get();
: +          long secondsDuration =
: +              TimeUnit.SECONDS.convert(new Date().getTime() - start, TimeUnit.MILLISECONDS);
: +          log.info(
: +              "{}! {} seconds, {} toggles, {} requests served",
: +              result,
: +              secondsDuration,
: +              toggleCount,
: +              count);
: +        }
: +      }
:      } finally {
: -      System.clearProperty(NodeRoles.NODE_ROLES_PROP);
: +      try {
: +        ExecutorUtil.shutdownAndAwaitTermination(executor);
: +      } finally {
: +        cluster.shutdown();
: +      }
:      }
: -    QueryResponse rslt =
: -        new QueryRequest(new SolrQuery("*:*"))
: -            .setPreferredNodes(List.of(coordinatorJetty.getNodeName()))
: -            .process(client, COLLECTION_NAME);
: -
: -    assertEquals(10, rslt.getResults().size());
: +  }
:  
: -    DocCollection collection =
: -        cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION);
: -    assertNotNull(collection);
: +  @SuppressWarnings("rawtypes")
: +  private String getHostCoreName(
: +      String COLL, String qaNode, HttpSolrClient solrClient, Consumer<SolrQuery> p)
: +      throws Exception {
:  
: -    Set<String> expectedNodes = new HashSet<>();
: -    expectedNodes.add(coordinatorJetty.getNodeName());
: -    collection.forEachReplica((s, replica) -> expectedNodes.remove(replica.getNodeName()));
: -    assertTrue(expectedNodes.isEmpty());
: +    boolean found = false;
: +    SolrQuery q = new SolrQuery("*:*");
: +    q.add("fl", "id,desc_s,_core_:[core]").add(OMIT_HEADER, TRUE);
: +    p.accept(q);
: +    StringBuilder sb =
: +        new StringBuilder(qaNode).append("/").append(COLL).append("/select?wt=javabin");
: +    q.forEach(e -> sb.append("&").append(e.getKey()).append("=").append(e.getValue()[0]));
: +    SolrDocumentList docs = null;
: +    for (int i = 0; i < 100; i++) {
: +      try {
: +        SimpleOrderedMap rsp =
: +            (SimpleOrderedMap)
: +                Utils.executeGET(solrClient.getHttpClient(), sb.toString(), Utils.JAVABINCONSUMER);
: +        docs = (SolrDocumentList) rsp.get("response");
: +        if (docs.size() > 0) {
: +          found = true;
: +          break;
: +        }
: +      } catch (SolrException ex) {
: +        // we know we're doing tricky things that might cause transient errors
: +        // TODO: all these query requests go to the QA node -- should QA propagate internal request
: +        // errors
: +        //  to the external client (and the external client retry?) or should QA attempt to failover
: +        // transparently
: +        //  in the event of an error?
: +        if (i < 5) {
: +          log.info("swallowing transient error", ex);
: +        } else {
: +          log.error("only expect actual _errors_ within a small window (e.g. 
500ms)", ex);
: +          fail("initial error time threshold exceeded");
: +        }
: +      }
: +      Thread.sleep(100);
: +    }
: +    assertTrue(found);
: +    return (String) docs.get(0).getFieldValue("_core_");
:    }
:  }
: 
: 

-Hoss
http://www.lucidworks.com/

