Repository: sentry
Updated Branches:
  refs/heads/sentry-ha-redesign 79983e00b -> 4d7c4cede


SENTRY-1821: Transactions could fail to commit to the database under load (Alex 
Kolbasov, reviewed by: Vamsee Yarlagadda and Na Li)


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/4d7c4ced
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/4d7c4ced
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/4d7c4ced

Branch: refs/heads/sentry-ha-redesign
Commit: 4d7c4cedeef370a60dcfee1cc8a2907bc738e08e
Parents: 79983e0
Author: Alexander Kolbasov <[email protected]>
Authored: Wed Jun 28 11:31:25 2017 -0700
Committer: Alexander Kolbasov <[email protected]>
Committed: Wed Jun 28 11:31:25 2017 -0700

----------------------------------------------------------------------
 .../service/persistent/TransactionManager.java  | 140 +++++++++++--------
 .../sentry/service/thrift/ServiceConstants.java |   2 +-
 2 files changed, 82 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/4d7c4ced/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
index 795e2b0..0a9f3a7 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/TransactionManager.java
@@ -22,9 +22,10 @@ import com.codahale.metrics.Counter;
 import static com.codahale.metrics.MetricRegistry.name;
 import com.codahale.metrics.Timer;
 
+import com.codahale.metrics.Timer.Context;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sentry.core.common.exception.SentryUserException;
-import org.apache.sentry.service.thrift.ServiceConstants;
+import org.apache.sentry.service.thrift.ServiceConstants.ServerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,8 @@ import javax.jdo.Transaction;
 
 import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
 
-import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
 
 /**
  * TransactionManager is used for executing the database transaction, it 
supports
@@ -57,11 +59,15 @@ import java.util.List;
  *     <li>Counter for each exception thrown by transaction</li>
  * </ul>
  */
-public class TransactionManager {
+@SuppressWarnings("NestedTryStatement")
+public final class TransactionManager {
 
   private static final Logger LOGGER =
           LoggerFactory.getLogger(TransactionManager.class);
 
+  /** Random number generator for exponential backoff */
+  private static final Random random = new Random();
+
   private final PersistenceManagerFactory pmf;
 
   // Maximum number of retries per call
@@ -70,30 +76,33 @@ public class TransactionManager {
   // Delay (in milliseconds) between retries
   private final int retryWaitTimeMills;
 
+  /** Name for metrics */
+  private static final String TRANSACTIONS = "transactions";
+
   // Transaction timer measures time distribution for all transactions
   private final Timer transactionTimer =
           SentryMetrics.getInstance().
                   getTimer(name(TransactionManager.class,
-                           "transactions"));
+                           TRANSACTIONS));
 
   // Counter for failed transactions
   private final Counter failedTransactionsCount =
           SentryMetrics.getInstance().
                   getCounter(name(TransactionManager.class,
-                             "transactions", "failed"));
+                             TRANSACTIONS, "failed"));
 
   private final Counter retryCount =
           SentryMetrics.getInstance().getCounter(name(TransactionManager.class,
-                  "transactions", "retry"));
+                  TRANSACTIONS, "retry"));
 
   TransactionManager(PersistenceManagerFactory pmf, Configuration conf) {
     this.pmf = pmf;
-    this.transactionRetryMax = conf.getInt(
-        ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY,
-        ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_DEFAULT);
-    this.retryWaitTimeMills = conf.getInt(
-        
ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS,
-        
ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT);
+    transactionRetryMax = conf.getInt(
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY,
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_DEFAULT);
+    retryWaitTimeMills = conf.getInt(
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS,
+        ServerConfig.SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT);
   }
 
 
