This is an automated email from the ASF dual-hosted git repository.
cpoerschke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 7c07670 SOLR-15983: parallel log replay now uses separate
UpdateRequestProcessor objects (#601)
7c07670 is described below
commit 7c0767064eeaddda5ecf8e5b0ff0e07524dfafce
Author: Christine Poerschke <[email protected]>
AuthorDate: Mon Feb 21 12:29:56 2022 +0000
SOLR-15983: parallel log replay now uses separate UpdateRequestProcessor
objects (#601)
* parallel log replay now uses separate UpdateRequestProcessor objects
* executor local variable in UpdateLog.doReplay can be null
* Use a ThreadLocal of the URP to avoid re-creating it, to avoid excessive
finish() calls, and to make it clearer that the URPs are used correctly (they
have the correct lifecycle and aren't shared).
* remove unused procThreadLocal member in favour of local variable with
same name
* just one for-loop on procPool to finish-and-close
* add solr/CHANGES.txt entry
Co-authored-by: David Smiley <[email protected]>
---
solr/CHANGES.txt | 2 +
.../src/java/org/apache/solr/update/UpdateLog.java | 52 +++++++++++++---------
.../update/processor/UpdateRequestProcessor.java | 2 +
3 files changed, 34 insertions(+), 22 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d9e4bce..665df99 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -656,6 +656,8 @@ Bug Fixes
* SOLR-16019: UTF-8 parsing errors for parameters should cause a HTTP 400
status code, not 500 (janhoy, Matthias Pigulla)
+* SOLR-15983: Fix ClassCastException in UpdateLog$LogReplayer.doReplay.
(Christine Poerschke, David Smiley)
+
================== 8.11.2 ==================
Bug Fixes
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index eee0865..d478d9f 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1824,8 +1824,15 @@ public class UpdateLog implements PluginInfoInitialized,
SolrMetricProducer {
// NOTE: we don't currently handle a core reload during recovery.
This would cause the core
// to change underneath us.
+ // Use a pool of URPs using a ThreadLocal to have them per-thread.
URPs aren't threadsafe.
UpdateRequestProcessorChain processorChain =
req.getCore().getUpdateProcessingChain(null);
- UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);
+ Collection<UpdateRequestProcessor> procPool =
Collections.synchronizedList(new ArrayList<>());
+ ThreadLocal<UpdateRequestProcessor> procThreadLocal =
ThreadLocal.withInitial(() -> {
+ var proc = processorChain.createProcessor(req, rsp);
+ procPool.add(proc);
+ return proc;
+ });
+
OrderedExecutor executor = inSortedOrder ? null :
req.getCoreContainer().getReplayUpdatesExecutor();
AtomicInteger pendingTasks = new AtomicInteger(0);
AtomicReference<SolrException> exceptionOnExecuteUpdate = new
AtomicReference<>();
@@ -1906,7 +1913,7 @@ public class UpdateLog implements PluginInfoInitialized,
SolrMetricProducer {
AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req,
entry, oper, version);
cmd.setFlags(UpdateCommand.REPLAY |
UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("{} {}", oper == ADD ? "add" : "update",
cmd);
- execute(cmd, executor, pendingTasks, proc,
exceptionOnExecuteUpdate);
+ execute(cmd, executor, pendingTasks, procThreadLocal,
exceptionOnExecuteUpdate);
break;
}
case UpdateLog.DELETE: {
@@ -1917,7 +1924,7 @@ public class UpdateLog implements PluginInfoInitialized,
SolrMetricProducer {
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.REPLAY |
UpdateCommand.IGNORE_AUTOCOMMIT);
if (debug) log.debug("delete {}", cmd);
- execute(cmd, executor, pendingTasks, proc,
exceptionOnExecuteUpdate);
+ execute(cmd, executor, pendingTasks, procThreadLocal,
exceptionOnExecuteUpdate);
break;
}
@@ -1931,7 +1938,7 @@ public class UpdateLog implements PluginInfoInitialized,
SolrMetricProducer {
if (debug) log.debug("deleteByQuery {}", cmd);
waitForAllUpdatesGetExecuted(pendingTasks);
// DBQ will be executed in the same thread
- execute(cmd, null, pendingTasks, proc,
exceptionOnExecuteUpdate);
+ execute(cmd, null, pendingTasks, procThreadLocal,
exceptionOnExecuteUpdate);
break;
}
case UpdateLog.COMMIT: {
@@ -1988,13 +1995,15 @@ public class UpdateLog implements
PluginInfoInitialized, SolrMetricProducer {
translog.writeCommit(cmd);
}
- try {
- proc.finish();
- } catch (IOException ex) {
- recoveryInfo.errors.incrementAndGet();
- loglog.error("Replay exception: finish()", ex);
- } finally {
- IOUtils.closeQuietly(proc);
+ for (UpdateRequestProcessor proc : procPool) {
+ try {
+ proc.finish();
+ } catch (IOException ex) {
+ recoveryInfo.errors.incrementAndGet();
+ loglog.error("Replay exception: finish()", ex);
+ } finally {
+ IOUtils.closeQuietly(proc);
+ }
}
} finally {
@@ -2036,7 +2045,7 @@ public class UpdateLog implements PluginInfoInitialized,
SolrMetricProducer {
}
private void execute(UpdateCommand cmd, OrderedExecutor executor,
- AtomicInteger pendingTasks, UpdateRequestProcessor
proc,
+ AtomicInteger pendingTasks,
ThreadLocal<UpdateRequestProcessor> procTl,
AtomicReference<SolrException> exceptionHolder) {
assert cmd instanceof AddUpdateCommand || cmd instanceof
DeleteUpdateCommand;
@@ -2046,11 +2055,7 @@ public class UpdateLog implements PluginInfoInitialized,
SolrMetricProducer {
try {
// fail fast
if (exceptionHolder.get() != null) return;
- if (cmd instanceof AddUpdateCommand) {
- proc.processAdd((AddUpdateCommand) cmd);
- } else {
- proc.processDelete((DeleteUpdateCommand) cmd);
- }
+ invokeCmdOnProc(cmd, procTl.get());
} catch (IOException e) {
recoveryInfo.errors.incrementAndGet();
loglog.warn("REPLAY_ERR: IOException reading log", e);
@@ -2069,11 +2074,7 @@ public class UpdateLog implements PluginInfoInitialized,
SolrMetricProducer {
pendingTasks.incrementAndGet();
} else {
try {
- if (cmd instanceof AddUpdateCommand) {
- proc.processAdd((AddUpdateCommand) cmd);
- } else {
- proc.processDelete((DeleteUpdateCommand) cmd);
- }
+ invokeCmdOnProc(cmd, procTl.get());
} catch (IOException e) {
recoveryInfo.errors.incrementAndGet();
loglog.warn("REPLAY_ERR: IOException replaying log", e);
@@ -2088,6 +2089,13 @@ public class UpdateLog implements PluginInfoInitialized,
SolrMetricProducer {
}
}
+ private void invokeCmdOnProc(UpdateCommand cmd, UpdateRequestProcessor
proc) throws IOException {
+ if (cmd instanceof AddUpdateCommand) {
+ proc.processAdd((AddUpdateCommand) cmd);
+ } else {
+ proc.processDelete((DeleteUpdateCommand) cmd);
+ }
+ }
}
diff --git
a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java
b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java
index 7963ec8..a8657de 100644
---
a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java
+++
b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.SolrThreadUnsafe;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
*
* @since solr 1.3
*/
+@SolrThreadUnsafe
public abstract class UpdateRequestProcessor implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());