This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 0b38147 Return Thrift client to pool in finally block so that the connection can be reused. 0b38147 is described below commit 0b38147676218e7b33bcf9474b0e44880ba61a0b Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Apr 20 12:36:44 2021 +0000 Return Thrift client to pool in finally block so that the connection can be reused. --- .../server/compaction/RetryableThriftCall.java | 10 ++++++---- .../java/org/apache/accumulo/compactor/Compactor.java | 18 ++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java index ecf5428..82108fb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java @@ -101,7 +101,7 @@ public class RetryableThriftCall<T> { try { result = function.execute(); } catch (TException e) { - LOG.error("Error in Thrift function, retrying in {}ms. Error: {}", waitTime, e); + LOG.error("Error in Thrift function, retrying in {}ms. Error: {}", waitTime, e, e); if (!retryForever) { numRetries++; if (numRetries > maxNumRetries) { @@ -110,9 +110,11 @@ public class RetryableThriftCall<T> { } } } - UtilWaitThread.sleep(waitTime); - if (waitTime != maxWaitTime) { - waitTime = Math.min(waitTime * 2, maxWaitTime); + if (result == null) { + UtilWaitThread.sleep(waitTime); + if (waitTime != maxWaitTime) { + waitTime = Math.min(waitTime * 2, maxWaitTime); + } } } while (null == result); return result; diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 0190820..8246fcc 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -115,7 +115,8 @@ public class Compactor extends AbstractServer private static final long TIME_BETWEEN_GC_CHECKS = 5000; private static final CompactionJobHolder JOB_HOLDER = new CompactionJobHolder(); private static final long TEN_MEGABYTES = 10485760; - + private static final CompactionCoordinator.Client.Factory COORDINATOR_CLIENT_FACTORY = new CompactionCoordinator.Client.Factory(); + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); private final UUID compactorId = UUID.randomUUID(); private final AccumuloConfiguration aconf; @@ -337,9 +338,8 @@ public class Compactor extends AbstractServer getContext().rpcCreds(), job.getExternalCompactionId(), state, message, System.currentTimeMillis()); return ""; - } catch (TException e) { + } finally { ThriftUtil.returnClient(coordinatorClient.getAndSet(null)); - throw e; } } }); @@ -365,9 +365,8 @@ public class Compactor extends AbstractServer coordinatorClient.get().compactionFailed(TraceUtil.traceInfo(), getContext().rpcCreds(), job.getExternalCompactionId(), job.extent); return ""; - } catch (TException e) { + } finally { ThriftUtil.returnClient(coordinatorClient.getAndSet(null)); - throw e; } } }); @@ -395,9 +394,8 @@ public class Compactor extends AbstractServer coordinatorClient.get().compactionCompleted(TraceUtil.traceInfo(), getContext().rpcCreds(), job.getExternalCompactionId(), job.extent, stats); return ""; - } catch (TException e) { + } finally { ThriftUtil.returnClient(coordinatorClient.getAndSet(null)); - throw e; } } }); @@ -430,8 +428,9 @@ public class Compactor extends AbstractServer eci.toString()); } catch (TException e) { currentCompactionId.set(null); - ThriftUtil.returnClient(coordinatorClient.getAndSet(null)); throw e; + } finally { + ThriftUtil.returnClient(coordinatorClient.getAndSet(null)); } } }); @@ -451,8 +450,7 @@ public class Compactor extends AbstractServer throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); } LOG.info("CompactionCoordinator address is: {}", coordinatorHost); - return ThriftUtil.getClient(new CompactionCoordinator.Client.Factory(), coordinatorHost, - getContext()); + return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY, coordinatorHost, getContext()); } /**