This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 001eaefedc fixes NPE in coordinator (#5612)
001eaefedc is described below

commit 001eaefedc24a4522071fd56671a01de297d1201
Author: Keith Turner <[email protected]>
AuthorDate: Tue Jun 3 10:58:44 2025 -0400

    fixes NPE in coordinator (#5612)
    
    When the compaction coordinator requested a connection to a tserver from
    LiveTserverSet it could return null. If this happened an NPE would be
    thrown.  Modified the coordinator to handle null for these cases.
---
 .../coordinator/CompactionCoordinator.java         | 30 +++++++++++++++++-----
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 1b414bd809..25b1ce39e7 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -425,12 +425,18 @@ public class CompactionCoordinator extends AbstractServer 
implements
         LOG.trace("Contacting tablet server {} to get external compaction 
summaries",
             tsi.getHostPort());
         client = getTabletServerConnection(tsi);
-        List<TCompactionQueueSummary> summaries =
-            client.getCompactionQueueInfo(TraceUtil.traceInfo(), 
getContext().rpcCreds());
-        QUEUE_SUMMARIES.update(tsi, summaries);
-        summaries.forEach(summary -> {
-          queuesSeen.add(summary.getQueue());
-        });
+        if (client != null) {
+          List<TCompactionQueueSummary> summaries =
+              client.getCompactionQueueInfo(TraceUtil.traceInfo(), 
getContext().rpcCreds());
+          QUEUE_SUMMARIES.update(tsi, summaries);
+          summaries.forEach(summary -> {
+            queuesSeen.add(summary.getQueue());
+          });
+        } else {
+          LOG.trace("Connection to get summaries could not be established {} ",
+              tsi.getHostAndPort());
+          QUEUE_SUMMARIES.remove(Set.of(tsi));
+        }
       } finally {
         ThriftUtil.returnClient(client, getContext());
       }
@@ -506,13 +512,20 @@ public class CompactionCoordinator extends AbstractServer 
implements
       TabletClientService.Client client = null;
       try {
         client = getTabletServerConnection(tserver);
+        if (client == null) {
+          LOG.trace("No connection established for queue {} on tserver {}, 
trying next tserver",
+              queue, tserver.getHostAndPort());
+          QUEUE_SUMMARIES.removeSummary(tserver, queue, prioTserver.prio);
+          prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
+          continue;
+        }
+
         TExternalCompactionJob job =
             client.reserveCompactionJob(TraceUtil.traceInfo(), 
getContext().rpcCreds(), queue,
                 prioTserver.prio, compactorAddress, externalCompactionId);
         if (null == job.getExternalCompactionId()) {
           LOG.trace("No compactions found for queue {} on tserver {}, trying 
next tserver", queue,
               tserver.getHostAndPort());
-
           QUEUE_SUMMARIES.removeSummary(tserver, queue, prioTserver.prio);
           prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
           continue;
@@ -555,6 +568,9 @@ public class CompactionCoordinator extends AbstractServer 
implements
   protected TabletClientService.Client 
getTabletServerConnection(TServerInstance tserver)
       throws TTransportException {
     TServerConnection connection = tserverSet.getConnection(tserver);
+    if (connection == null) {
+      return null;
+    }
     ServerContext serverContext = getContext();
     TTransport transport = serverContext.getTransportPool().getTransport(
         ThriftClientTypes.TABLET_SERVER, connection.getAddress(), 0, 
serverContext, true);

Reply via email to