This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 0b38147  Return Thrift client to pool in finally block so that the connection can be reused.
0b38147 is described below

commit 0b38147676218e7b33bcf9474b0e44880ba61a0b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Apr 20 12:36:44 2021 +0000

    Return Thrift client to pool in finally block so that the connection can be reused.
---
 .../server/compaction/RetryableThriftCall.java         | 10 ++++++----
 .../java/org/apache/accumulo/compactor/Compactor.java  | 18 ++++++++----------
 2 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
index ecf5428..82108fb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/RetryableThriftCall.java
@@ -101,7 +101,7 @@ public class RetryableThriftCall<T> {
       try {
         result = function.execute();
       } catch (TException e) {
-        LOG.error("Error in Thrift function, retrying in {}ms. Error: {}", 
waitTime, e);
+        LOG.error("Error in Thrift function, retrying in {}ms. Error: {}", 
waitTime, e, e);
         if (!retryForever) {
           numRetries++;
           if (numRetries > maxNumRetries) {
@@ -110,9 +110,11 @@ public class RetryableThriftCall<T> {
           }
         }
       }
-      UtilWaitThread.sleep(waitTime);
-      if (waitTime != maxWaitTime) {
-        waitTime = Math.min(waitTime * 2, maxWaitTime);
+      if (result == null) {
+        UtilWaitThread.sleep(waitTime);
+        if (waitTime != maxWaitTime) {
+          waitTime = Math.min(waitTime * 2, maxWaitTime);
+        }
       }
     } while (null == result);
     return result;
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 0190820..8246fcc 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -115,7 +115,8 @@ public class Compactor extends AbstractServer
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
   private static final CompactionJobHolder JOB_HOLDER = new 
CompactionJobHolder();
   private static final long TEN_MEGABYTES = 10485760;
-
+  private static final CompactionCoordinator.Client.Factory 
COORDINATOR_CLIENT_FACTORY = new CompactionCoordinator.Client.Factory();
+  
   private final GarbageCollectionLogger gcLogger = new 
GarbageCollectionLogger();
   private final UUID compactorId = UUID.randomUUID();
   private final AccumuloConfiguration aconf;
@@ -337,9 +338,8 @@ public class Compactor extends AbstractServer
                   getContext().rpcCreds(), job.getExternalCompactionId(), 
state, message,
                   System.currentTimeMillis());
               return "";
-            } catch (TException e) {
+            } finally {
               ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
-              throw e;
             }
           }
         });
@@ -365,9 +365,8 @@ public class Compactor extends AbstractServer
               coordinatorClient.get().compactionFailed(TraceUtil.traceInfo(),
                   getContext().rpcCreds(), job.getExternalCompactionId(), 
job.extent);
               return "";
-            } catch (TException e) {
+            } finally {
               ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
-              throw e;
             }
           }
         });
@@ -395,9 +394,8 @@ public class Compactor extends AbstractServer
               
coordinatorClient.get().compactionCompleted(TraceUtil.traceInfo(),
                   getContext().rpcCreds(), job.getExternalCompactionId(), 
job.extent, stats);
               return "";
-            } catch (TException e) {
+            } finally {
               ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
-              throw e;
             }
           }
         });
@@ -430,8 +428,9 @@ public class Compactor extends AbstractServer
                       eci.toString());
                 } catch (TException e) {
                   currentCompactionId.set(null);
-                  ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
                   throw e;
+                } finally {
+                  ThriftUtil.returnClient(coordinatorClient.getAndSet(null));
                 }
               }
             });
@@ -451,8 +450,7 @@ public class Compactor extends AbstractServer
       throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
     }
     LOG.info("CompactionCoordinator address is: {}", coordinatorHost);
-    return ThriftUtil.getClient(new CompactionCoordinator.Client.Factory(), 
coordinatorHost,
-        getContext());
+    return ThriftUtil.getClient(COORDINATOR_CLIENT_FACTORY, coordinatorHost, 
getContext());
   }
 
   /**

Reply via email to