This is an automated email from the ASF dual-hosted git repository. cpoerschke pushed a commit to branch branch_9_0 in repository https://gitbox.apache.org/repos/asf/solr.git
commit bc501bab8a599ec313b042844611bc4192899f10 Author: Christine Poerschke <[email protected]> AuthorDate: Mon Feb 21 12:29:56 2022 +0000 SOLR-15983: parallel log replay now uses separate UpdateRequestProcessor objects (#601) * parallel log replay now uses separate UpdateRequestProcessor objects * executor local variable in UpdateLog.doReplay can be null * Use a ThreadLocal of the URP to avoid re-creating and avoid excessive finish() calls and to make it clearer that the URPs are used correctly (have correct lifecycle and aren't shared). * remove unused procThreadLocal member in favour of local variable with same name * just one for-loop on procPool to finish-and-close * add solr/CHANGES.txt entry Co-authored-by: David Smiley <[email protected]> (cherry picked from commit 7c0767064eeaddda5ecf8e5b0ff0e07524dfafce) (cherry picked from commit 7d61190a0d917b9319c30d76328d62d1435dbe58) Resolved Conflicts: solr/CHANGES.txt solr/core/src/java/org/apache/solr/update/UpdateLog.java --- solr/CHANGES.txt | 2 + .../src/java/org/apache/solr/update/UpdateLog.java | 52 +++++++++++++--------- .../update/processor/UpdateRequestProcessor.java | 2 + 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 9a42505..5103f2f 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -611,6 +611,8 @@ Bug Fixes * SOLR-16019: UTF-8 parsing errors for parameters should cause a HTTP 400 status code, not 500 (janhoy, Matthias Pigulla) +* SOLR-15983: Fix ClassCastException in UpdateLog$LogReplayer.doReplay. (Christine Poerschke, David Smiley) + * SOLR-15333: Reduced spurious warn logging by AbstractSpatialPrefixTreeFieldType field properties (Steffen Moldenhauer, David Smiley, Mike Drob) ================== 8.11.1 ================== diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index f4c4cc0..73ac3d0 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -1824,8 +1824,15 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { // NOTE: we don't currently handle a core reload during recovery. This would cause the core // to change underneath us. + // Use a pool of URPs using a ThreadLocal to have them per-thread. URPs aren't threadsafe. UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(null); - UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp); + Collection<UpdateRequestProcessor> procPool = Collections.synchronizedList(new ArrayList<>()); + ThreadLocal<UpdateRequestProcessor> procThreadLocal = ThreadLocal.withInitial(() -> { + var proc = processorChain.createProcessor(req, rsp); + procPool.add(proc); + return proc; + }); + OrderedExecutor executor = inSortedOrder ? null : req.getCore().getCoreContainer().getReplayUpdatesExecutor(); AtomicInteger pendingTasks = new AtomicInteger(0); AtomicReference<SolrException> exceptionOnExecuteUpdate = new AtomicReference<>(); @@ -1906,7 +1913,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, entry, oper, version); cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", cmd); - execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate); + execute(cmd, executor, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate); break; } case UpdateLog.DELETE: { @@ -1917,7 +1924,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { cmd.setVersion(version); cmd.setFlags(UpdateCommand.REPLAY | UpdateCommand.IGNORE_AUTOCOMMIT); if (debug) log.debug("delete {}", cmd); - execute(cmd, executor, pendingTasks, proc, exceptionOnExecuteUpdate); + execute(cmd, executor, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate); break; } @@ -1931,7 +1938,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { if (debug) log.debug("deleteByQuery {}", cmd); waitForAllUpdatesGetExecuted(pendingTasks); // DBQ will be executed in the same thread - execute(cmd, null, pendingTasks, proc, exceptionOnExecuteUpdate); + execute(cmd, null, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate); break; } case UpdateLog.COMMIT: { @@ -1988,13 +1995,15 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { translog.writeCommit(cmd); } - try { - proc.finish(); - } catch (IOException ex) { - recoveryInfo.errors.incrementAndGet(); - loglog.error("Replay exception: finish()", ex); - } finally { - IOUtils.closeQuietly(proc); + for (UpdateRequestProcessor proc : procPool) { + try { + proc.finish(); + } catch (IOException ex) { + recoveryInfo.errors.incrementAndGet(); + loglog.error("Replay exception: finish()", ex); + } finally { + IOUtils.closeQuietly(proc); + } } } finally { @@ -2036,7 +2045,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { } private void execute(UpdateCommand cmd, OrderedExecutor executor, - AtomicInteger pendingTasks, UpdateRequestProcessor proc, + AtomicInteger pendingTasks, ThreadLocal<UpdateRequestProcessor> procTl, AtomicReference<SolrException> exceptionHolder) { assert cmd instanceof AddUpdateCommand || cmd instanceof DeleteUpdateCommand; @@ -2046,11 +2055,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { try { // fail fast if (exceptionHolder.get() != null) return; - if (cmd instanceof AddUpdateCommand) { - proc.processAdd((AddUpdateCommand) cmd); - } else { - proc.processDelete((DeleteUpdateCommand) cmd); - } + invokeCmdOnProc(cmd, procTl.get()); } catch (IOException e) { recoveryInfo.errors.incrementAndGet(); loglog.warn("REPLAY_ERR: IOException reading log", e); @@ -2069,11 +2074,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { pendingTasks.incrementAndGet(); } else { try { - if (cmd instanceof AddUpdateCommand) { - proc.processAdd((AddUpdateCommand) cmd); - } else { - proc.processDelete((DeleteUpdateCommand) cmd); - } + invokeCmdOnProc(cmd, procTl.get()); } catch (IOException e) { recoveryInfo.errors.incrementAndGet(); loglog.warn("REPLAY_ERR: IOException replaying log", e); @@ -2088,6 +2089,13 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer { } } + private void invokeCmdOnProc(UpdateCommand cmd, UpdateRequestProcessor proc) throws IOException { + if (cmd instanceof AddUpdateCommand) { + proc.processAdd((AddUpdateCommand) cmd); + } else { + proc.processDelete((DeleteUpdateCommand) cmd); + } + } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java index 7963ec8..a8657de 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import org.apache.solr.common.SolrException; +import org.apache.solr.common.annotation.SolrThreadUnsafe; import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.DeleteUpdateCommand; @@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory; * * @since solr 1.3 */ +@SolrThreadUnsafe public abstract class UpdateRequestProcessor implements Closeable { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
