This is an automated email from the ASF dual-hosted git repository.

cpoerschke pushed a commit to branch branch_9_0
in repository https://gitbox.apache.org/repos/asf/solr.git

commit bc501bab8a599ec313b042844611bc4192899f10
Author: Christine Poerschke <[email protected]>
AuthorDate: Mon Feb 21 12:29:56 2022 +0000

    SOLR-15983: parallel log replay now uses separate UpdateRequestProcessor objects (#601)
    
    * parallel log replay now uses separate UpdateRequestProcessor objects
    
    * executor local variable in UpdateLog.doReplay can be null
    
    * Use a ThreadLocal of the URP to avoid re-creating and avoid excessive finish() calls and to make it clearer that the URPs are used correctly (have correct lifecycle and aren't shared).
    
    * remove unused procThreadLocal member in favour of local variable with same name
    
    * just one for-loop on procPool to finish-and-close
    
    * add solr/CHANGES.txt entry
    
    Co-authored-by: David Smiley <[email protected]>
    (cherry picked from commit 7c0767064eeaddda5ecf8e5b0ff0e07524dfafce)
    (cherry picked from commit 7d61190a0d917b9319c30d76328d62d1435dbe58)
    
    Resolved Conflicts:
        solr/CHANGES.txt
        solr/core/src/java/org/apache/solr/update/UpdateLog.java
---
 solr/CHANGES.txt                                   |  2 +
 .../src/java/org/apache/solr/update/UpdateLog.java | 52 +++++++++++++---------
 .../update/processor/UpdateRequestProcessor.java   |  2 +
 3 files changed, 34 insertions(+), 22 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9a42505..5103f2f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -611,6 +611,8 @@ Bug Fixes
 
 * SOLR-16019: UTF-8 parsing errors for parameters should cause a HTTP 400 
status code, not 500 (janhoy, Matthias Pigulla)
 
+* SOLR-15983: Fix ClassCastException in UpdateLog$LogReplayer.doReplay. (Christine Poerschke, David Smiley)
+
 * SOLR-15333: Reduced spurious warn logging by 
AbstractSpatialPrefixTreeFieldType field properties (Steffen Moldenhauer, David 
Smiley, Mike Drob)
 
 ==================  8.11.1 ==================
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index f4c4cc0..73ac3d0 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -1824,8 +1824,15 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
         // NOTE: we don't currently handle a core reload during recovery.  
This would cause the core
         // to change underneath us.
 
+        // Use a pool of URPs using a ThreadLocal to have them per-thread.  URPs aren't threadsafe.
         UpdateRequestProcessorChain processorChain = 
req.getCore().getUpdateProcessingChain(null);
-        UpdateRequestProcessor proc = processorChain.createProcessor(req, rsp);
+        Collection<UpdateRequestProcessor> procPool = Collections.synchronizedList(new ArrayList<>());
+        ThreadLocal<UpdateRequestProcessor> procThreadLocal = ThreadLocal.withInitial(() -> {
+          var proc = processorChain.createProcessor(req, rsp);
+          procPool.add(proc);
+          return proc;
+        });
+
         OrderedExecutor executor = inSortedOrder ? null : 
req.getCore().getCoreContainer().getReplayUpdatesExecutor();
         AtomicInteger pendingTasks = new AtomicInteger(0);
         AtomicReference<SolrException> exceptionOnExecuteUpdate = new 
AtomicReference<>();
@@ -1906,7 +1913,7 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
                 AddUpdateCommand cmd = convertTlogEntryToAddUpdateCommand(req, 
entry, oper, version);
                 cmd.setFlags(UpdateCommand.REPLAY | 
UpdateCommand.IGNORE_AUTOCOMMIT);
                 if (debug) log.debug("{} {}", oper == ADD ? "add" : "update", 
cmd);
-                execute(cmd, executor, pendingTasks, proc, 
exceptionOnExecuteUpdate);
+                execute(cmd, executor, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
                 break;
               }
               case UpdateLog.DELETE: {
@@ -1917,7 +1924,7 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
                 cmd.setVersion(version);
                 cmd.setFlags(UpdateCommand.REPLAY | 
UpdateCommand.IGNORE_AUTOCOMMIT);
                 if (debug) log.debug("delete {}", cmd);
-                execute(cmd, executor, pendingTasks, proc, 
exceptionOnExecuteUpdate);
+                execute(cmd, executor, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
                 break;
               }
 
@@ -1931,7 +1938,7 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
                 if (debug) log.debug("deleteByQuery {}", cmd);
                 waitForAllUpdatesGetExecuted(pendingTasks);
                 // DBQ will be executed in the same thread
-                execute(cmd, null, pendingTasks, proc, 
exceptionOnExecuteUpdate);
+                execute(cmd, null, pendingTasks, procThreadLocal, exceptionOnExecuteUpdate);
                 break;
               }
               case UpdateLog.COMMIT: {
@@ -1988,13 +1995,15 @@ public class UpdateLog implements 
PluginInfoInitialized, SolrMetricProducer {
           translog.writeCommit(cmd);
         }
 
-        try {
-          proc.finish();
-        } catch (IOException ex) {
-          recoveryInfo.errors.incrementAndGet();
-          loglog.error("Replay exception: finish()", ex);
-        } finally {
-          IOUtils.closeQuietly(proc);
+        for (UpdateRequestProcessor proc : procPool) {
+          try {
+            proc.finish();
+          } catch (IOException ex) {
+            recoveryInfo.errors.incrementAndGet();
+            loglog.error("Replay exception: finish()", ex);
+          } finally {
+            IOUtils.closeQuietly(proc);
+          }
         }
 
       } finally {
@@ -2036,7 +2045,7 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
     }
 
     private void execute(UpdateCommand cmd, OrderedExecutor executor,
-                         AtomicInteger pendingTasks, UpdateRequestProcessor 
proc,
+                         AtomicInteger pendingTasks, ThreadLocal<UpdateRequestProcessor> procTl,
                          AtomicReference<SolrException> exceptionHolder) {
       assert cmd instanceof AddUpdateCommand || cmd instanceof 
DeleteUpdateCommand;
 
@@ -2046,11 +2055,7 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
           try {
             // fail fast
             if (exceptionHolder.get() != null) return;
-            if (cmd instanceof AddUpdateCommand) {
-              proc.processAdd((AddUpdateCommand) cmd);
-            } else {
-              proc.processDelete((DeleteUpdateCommand) cmd);
-            }
+            invokeCmdOnProc(cmd, procTl.get());
           } catch (IOException e) {
             recoveryInfo.errors.incrementAndGet();
             loglog.warn("REPLAY_ERR: IOException reading log", e);
@@ -2069,11 +2074,7 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
         pendingTasks.incrementAndGet();
       } else {
         try {
-          if (cmd instanceof AddUpdateCommand) {
-            proc.processAdd((AddUpdateCommand) cmd);
-          } else {
-            proc.processDelete((DeleteUpdateCommand) cmd);
-          }
+          invokeCmdOnProc(cmd, procTl.get());
         } catch (IOException e) {
           recoveryInfo.errors.incrementAndGet();
           loglog.warn("REPLAY_ERR: IOException replaying log", e);
@@ -2088,6 +2089,13 @@ public class UpdateLog implements PluginInfoInitialized, 
SolrMetricProducer {
       }
     }
 
+    private void invokeCmdOnProc(UpdateCommand cmd, UpdateRequestProcessor proc) throws IOException {
+      if (cmd instanceof AddUpdateCommand) {
+        proc.processAdd((AddUpdateCommand) cmd);
+      } else {
+        proc.processDelete((DeleteUpdateCommand) cmd);
+      }
+    }
 
   }
 
diff --git a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java
index 7963ec8..a8657de 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.SolrThreadUnsafe;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
  * 
  * @since solr 1.3
  */
+@SolrThreadUnsafe
 public abstract class UpdateRequestProcessor implements Closeable {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   

Reply via email to