This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 001eaefedc fixes NPE in coordinator (#5612)
001eaefedc is described below
commit 001eaefedc24a4522071fd56671a01de297d1201
Author: Keith Turner <[email protected]>
AuthorDate: Tue Jun 3 10:58:44 2025 -0400
fixes NPE in coordinator (#5612)
When the compaction coordinator requested a connection to a tserver from
LiveTServerSet, the call could return null, and the coordinator would then
throw an NPE. Modified the coordinator to handle a null connection in these
cases.
---
.../coordinator/CompactionCoordinator.java | 30 +++++++++++++++++-----
1 file changed, 23 insertions(+), 7 deletions(-)
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 1b414bd809..25b1ce39e7 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -425,12 +425,18 @@ public class CompactionCoordinator extends AbstractServer
implements
LOG.trace("Contacting tablet server {} to get external compaction
summaries",
tsi.getHostPort());
client = getTabletServerConnection(tsi);
- List<TCompactionQueueSummary> summaries =
- client.getCompactionQueueInfo(TraceUtil.traceInfo(),
getContext().rpcCreds());
- QUEUE_SUMMARIES.update(tsi, summaries);
- summaries.forEach(summary -> {
- queuesSeen.add(summary.getQueue());
- });
+ if (client != null) {
+ List<TCompactionQueueSummary> summaries =
+ client.getCompactionQueueInfo(TraceUtil.traceInfo(),
getContext().rpcCreds());
+ QUEUE_SUMMARIES.update(tsi, summaries);
+ summaries.forEach(summary -> {
+ queuesSeen.add(summary.getQueue());
+ });
+ } else {
+ LOG.trace("Connection to get summaries could not be established {} ",
+ tsi.getHostAndPort());
+ QUEUE_SUMMARIES.remove(Set.of(tsi));
+ }
} finally {
ThriftUtil.returnClient(client, getContext());
}
@@ -506,13 +512,20 @@ public class CompactionCoordinator extends AbstractServer
implements
TabletClientService.Client client = null;
try {
client = getTabletServerConnection(tserver);
+ if (client == null) {
+ LOG.trace("No connection established for queue {} on tserver {},
trying next tserver",
+ queue, tserver.getHostAndPort());
+ QUEUE_SUMMARIES.removeSummary(tserver, queue, prioTserver.prio);
+ prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
+ continue;
+ }
+
TExternalCompactionJob job =
client.reserveCompactionJob(TraceUtil.traceInfo(),
getContext().rpcCreds(), queue,
prioTserver.prio, compactorAddress, externalCompactionId);
if (null == job.getExternalCompactionId()) {
LOG.trace("No compactions found for queue {} on tserver {}, trying
next tserver", queue,
tserver.getHostAndPort());
-
QUEUE_SUMMARIES.removeSummary(tserver, queue, prioTserver.prio);
prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
continue;
@@ -555,6 +568,9 @@ public class CompactionCoordinator extends AbstractServer
implements
protected TabletClientService.Client
getTabletServerConnection(TServerInstance tserver)
throws TTransportException {
TServerConnection connection = tserverSet.getConnection(tserver);
+ if (connection == null) {
+ return null;
+ }
ServerContext serverContext = getContext();
TTransport transport = serverContext.getTransportPool().getTransport(
ThriftClientTypes.TABLET_SERVER, connection.getAddress(), 0,
serverContext, true);