dsmiley commented on code in PR #4431:
URL: https://github.com/apache/solr/pull/4431#discussion_r3254798735


##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -265,40 +278,108 @@ protected void makeShardRequest(
       SimpleSolrResponse ssr,
       ShardResponse srsp,
       long startTimeNS) {
-    CompletableFuture<LBSolrClient.Rsp> future = 
this.lbClient.requestAsync(lbReq);
-    // Synchronize on canceled, so that we know precisely whether to add it to 
the responseFutureMap
-    // or not.
-    synchronized (canceled) {
-      if (canceled.get() && !future.isDone()) {
-        future.cancel(true);
-        return;
-      } else {
-        responseFutureMap.put(srsp, future);
+    // Capture submitter context now so the shard task (running on a virtual 
thread) sees the
+    // submitter's MDC + every registered InheritableThreadLocalProvider 
(SolrRequestInfo for PKI,
+    // OTel trace Context, etc.), matching what MDCAwareThreadPoolExecutor 
does for pool threads.
+    // Virtual threads aren't pooled so no restore is needed.
+    final Map<String, String> submitterMdc = MDC.getCopyOfContextMap();
+    final List<ExecutorUtil.InheritableThreadLocalProvider> providers =
+        ExecutorUtil.getThreadLocalProviders();
+    final List<AtomicReference<Object>> providerCtx;
+    if (providers.isEmpty()) {
+      providerCtx = List.of();
+    } else {
+      providerCtx = new ArrayList<>(providers.size());
+      for (ExecutorUtil.InheritableThreadLocalProvider p : providers) {
+        AtomicReference<Object> ref = new AtomicReference<>();
+        p.store(ref);
+        providerCtx.add(ref);
       }
     }
-    // Add the callback explicitly after adding the future to the map, because 
the callback relies
-    // on the map already having the future.
-    future.whenComplete(
-        (LBSolrClient.Rsp rsp, Throwable throwable) -> {
-          if (rsp != null) {
-            ssr.nl = rsp.getResponse();
-            srsp.setShardAddress(rsp.getServer());
-          } else if (throwable != null) {
-            srsp.setException(throwable);
-            if (throwable instanceof SolrException) {
-              srsp.setResponseCode(((SolrException) throwable).code());
-            }
+
+    // Build + dispatch the request on the virtual thread (parallel CPU work) 
and block on the
+    // returned CompletableFuture there. cancelAll() interrupts the virtual 
thread; the catch
+    // below translates that into a graceful jetty-future cancel, which is how 
Jetty wants to be
+    // aborted (vs. interrupting a synchronous send mid-flight).
+    Runnable shardTask =
+        () -> {
+          ExecutorUtil.setServerThreadFlag(Boolean.TRUE);
+          if (submitterMdc != null) {
+            MDC.setContextMap(submitterMdc);
+          }
+          for (int i = 0; i < providers.size(); i++) {
+            providers.get(i).set(providerCtx.get(i));
           }
-          ssr.elapsedTime =
-              TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS, 
TimeUnit.NANOSECONDS);
-          // Synchronize on cancelled so this code and cancelAll() cannot 
happen at the same time
-          synchronized (canceled) {
-            // We don't want to add responses after the requests have been 
canceled
-            if (responseFutureMap.containsKey(srsp)) {
-              responses.add(HttpShardHandler.this.transformResponse(sreq, 
srsp, shard));
+          CompletableFuture<LBSolrClient.Rsp> jettyFuture = null;
+          try {
+            try {
+              // doPrivileged needed because the request setup reads "solr.*" 
system properties
+              // and otherwise fails under SecurityManager when invoked from a 
virtual thread.
+              @SuppressWarnings("removal")
+              CompletableFuture<LBSolrClient.Rsp> tmp =
+                  AccessController.doPrivileged(
+                      
(PrivilegedExceptionAction<CompletableFuture<LBSolrClient.Rsp>>)
+                          () -> lbClient.requestAsync(lbReq));
+              jettyFuture = tmp;
+              LBSolrClient.Rsp rsp = jettyFuture.get();
+              ssr.nl = rsp.getResponse();
+              srsp.setShardAddress(rsp.getServer());
+            } catch (CancellationException ce) {
+              // jettyFuture was cancelled; leave srsp without an 
exception/response.
+            } catch (InterruptedException ie) {
+              Thread.currentThread().interrupt();
+              if (jettyFuture != null) {
+                jettyFuture.cancel(true);
+              }
+            } catch (ExecutionException ee) {
+              Throwable cause = ee.getCause() != null ? ee.getCause() : ee;
+              srsp.setException(cause);
+              if (cause instanceof SolrException se) {
+                srsp.setResponseCode(se.code());
+              }
+            } catch (PrivilegedActionException pae) {
+              Throwable cause = pae.getCause() != null ? pae.getCause() : pae;
+              srsp.setException(cause);
+              if (cause instanceof SolrException se) {
+                srsp.setResponseCode(se.code());
+              }
+            }
+          } finally {
+            ssr.elapsedTime =
+                TimeUnit.MILLISECONDS.convert(
+                    System.nanoTime() - startTimeNS, TimeUnit.NANOSECONDS);
+            // Synchronize on canceled so this code and cancelAll() cannot 
happen at the same time
+            synchronized (canceled) {
+              // We don't want to add responses after the requests have been 
canceled
+              if (pending.containsKey(srsp)) {
+                responses.add(HttpShardHandler.this.transformResponse(sreq, 
srsp, shard));
+              }
             }
+            for (int i = 0; i < providers.size(); i++) {
+              providers.get(i).clean(providerCtx.get(i));
+            }
+            MDC.clear();

Review Comment:
   as this is a virtual thread which aren't pool'ed, why do this cleanup?



##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -265,40 +278,108 @@ protected void makeShardRequest(
       SimpleSolrResponse ssr,
       ShardResponse srsp,
       long startTimeNS) {
-    CompletableFuture<LBSolrClient.Rsp> future = 
this.lbClient.requestAsync(lbReq);
-    // Synchronize on canceled, so that we know precisely whether to add it to 
the responseFutureMap
-    // or not.
-    synchronized (canceled) {
-      if (canceled.get() && !future.isDone()) {
-        future.cancel(true);
-        return;
-      } else {
-        responseFutureMap.put(srsp, future);
+    // Capture submitter context now so the shard task (running on a virtual 
thread) sees the
+    // submitter's MDC + every registered InheritableThreadLocalProvider 
(SolrRequestInfo for PKI,
+    // OTel trace Context, etc.), matching what MDCAwareThreadPoolExecutor 
does for pool threads.
+    // Virtual threads aren't pooled so no restore is needed.
+    final Map<String, String> submitterMdc = MDC.getCopyOfContextMap();
+    final List<ExecutorUtil.InheritableThreadLocalProvider> providers =
+        ExecutorUtil.getThreadLocalProviders();
+    final List<AtomicReference<Object>> providerCtx;
+    if (providers.isEmpty()) {
+      providerCtx = List.of();
+    } else {
+      providerCtx = new ArrayList<>(providers.size());
+      for (ExecutorUtil.InheritableThreadLocalProvider p : providers) {
+        AtomicReference<Object> ref = new AtomicReference<>();
+        p.store(ref);
+        providerCtx.add(ref);
       }
     }
-    // Add the callback explicitly after adding the future to the map, because 
the callback relies
-    // on the map already having the future.
-    future.whenComplete(
-        (LBSolrClient.Rsp rsp, Throwable throwable) -> {
-          if (rsp != null) {
-            ssr.nl = rsp.getResponse();
-            srsp.setShardAddress(rsp.getServer());
-          } else if (throwable != null) {
-            srsp.setException(throwable);
-            if (throwable instanceof SolrException) {
-              srsp.setResponseCode(((SolrException) throwable).code());
-            }
+
+    // Build + dispatch the request on the virtual thread (parallel CPU work) 
and block on the
+    // returned CompletableFuture there. cancelAll() interrupts the virtual 
thread; the catch
+    // below translates that into a graceful jetty-future cancel, which is how 
Jetty wants to be
+    // aborted (vs. interrupting a synchronous send mid-flight).
+    Runnable shardTask =
+        () -> {
+          ExecutorUtil.setServerThreadFlag(Boolean.TRUE);
+          if (submitterMdc != null) {
+            MDC.setContextMap(submitterMdc);
+          }
+          for (int i = 0; i < providers.size(); i++) {
+            providers.get(i).set(providerCtx.get(i));
           }
-          ssr.elapsedTime =
-              TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS, 
TimeUnit.NANOSECONDS);
-          // Synchronize on cancelled so this code and cancelAll() cannot 
happen at the same time
-          synchronized (canceled) {
-            // We don't want to add responses after the requests have been 
canceled
-            if (responseFutureMap.containsKey(srsp)) {
-              responses.add(HttpShardHandler.this.transformResponse(sreq, 
srsp, shard));
+          CompletableFuture<LBSolrClient.Rsp> jettyFuture = null;
+          try {
+            try {
+              // doPrivileged needed because the request setup reads "solr.*" 
system properties
+              // and otherwise fails under SecurityManager when invoked from a 
virtual thread.
+              @SuppressWarnings("removal")
+              CompletableFuture<LBSolrClient.Rsp> tmp =

Review Comment:
   why declare tmp instead of directly setting jettyFuture?



##########
solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java:
##########
@@ -265,40 +278,108 @@ protected void makeShardRequest(
       SimpleSolrResponse ssr,
       ShardResponse srsp,
       long startTimeNS) {
-    CompletableFuture<LBSolrClient.Rsp> future = 
this.lbClient.requestAsync(lbReq);
-    // Synchronize on canceled, so that we know precisely whether to add it to 
the responseFutureMap
-    // or not.
-    synchronized (canceled) {
-      if (canceled.get() && !future.isDone()) {
-        future.cancel(true);
-        return;
-      } else {
-        responseFutureMap.put(srsp, future);
+    // Capture submitter context now so the shard task (running on a virtual 
thread) sees the
+    // submitter's MDC + every registered InheritableThreadLocalProvider 
(SolrRequestInfo for PKI,
+    // OTel trace Context, etc.), matching what MDCAwareThreadPoolExecutor 
does for pool threads.
+    // Virtual threads aren't pooled so no restore is needed.
+    final Map<String, String> submitterMdc = MDC.getCopyOfContextMap();
+    final List<ExecutorUtil.InheritableThreadLocalProvider> providers =
+        ExecutorUtil.getThreadLocalProviders();
+    final List<AtomicReference<Object>> providerCtx;
+    if (providers.isEmpty()) {
+      providerCtx = List.of();
+    } else {
+      providerCtx = new ArrayList<>(providers.size());
+      for (ExecutorUtil.InheritableThreadLocalProvider p : providers) {
+        AtomicReference<Object> ref = new AtomicReference<>();
+        p.store(ref);
+        providerCtx.add(ref);
       }
     }
-    // Add the callback explicitly after adding the future to the map, because 
the callback relies
-    // on the map already having the future.
-    future.whenComplete(
-        (LBSolrClient.Rsp rsp, Throwable throwable) -> {
-          if (rsp != null) {
-            ssr.nl = rsp.getResponse();
-            srsp.setShardAddress(rsp.getServer());
-          } else if (throwable != null) {
-            srsp.setException(throwable);
-            if (throwable instanceof SolrException) {
-              srsp.setResponseCode(((SolrException) throwable).code());
-            }
+
+    // Build + dispatch the request on the virtual thread (parallel CPU work) 
and block on the
+    // returned CompletableFuture there. cancelAll() interrupts the virtual 
thread; the catch
+    // below translates that into a graceful jetty-future cancel, which is how 
Jetty wants to be
+    // aborted (vs. interrupting a synchronous send mid-flight).
+    Runnable shardTask =
+        () -> {
+          ExecutorUtil.setServerThreadFlag(Boolean.TRUE);
+          if (submitterMdc != null) {
+            MDC.setContextMap(submitterMdc);
+          }
+          for (int i = 0; i < providers.size(); i++) {
+            providers.get(i).set(providerCtx.get(i));
           }
-          ssr.elapsedTime =
-              TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNS, 
TimeUnit.NANOSECONDS);
-          // Synchronize on cancelled so this code and cancelAll() cannot 
happen at the same time
-          synchronized (canceled) {
-            // We don't want to add responses after the requests have been 
canceled
-            if (responseFutureMap.containsKey(srsp)) {
-              responses.add(HttpShardHandler.this.transformResponse(sreq, 
srsp, shard));
+          CompletableFuture<LBSolrClient.Rsp> jettyFuture = null;
+          try {
+            try {
+              // doPrivileged needed because the request setup reads "solr.*" 
system properties
+              // and otherwise fails under SecurityManager when invoked from a 
virtual thread.
+              @SuppressWarnings("removal")
+              CompletableFuture<LBSolrClient.Rsp> tmp =
+                  AccessController.doPrivileged(
+                      
(PrivilegedExceptionAction<CompletableFuture<LBSolrClient.Rsp>>)

Review Comment:
   ugh; sad, for something so trivial.  Why is this *not* needed seemingly 
everywhere else?
   
   Any way, we'll likely be removing such things soon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to