@@ -106,7 +115,7 @@ public class TransactionManager {
    * @return Object with the result of tb.execute()
    */
   public <T> T executeTransaction(TransactionBlock<T> tb) throws Exception {
-    try (Timer.Context context = transactionTimer.time();
+    try (Context context = transactionTimer.time();
          PersistenceManager pm = pmf.getPersistenceManager()) {
       Transaction transaction = pm.currentTransaction();
       transaction.begin();
@@ -139,8 +148,8 @@ public class TransactionManager {
    * @param tbs transaction blocks with code to be executed
    * @return the result of the last result of tb.execute()
    */
-  public <T> T executeTransaction(Iterable<TransactionBlock<T>> tbs) throws 
Exception {
-    try (Timer.Context context = transactionTimer.time();
+  private <T> T executeTransaction(Iterable<TransactionBlock<T>> tbs) throws 
Exception {
+    try (Context context = transactionTimer.time();
          PersistenceManager pm = pmf.getPersistenceManager()) {
       Transaction transaction = pm.currentTransaction();
       transaction.begin();
@@ -173,30 +182,17 @@ public class TransactionManager {
    * @param tb transaction block with code to execute
    * @return Object with the result of tb.execute()
    */
-  public <T> T executeTransactionWithRetry(TransactionBlock<T> tb)
+  @SuppressWarnings("squid:S00112")
+  public <T> T executeTransactionWithRetry(final TransactionBlock<T> tb)
           throws Exception {
-    int retryNum = 0;
-    while (retryNum < transactionRetryMax) {
-      try {
-        return executeTransaction(tb);
-      } catch (SentryUserException e) {
-        // throw the sentry exception without retry
-        throw e;
-      } catch (Exception e) {
-        retryNum++;
-        if (retryNum >= transactionRetryMax) {
-          String message = "The transaction has reached max retry number"
-              + e.getMessage();
-          LOGGER.error(message, e);
-          throw new Exception(message, e);
-        }
-        retryCount.inc();
-        LOGGER.warn("Exception during transaction execution, retrying "
-            + retryNum + "times. The max retry num is: " + 
transactionRetryMax, e);
-        Thread.sleep(retryWaitTimeMills);
-      }
-    }
-    return null;
+    return new ExponentialBackoff().execute(
+            new Callable<T>() {
+              @Override
+              public T call() throws Exception {
+                return executeTransaction(tb);
+              }
+            }
+    );
   }
 
   /**
@@ -206,31 +202,57 @@ public class TransactionManager {
    * execution.
    *
    * @param tbs a list of transaction blocks with code to be executed.
-   * @return the result of the last transaction block execution.
    */
-  public <T> T executeTransactionBlocksWithRetry(Iterable<TransactionBlock<T>> 
tbs)
+  @SuppressWarnings("squid:S00112")
+  <T> void executeTransactionBlocksWithRetry(final 
Iterable<TransactionBlock<T>> tbs)
           throws Exception {
-    int retryNum = 0;
-    while (retryNum < transactionRetryMax) {
-      try {
-        return executeTransaction(tbs);
-      } catch (SentryUserException e) {
-        // throw the sentry exception without retry
-        throw e;
-      } catch (Exception e) {
-        retryNum++;
-        if (retryNum >= transactionRetryMax) {
-          String message = "The transaction has reached max retry number, "
-              + e.getMessage();
-          LOGGER.error(message, e);
-          throw new Exception(message, e);
+    new ExponentialBackoff().execute(
+            new Callable<T>() {
+              @Override
+              public T call() throws Exception {
+                return executeTransaction(tbs);
+              }
+            }
+    );
+  }
+
+  /**
+   * Implementation of exponential backoff with random fuzziness.
+   * On each iteration the backoff time is 1.5 the previous amount plus the
+   * random fuzziness factor which is up to half of the previous amount.
+   */
+  private class ExponentialBackoff {
+
+    @SuppressWarnings("squid:S00112")
+    <T> T execute(Callable<T> arg) throws Exception {
+      Exception ex = null;
+      long sleepTime = retryWaitTimeMills;
+
+      for (int retryNum = 0; retryNum < transactionRetryMax; retryNum++) {
+        try {
+          return arg.call();
+        } catch (SentryUserException e) {
+          // throw the sentry exception without retry
+          throw e;
+        } catch (Exception e) {
+          ex = e;
+          retryCount.inc();
+          LOGGER.warn("Transaction execution encountered exception", e);
+          LOGGER.warn("Retrying transaction {}/{} times",
+                  retryNum, transactionRetryMax);
+          // Introduce some randomness in the backoff time.
+          Thread.sleep(sleepTime);
+          int fuzz = random.nextInt((int)sleepTime / 2);
+          sleepTime *= 3;
+          sleepTime /= 2;
+          sleepTime += fuzz;
         }
-        retryCount.inc();
-        LOGGER.warn("Exception during transaction execution, retrying "
-            + retryNum + "times. The max retry num is: " + 
transactionRetryMax, e);
-        Thread.sleep(retryWaitTimeMills);
       }
+      assert(ex != null);
+      String message = "The transaction has reached max retry number, "
+              + ex.getMessage();
+      LOGGER.error(message, ex);
+      throw new Exception(message, ex);
     }
-    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/4d7c4ced/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
index d3c96fa..4f48167 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java
@@ -92,7 +92,7 @@ public class ServiceConstants {
     // the default value is 500 ms
     public static final String SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS 
=
         "sentry.store.transaction.retry.wait.time.millis";
-    public static final int 
SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT = 500;
+    public static final int 
SENTRY_STORE_TRANSACTION_RETRY_WAIT_TIME_MILLIS_DEFAULT = 250;
 
     public static final String JAVAX_JDO_URL = 
"javax.jdo.option.ConnectionURL";
     public static final String JAVAX_JDO_USER = 
"javax.jdo.option.ConnectionUserName";

Reply via email to