Repository: incubator-trafodion
Updated Branches:
  refs/heads/master 643f82ba5 -> 67288b3cd


[TRAFODION-2150] and [TRAFODION-2151] TM long prepare and chore thread issues.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/30c8a83d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/30c8a83d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/30c8a83d

Branch: refs/heads/master
Commit: 30c8a83d1512667f5ecfe2c7384372f646f18ae3
Parents: 624d61e
Author: Prashant Vasudev <[email protected]>
Authored: Thu Jan 19 00:11:33 2017 +0000
Committer: Prashant Vasudev <[email protected]>
Committed: Thu Jan 19 00:11:33 2017 +0000

----------------------------------------------------------------------
 .../transactional/TrxRegionEndpoint.java.tmpl   | 49 ++++++++++++++++++--
 .../java/org/trafodion/dtm/HBaseTxClient.java   | 19 ++++++++
 2 files changed, 65 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/30c8a83d/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
index f824a4f..403df5f 100644
--- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
@@ -93,6 +93,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.ScheduledChore;
+#endif
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -309,7 +313,7 @@ CoprocessorService, Coprocessor {
   static MemoryUsageChore memoryUsageThread = null;
   Stoppable stoppable = new StoppableImplementation();
   static Stoppable stoppable2 = new StoppableImplementation();
-  private int cleanTimer = 5000; // Five minutes
+  private int cleanTimer = 5000;  // 5 secs overridden by DEFAULT_SLEEP
   private int memoryUsageTimer = 60000; // One minute   
   private int regionState = 0; 
   private Path recoveryTrxPath = null;
@@ -361,6 +365,7 @@ CoprocessorService, Coprocessor {
   private boolean configuredEarlyLogging = false;
   private boolean configuredConflictReinstate = false;
   private static Object zkRecoveryCheckLock = new Object();
+  private static Object choreDetectStaleBranchLock = new Object();
   private static ZooKeeperWatcher zkw1 = null;
   String lv_hostName;
   int lv_port;
@@ -435,6 +440,13 @@ CoprocessorService, Coprocessor {
   public static final int REGION_STATE_START = 2;
 
   public static final String trxkeyEPCPinstance = "EPCPinstance";
+  
+  #ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2
+  static ChoreService s_ChoreService = null;
+  #endif
+  static int txnChoreServiceThreadPoolSize = 1;
+  public static final int DEFAULT_TXN_CHORE_SERVICE_THREAD_POOL_SIZE=5;
+  
   // TBD Maybe we should just use HashMap to improve the performance, 
ConcurrentHashMap could be too strict
   static ConcurrentHashMap<String, Object> transactionsEPCPMap;
   // TrxRegionService methods
@@ -3876,9 +3888,19 @@ CoprocessorService, Coprocessor {
             + this.suppressOutOfOrderProtocolException);
 
         // Start the clean core thread
-          
         this.cleanOldTransactionsThread = new CleanOldTransactionsChore(this, 
cleanTimer, stoppable);
 
+#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2
+        this.txnChoreServiceThreadPoolSize = 
+                
tmp_env.getConfiguration().getInt("hbase.regionserver.region.transactional.chore_service_thread_pool_size",
+                DEFAULT_TXN_CHORE_SERVICE_THREAD_POOL_SIZE);        
+        if (LOG.isTraceEnabled()) LOG.trace("Transactional chore thread pool 
size setting is " + this.txnChoreServiceThreadPoolSize);
+        
+        if (this.cleanOldTransactionsThread != null) {
+            setupChoreService();
+            s_ChoreService.scheduleChore(this.cleanOldTransactionsThread);
+        }
+#else
         UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
 
           public void uncaughtException(final Thread t, final Throwable e)
@@ -3891,6 +3913,7 @@ CoprocessorService, Coprocessor {
        
         ChoreThread = new Thread(this.cleanOldTransactionsThread);
         Threads.setDaemonThreadRunning(ChoreThread, n + 
".oldTransactionCleaner", handler);
+#endif
 
         // Start the memory usage chore thread if the threshold
         // selected is greater than the default of 100%.   
@@ -4080,6 +4103,22 @@ CoprocessorService, Coprocessor {
 
   }
 
+#ifdef HDP2.3 HDP2.4 CDH5.5
+  private synchronized void setupChoreService() { 
+      if (s_ChoreService == null) {
+    s_ChoreService = new ChoreService("Cleanup ChoreService", 
this.txnChoreServiceThreadPoolSize);  
+      }
+  }
+#endif
+
+#ifdef CDH5.7 APACHE1.2
+  private synchronized void setupChoreService() {
+      if (s_ChoreService == null) {
+    s_ChoreService = new ChoreService("Cleanup ChoreService", 
this.txnChoreServiceThreadPoolSize, true);
+      }
+  }
+#endif
+
   // Internal support methods
    /**
    * Checks if the region is closing and needs to block all activity
@@ -4216,6 +4255,8 @@ CoprocessorService, Coprocessor {
 
   public void choreThreadDetectStaleTransactionBranch() {
 
+      synchronized(choreDetectStaleBranchLock) {
+         
       List<Integer> staleBranchforTMId = new ArrayList<Integer>();
       List<TrxTransactionState> commitPendingCopy = new 
ArrayList<TrxTransactionState>(commitPendingTransactions);
       Map<Long, List<WALEdit>> indoubtTransactionsMap = new TreeMap<Long, 
List<WALEdit>>(indoubtTransactionsById);
@@ -4252,7 +4293,6 @@ CoprocessorService, Coprocessor {
             }
          }
       }
-
       if (!staleBranchforTMId.isEmpty()) {
             for (int i = 0; i < staleBranchforTMId.size(); i++) {
                try {
@@ -4270,6 +4310,8 @@ CoprocessorService, Coprocessor {
       commitPendingCopy.clear();
       indoubtTransactionsMap.clear();
       staleBranchforTMId.clear();
+      
+      } //synchronized
   }
 
   public void createRecoveryzNode(int node, String encodedName, byte [] data) 
throws IOException {
@@ -6678,6 +6720,7 @@ CoprocessorService, Coprocessor {
 
      @Override
      public void stop(String why) {
+       LOG.info("Cleanup Chore thread has stopped: Reason:" + why);
        this.stop = true;
      }
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/30c8a83d/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
 
b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index 1b2ebed..0435175 100644
--- 
a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ 
b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -1108,9 +1108,28 @@ public class HBaseTxClient {
                             }
                             if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV 
THREAD: in-doubt transaction size " + transactionStates.size());
                             for (Map.Entry<Long, TransactionState> tsEntry : 
transactionStates.entrySet()) {
+                                int isTransactionStillAlive = 0;
                                 TransactionState ts = tsEntry.getValue();
                                 Long txID = ts.getTransactionId();
                                 // TransactionState ts = new 
TransactionState(txID);
+                                
+                                // In long-prepare situations that involve multiple DDL operations,
+                                // multiple prompts from the RS may be received. Hence check whether the main
+                                // TS list holds a TS object for this transaction and it is still active.
+                                // Note that tsEntry is a local TS object.
+                                if (hbtx.mapTransactionStates.get(txID) != 
null) {
+                                  if 
(hbtx.mapTransactionStates.get(txID).getStatus().toString().contains("ACTIVE")) 
{
+                                    isTransactionStillAlive = 1;
+                                  }
+                                  if (LOG.isInfoEnabled()) 
+                                  LOG.info("TRAF RCOV THREAD: TID " + txID
+                                            + " still has ts object in TM 
memory with state "
+                                            + 
hbtx.mapTransactionStates.get(txID).getStatus().toString() 
+                                            + " transactionAlive: " + 
isTransactionStillAlive);
+                                  if(isTransactionStillAlive == 1)
+                                    continue; //for loop
+                                }
+                               
                                 try {
                                     audit.getTransactionState(ts);
                                     if 
(ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {

Reply via email to