This is an automated email from the ASF dual-hosted git repository. krisden pushed a commit to branch branch_9_0 in repository https://gitbox.apache.org/repos/asf/solr.git
commit 3a56ddf0918d82da179e374ba6498cbff4778a52 Author: Kevin Risden <[email protected]> AuthorDate: Mon May 2 15:05:59 2022 -0400 SOLR-16174: Modernize TestBulkSchemaConcurrent (#829) --- .../solr/schema/TestBulkSchemaConcurrent.java | 289 +++++++++++---------- 1 file changed, 157 insertions(+), 132 deletions(-) diff --git a/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java b/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java index 7aaed143e3e..e2d5b5a4257 100644 --- a/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java +++ b/solr/core/src/test/org/apache/solr/schema/TestBulkSchemaConcurrent.java @@ -27,12 +27,20 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.cloud.AbstractFullDistribZkTestBase; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.common.util.StrUtils; +import org.apache.solr.common.util.TimeSource; import org.apache.solr.common.util.Utils; import org.apache.solr.util.RestTestHarness; +import org.apache.solr.util.TimeOut; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.slf4j.Logger; @@ -41,6 +49,9 @@ import org.slf4j.LoggerFactory; public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final long TIMEOUT = TEST_NIGHTLY ? 10 : 1; // in seconds + private static final int THREAD_COUNT = TEST_NIGHTLY ? 5 : 2; + @BeforeClass public static void initSysProperties() { System.setProperty("managed.schema.mutable", "true"); @@ -51,43 +62,56 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { return "solrconfig-managed-schema.xml"; } + @Before + public void setupTest() { + setupRestTestHarnesses(); + } + + @After + public void teardownTest() throws Exception { + closeRestTestHarnesses(); + } + @Test - @SuppressWarnings({"unchecked"}) public void test() throws Exception { + final List<List<String>> collectErrors = Collections.synchronizedList(new ArrayList<>()); - final int threadCount = 5; - setupRestTestHarnesses(); - Thread[] threads = new Thread[threadCount]; - @SuppressWarnings({"rawtypes"}) - final List<List> collectErrors = Collections.synchronizedList(new ArrayList<>()); + final ExecutorService executorService = + ExecutorUtil.newMDCAwareFixedThreadPool( + THREAD_COUNT, new SolrNamedThreadFactory(this.getClass().getSimpleName())); - for (int i = 0; i < threadCount; i++) { + List<Callable<Void>> callees = new ArrayList<>(THREAD_COUNT); + for (int i = 0; i < THREAD_COUNT; i++) { final int finalI = i; - threads[i] = - new Thread() { - @Override - public void run() { - @SuppressWarnings({"rawtypes"}) - ArrayList errs = new ArrayList(); - collectErrors.add(errs); - try { - invokeBulkAddCall(finalI, errs); - invokeBulkReplaceCall(finalI, errs); - invokeBulkDeleteCall(finalI, errs); - } catch (Exception e) { - e.printStackTrace(); - } + Callable<Void> call = + () -> { + List<String> errs = new ArrayList<>(); + collectErrors.add(errs); + try { + invokeBulkAddCall(finalI, errs); + invokeBulkReplaceCall(finalI, errs); + invokeBulkDeleteCall(finalI, errs); + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + // TODO this might be double logged, but safer to log here anyway + log.error("Exception from thread {}", finalI, e); } + return null; }; - - threads[i].start(); + callees.add(call); } - for (Thread thread : threads) thread.join(); + executorService.invokeAll(callees); + executorService.shutdown(); + + // TIMEOUT * 3 there are 3 tests - add, replace, delete each running for the length of TIMEOUT + assertTrue( + "Running for too long...", executorService.awaitTermination(TIMEOUT * 3, TimeUnit.SECONDS)); boolean success = true; - for (@SuppressWarnings({"rawtypes"}) List e : collectErrors) { + for (List<String> e : collectErrors) { if (e != null && !e.isEmpty()) { success = false; log.error("{}", e); @@ -98,7 +122,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { } @SuppressWarnings({"unchecked"}) - private void invokeBulkAddCall(int seed, ArrayList<String> errs) throws Exception { + private void invokeBulkAddCall(int seed, List<String> errs) throws Exception { String payload = "{\n" + " 'add-field' : {\n" @@ -133,10 +157,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { payload = payload.replace("replaceDynamicCopyFieldDest", dynamicCopyFldDest); payload = payload.replace("myNewFieldTypeName", newFieldTypeName); + // don't close publisher - gets closed at teardown RestTestHarness publisher = randomRestTestHarness(r); String response = publisher.post("/schema", SolrTestCaseJ4.json(payload)); - @SuppressWarnings({"rawtypes"}) - Map map = (Map) Utils.fromJSONString(response); + Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response); Object errors = map.get("errors"); if (errors != null) { errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8)); @@ -145,38 +169,38 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { // get another node Set<String> errmessages = new HashSet<>(); + // don't close harness - gets closed at teardown RestTestHarness harness = randomRestTestHarness(r); - try { - long startTime = System.nanoTime(); - long maxTimeoutMillis = 100000; - while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) - < maxTimeoutMillis) { - errmessages.clear(); - @SuppressWarnings({"rawtypes"}) - Map m = getObj(harness, aField, "fields"); - if (m == null) errmessages.add(StrUtils.formatString("field {0} not created", aField)); - - m = getObj(harness, dynamicFldName, "dynamicFields"); - if (m == null) - errmessages.add(StrUtils.formatString("dynamic field {0} not created", dynamicFldName)); - - @SuppressWarnings({"rawtypes"}) - List l = getSourceCopyFields(harness, aField); - if (!checkCopyField(l, aField, dynamicCopyFldDest)) - errmessages.add( - StrUtils.formatString( - "CopyField source={0},dest={1} not created", aField, dynamicCopyFldDest)); - - m = getObj(harness, newFieldTypeName, "fieldTypes"); - if (m == null) - errmessages.add(StrUtils.formatString("new type {0} not created", newFieldTypeName)); - - if (errmessages.isEmpty()) break; - - Thread.sleep(10); + TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeout.hasTimedOut()) { + errmessages.clear(); + Map<?, ?> m = getObj(harness, aField, "fields"); + if (m == null) { + errmessages.add(StrUtils.formatString("field {0} not created", aField)); } - } finally { - harness.close(); + + m = getObj(harness, dynamicFldName, "dynamicFields"); + if (m == null) { + errmessages.add(StrUtils.formatString("dynamic field {0} not created", dynamicFldName)); + } + + List<Map<String, String>> l = getSourceCopyFields(harness, aField); + if (!checkCopyField(l, aField, dynamicCopyFldDest)) { + errmessages.add( + StrUtils.formatString( + "CopyField source={0},dest={1} not created", aField, dynamicCopyFldDest)); + } + + m = getObj(harness, newFieldTypeName, "fieldTypes"); + if (m == null) { + errmessages.add(StrUtils.formatString("new type {0} not created", newFieldTypeName)); + } + + if (errmessages.isEmpty()) { + break; + } + + timeout.sleep(10); } if (!errmessages.isEmpty()) { errs.addAll(errmessages); @@ -184,7 +208,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { } @SuppressWarnings({"unchecked"}) - private void invokeBulkReplaceCall(int seed, ArrayList<String> errs) throws Exception { + private void invokeBulkReplaceCall(int seed, List<String> errs) throws Exception { String payload = "{\n" + " 'replace-field' : {\n" @@ -213,10 +237,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { payload = payload.replace("replaceDynamicField", dynamicFldName); payload = payload.replace("myNewFieldTypeName", newFieldTypeName); + // don't close publisher - gets closed at teardown RestTestHarness publisher = randomRestTestHarness(r); String response = publisher.post("/schema", SolrTestCaseJ4.json(payload)); - @SuppressWarnings({"rawtypes"}) - Map map = (Map) Utils.fromJSONString(response); + Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response); Object errors = map.get("errors"); if (errors != null) { errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8)); @@ -225,41 +249,39 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { // get another node Set<String> errmessages = new HashSet<>(); + // don't close harness - gets closed at teardown RestTestHarness harness = randomRestTestHarness(r); - try { - long startTime = System.nanoTime(); - long maxTimeoutMillis = 100000; - while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) - < maxTimeoutMillis) { - errmessages.clear(); - @SuppressWarnings({"rawtypes"}) - Map m = getObj(harness, aField, "fields"); - if (m == null) - errmessages.add(StrUtils.formatString("field {0} no longer present", aField)); - - m = getObj(harness, dynamicFldName, "dynamicFields"); - if (m == null) - errmessages.add( - StrUtils.formatString("dynamic field {0} no longer present", dynamicFldName)); - - @SuppressWarnings({"rawtypes"}) - List l = getSourceCopyFields(harness, aField); - if (!checkCopyField(l, aField, dynamicCopyFldDest)) - errmessages.add( - StrUtils.formatString( - "CopyField source={0},dest={1} no longer present", aField, dynamicCopyFldDest)); - - m = getObj(harness, newFieldTypeName, "fieldTypes"); - if (m == null) - errmessages.add( - StrUtils.formatString("new type {0} no longer present", newFieldTypeName)); - - if (errmessages.isEmpty()) break; - - Thread.sleep(10); + TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeout.hasTimedOut()) { + errmessages.clear(); + Map<?, ?> m = getObj(harness, aField, "fields"); + if (m == null) { + errmessages.add(StrUtils.formatString("field {0} no longer present", aField)); } - } finally { - harness.close(); + + m = getObj(harness, dynamicFldName, "dynamicFields"); + if (m == null) { + errmessages.add( + StrUtils.formatString("dynamic field {0} no longer present", dynamicFldName)); + } + + List<Map<String, String>> l = getSourceCopyFields(harness, aField); + if (!checkCopyField(l, aField, dynamicCopyFldDest)) { + errmessages.add( + StrUtils.formatString( + "CopyField source={0},dest={1} no longer present", aField, dynamicCopyFldDest)); + } + + m = getObj(harness, newFieldTypeName, "fieldTypes"); + if (m == null) { + errmessages.add(StrUtils.formatString("new type {0} no longer present", newFieldTypeName)); + } + + if (errmessages.isEmpty()) { + break; + } + + timeout.sleep(10); } if (!errmessages.isEmpty()) { errs.addAll(errmessages); @@ -267,7 +289,7 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { } @SuppressWarnings({"unchecked"}) - private void invokeBulkDeleteCall(int seed, ArrayList<String> errs) throws Exception { + private void invokeBulkDeleteCall(int seed, List<String> errs) throws Exception { String payload = "{\n" + " 'delete-copy-field' : {\n" @@ -288,10 +310,10 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { payload = payload.replace("replaceDynamicCopyFieldDest", dynamicCopyFldDest); payload = payload.replace("myNewFieldTypeName", newFieldTypeName); + // don't close publisher - gets closed at teardown RestTestHarness publisher = randomRestTestHarness(r); String response = publisher.post("/schema", SolrTestCaseJ4.json(payload)); - @SuppressWarnings({"rawtypes"}) - Map map = (Map) Utils.fromJSONString(response); + Map<String, Object> map = (Map<String, Object>) Utils.fromJSONString(response); Object errors = map.get("errors"); if (errors != null) { errs.add(new String(Utils.toJSON(errors), StandardCharsets.UTF_8)); @@ -300,49 +322,52 @@ public class TestBulkSchemaConcurrent extends AbstractFullDistribZkTestBase { // get another node Set<String> errmessages = new HashSet<>(); + // don't close harness - gets closed at teardown RestTestHarness harness = randomRestTestHarness(r); - try { - long startTime = System.nanoTime(); - long maxTimeoutMillis = 100000; - while (TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS) - < maxTimeoutMillis) { - errmessages.clear(); - @SuppressWarnings({"rawtypes"}) - Map m = getObj(harness, aField, "fields"); - if (m != null) errmessages.add(StrUtils.formatString("field {0} still exists", aField)); - - m = getObj(harness, dynamicFldName, "dynamicFields"); - if (m != null) - errmessages.add(StrUtils.formatString("dynamic field {0} still exists", dynamicFldName)); - - @SuppressWarnings({"rawtypes"}) - List l = getSourceCopyFields(harness, aField); - if (checkCopyField(l, aField, dynamicCopyFldDest)) - errmessages.add( - StrUtils.formatString( - "CopyField source={0},dest={1} still exists", aField, dynamicCopyFldDest)); - - m = getObj(harness, newFieldTypeName, "fieldTypes"); - if (m != null) - errmessages.add(StrUtils.formatString("new type {0} still exists", newFieldTypeName)); - - if (errmessages.isEmpty()) break; - - Thread.sleep(10); + TimeOut timeout = new TimeOut(TIMEOUT, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeout.hasTimedOut()) { + errmessages.clear(); + Map<?, ?> m = getObj(harness, aField, "fields"); + if (m != null) { + errmessages.add(StrUtils.formatString("field {0} still exists", aField)); + } + + m = getObj(harness, dynamicFldName, "dynamicFields"); + if (m != null) { + errmessages.add(StrUtils.formatString("dynamic field {0} still exists", dynamicFldName)); } - } finally { - harness.close(); + + List<Map<String, String>> l = getSourceCopyFields(harness, aField); + if (checkCopyField(l, aField, dynamicCopyFldDest)) { + errmessages.add( + StrUtils.formatString( + "CopyField source={0},dest={1} still exists", aField, dynamicCopyFldDest)); + } + + m = getObj(harness, newFieldTypeName, "fieldTypes"); + if (m != null) { + errmessages.add(StrUtils.formatString("new type {0} still exists", newFieldTypeName)); + } + + if (errmessages.isEmpty()) { + break; + } + + timeout.sleep(10); } if (!errmessages.isEmpty()) { errs.addAll(errmessages); } } - private boolean checkCopyField( - @SuppressWarnings({"rawtypes"}) List<Map> l, String src, String dest) { - if (l == null) return false; - for (@SuppressWarnings({"rawtypes"}) Map map : l) { - if (src.equals(map.get("source")) && dest.equals(map.get("dest"))) return true; + private boolean checkCopyField(List<Map<String, String>> l, String src, String dest) { + if (l == null) { + return false; + } + for (Map<String, String> map : l) { + if (src.equals(map.get("source")) && dest.equals(map.get("dest"))) { + return true; + } } return false; }
