Fix for JIRA [TRAFODION-1996]
TLOG uses excessive memory when aging old entries


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/6b5da39b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/6b5da39b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/6b5da39b

Branch: refs/heads/master
Commit: 6b5da39b1a2b4359735439ff7095ba719b11786e
Parents: 68653b1
Author: Sean Broeder <[email protected]>
Authored: Thu Jun 9 13:59:37 2016 +0000
Committer: Sean Broeder <[email protected]>
Committed: Thu Jun 9 13:59:37 2016 +0000

----------------------------------------------------------------------
 .../transactional/TrxRegionEndpoint.java.tmpl   |  35 ++++---
 .../trafodion/dtm/HBaseAuditControlPoint.java   |   4 +-
 .../java/org/trafodion/dtm/TmAuditTlog.java     | 101 +++++++++----------
 3 files changed, 71 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/6b5da39b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
index b0e87d3..69ddb68 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
@@ -2184,7 +2184,7 @@ CoprocessorService, Coprocessor {
      }
      catch (Exception e){
         if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Exception in 
region: "
-           + regionInfo.getRegionNameAsString() + " getting scanner " + e );
+           + regionInfo.getRegionNameAsString() + " getting scanner ", e );
      }
 
      long count = 0L;
@@ -2224,11 +2224,13 @@ CoprocessorService, Coprocessor {
                        StringTokenizer st = new StringTokenizer(valueString, 
",");
                        if (st.hasMoreElements()) {
                           String asnToken = st.nextElement().toString();
-                          String transidToken = st.nextElement().toString();
-                          String stateToken = st.nextElement().toString();
-                          if (LOG.isTraceEnabled()) 
LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries transidToken: "
-                                    + transidToken + " asnToken: " + asnToken);
+                          String transidToken = null;
+                          String stateToken = null;
                           if (Long.parseLong(asnToken) < lvAsn) {
+                             transidToken = st.nextElement().toString();
+                             stateToken = st.nextElement().toString();
+                             if (LOG.isTraceEnabled()) 
LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries transidToken: "
+                                    + transidToken + " asnToken: " + asnToken);
                              if ( (stateToken.contains("FORGOTTEN")) ||
                                   (stateToken.equals("COMMITTED") && 
(lvAgeCommitted)) ||
                                   (stateToken.equals("ABORTED") && 
(lvAgeCommitted))) {
@@ -2238,11 +2240,12 @@ CoprocessorService, Coprocessor {
 
                                 try {
                                    Delete d = new Delete(result.getRow());
+                                   d.setDurability(Durability.SKIP_WAL);
                                    m_Region.delete(d);
                                 }
                                 catch (Exception e) {
                                    LOG.warn("TrxRegionEndpoint coprocessor: 
deleteTlogEntries -"
-                                       + " txId " + transidToken + ", 
Executing delete caught an exception " + e);
+                                       + " txId " + transidToken + ", 
Executing delete caught an exception ", e);
                                    throw new IOException(e.toString());
                                 }
                                 count++;
@@ -2288,15 +2291,17 @@ CoprocessorService, Coprocessor {
                     + e.getMessage() + " " + stackTraceToString(e));
            t = e;
         }
-        if (scanner != null) {
-           try {
-              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: deleteTlogEntries - txId "
-                  + transId + ", closing the scanner, region is " + 
regionInfo.getRegionNameAsString());
-              scanner.close();
-           } catch(Exception e) {
-              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: deleteTlogEntries -  transaction id "
-                   + transId + ", Caught general exception " + e.getMessage() 
+ " " + stackTraceToString(e));
-              ne = e;
+        finally {
+           if (scanner != null) {
+              try {
+                 if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: deleteTlogEntries - txId "
+                     + transId + ", closing the scanner, region is " + 
regionInfo.getRegionNameAsString());
+                 scanner.close();
+              } catch(Exception e) {
+                 if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: deleteTlogEntries -  transaction id "
+                      + transId + ", Caught general exception " + 
e.getMessage() + " " + stackTraceToString(e));
+                 ne = e;
+              }
            }
         }
      }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/6b5da39b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
index 11120ef..e50fe3b 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
@@ -34,6 +34,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -404,7 +406,7 @@ public class HBaseAuditControlPoint {
          table.delete(deleteList);
       }
       catch (Exception e) {
-         LOG.error("deleteAgedRecords IOException");
+         LOG.error("deleteAgedRecords IOException ", e);
       }finally {
          ss.close();
       }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/6b5da39b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
index a94aa16..11b17ce 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
@@ -194,7 +194,7 @@ public class TmAuditTlog {
         try {
            table = new HTable(location.getRegionInfo().getTable(), connection, 
tlogThreadPool);
         } catch(IOException e) {
-           LOG.error("Error obtaining HTable instance " + e);
+           LOG.error("Error obtaining HTable instance ", e);
            table = null;
         }
         startKey = location.getRegionInfo().getStartKey();
@@ -242,8 +242,9 @@ public class TmAuditTlog {
                          + " startKey: " + new String(startKey, "UTF-8") + " 
endKey: " + new String(endKey, "UTF-8"));
                    result = table.coprocessorService(TrxRegionService.class, 
startKey, endKey, callable);
                  } catch (Throwable e) {
-                   String msg = "ERROR occurred while calling 
deleteTlogEntries coprocessor service in deleteEntriesOlderThanASNX";
-                   LOG.error(msg + ":" + e);
+                   e.printStackTrace();
+                   String msg = new String("ERROR occurred while calling 
deleteTlogEntries coprocessor service in deleteEntriesOlderThanASNX: " + e);
+                   LOG.error(msg);
                    throw new Exception(msg);
                  }
                  if (LOG.isTraceEnabled()) 
LOG.trace("deleteEntriesOlderThanASNX -- after coprocessorService ASN: " + 
auditSeqNum
@@ -267,7 +268,7 @@ public class TmAuditTlog {
                     retry = false;
                  }
               } catch (Exception e) {
-                 LOG.error("deleteEntriesOlderThanASNX retrying due to 
Exception: " + e);
+                 LOG.error("deleteEntriesOlderThanASNX retrying due to 
Exception: ", e);
                  refresh = true;
                  retry = true;
               }
@@ -309,7 +310,7 @@ public class TmAuditTlog {
 
                retrySleep += TLOG_SLEEP_INCR;
             }
-         } while (retryCount < RETRY_ATTEMPTS && retry == true);
+       } while (retryCount < RETRY_ATTEMPTS && retry == true);
        // We have received our reply so decrement outstanding count
        transactionState.requestPendingCountDec(false);
 
@@ -334,7 +335,7 @@ public class TmAuditTlog {
             buffer.add(localPut);
          }
          catch (Exception e) {
-            if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying 
bufferAdd" + e);
+            if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying 
bufferAdd", e);
             throw e;
          }
          if (LOG.isTraceEnabled()) LOG.trace("BufferAdd end in thread " + 
threadId );
@@ -348,7 +349,7 @@ public class TmAuditTlog {
             lvSize = buffer.size();
          }
          catch (Exception e) {
-            if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying 
bufferSize" + e);
+            if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying 
bufferSize", e);
             throw e;
          }
          if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferSize end; 
returning " + lvSize + " in thread " 
@@ -363,7 +364,7 @@ public class TmAuditTlog {
             buffer.clear();
          }
          catch (Exception e) {
-            if (LOG.isDebugEnabled()) LOG.debug("Exception trying 
bufferClear.clear" + e);
+            if (LOG.isDebugEnabled()) LOG.debug("Exception trying 
bufferClear.clear", e);
             throw e;
          }
          if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferClear end in 
thread " + threadId);
@@ -522,7 +523,7 @@ public class TmAuditTlog {
           tLogHashShiftFactor = 59;
           break;
         default : {
-          LOG.error("TM_TLOG_NUM_LOGS must b 1 or a power of 2 in the range 
2-32");
+          LOG.error("TM_TLOG_NUM_LOGS must be 1 or a power of 2 in the range 
2-32");
           throw new RuntimeException();
         }
       }
@@ -571,7 +572,7 @@ public class TmAuditTlog {
          tLogControlPoint = new HBaseAuditControlPoint(config);
       }
       catch (Exception e) {
-         LOG.error("Unable to create new HBaseAuditControlPoint object " + e);
+         LOG.error("Unable to create new HBaseAuditControlPoint object ", e);
       }
 
       tlogAuditLock =    new Object[tlogNumLogs];
@@ -615,7 +616,7 @@ public class TmAuditTlog {
             table[i] = new HTable(config, desc.getName());
          }
          catch(Exception e){
-            LOG.error("TmAuditTlog Exception on index " + i + "; " + e);
+            LOG.error("TmAuditTlog Exception on index " + i + "; ", e);
             throw new RuntimeException(e);
          }
 
@@ -718,7 +719,7 @@ public class TmAuditTlog {
          }
          catch (Exception e2){
             // create record of the exception
-            LOG.error("putSingleRecord Exception in recoveryTable" + e2);
+            LOG.error("putSingleRecord Exception in recoveryTable", e2);
             throw e2;
          }
          finally {
@@ -733,7 +734,7 @@ public class TmAuditTlog {
          }
       }
       else {
-         // THis goes to our local TLOG
+         // This goes to our local TLOG
          if (LOG.isTraceEnabled()) LOG.trace("TLOG putSingleRecord 
synchronizing tlogAuditLock[" + lv_lockIndex + "] in thread " + threadId );
          startSynch = System.nanoTime();
          try {
@@ -751,7 +752,7 @@ public class TmAuditTlog {
                }
                catch (Exception e2){
                   // create record of the exception
-                  LOG.error("putSingleRecord Exception " + e2);
+                  LOG.error("putSingleRecord Exception ", e2);
                   e2.printStackTrace();
                   throw e2;
                }
@@ -759,7 +760,7 @@ public class TmAuditTlog {
          }
          catch (Exception e) {
             // create record of the exception
-            LOG.error("Synchronizing on tlogAuditLock[" + lv_lockIndex + "] 
Exception " + e);
+            LOG.error("Synchronizing on tlogAuditLock[" + lv_lockIndex + "] 
Exception ", e);
             e.printStackTrace();
             throw e;
          }
@@ -907,16 +908,16 @@ public class TmAuditTlog {
             if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " 
state: " + lvTxState);
          }
          catch (IOException e){
-             LOG.error("getRecord IOException " + e);
+             LOG.error("getRecord IOException ", e);
              throw e;
          }
          catch (Exception e){
-             LOG.error("getRecord Exception " + e);
+             LOG.error("getRecord Exception ", e);
              throw e;
          }
       }
       catch (Exception e2) {
-            LOG.error("getRecord Exception2 " + e2);
+            LOG.error("getRecord Exception2 ", e2);
             e2.printStackTrace();
       }
 
@@ -944,11 +945,11 @@ public class TmAuditTlog {
             lvTxState = st.nextElement().toString();
             if (LOG.isTraceEnabled()) LOG.trace("transid: " + transidToken + " 
state: " + lvTxState);
          } catch (IOException e){
-             LOG.error("getRecord IOException");
+             LOG.error("getRecord IOException: ", e);
              throw e;
          }
       } catch (Exception e){
-             LOG.error("getRecord Exception " + e);
+             LOG.error("getRecord Exception: ", e);
              throw e;
       }
       if (LOG.isTraceEnabled()) LOG.trace("getRecord end; returning String:" + 
lvTxState);
@@ -970,7 +971,7 @@ public class TmAuditTlog {
          table[lv_lockIndex].delete(d);
       }
       catch (Exception e) {
-         LOG.error("deleteRecord Exception " + e );
+         LOG.error("deleteRecord Exception: ", e );
       }
       if (LOG.isTraceEnabled()) LOG.trace("deleteRecord - exit");
       return true;
@@ -1051,7 +1052,7 @@ public class TmAuditTlog {
               }
            }
            catch(Exception e){
-              LOG.error("deleteAgedEntries Exception getting results for table 
" + lv_tLogName + "; " + e);
+              LOG.error("deleteAgedEntries Exception getting results for table 
" + lv_tLogName + "; ", e);
               throw new RuntimeException(e);
            }
            finally {
@@ -1064,13 +1065,13 @@ public class TmAuditTlog {
               deleteTable.delete(deleteList);
            }
            catch(IOException e){
-              LOG.error("deleteAgedEntries Exception deleting from table " + 
lv_tLogName + "; " + e);
+              LOG.error("deleteAgedEntries Exception deleting from table " + 
lv_tLogName + "; ", e);
               throw new RuntimeException(e);
            }
         }
         catch (IOException e) {
            LOG.error("deleteAgedEntries IOException setting up scan on table "
-                   + lv_tLogName + ", Exception: " + e);
+                   + lv_tLogName + ", Exception: ", e);
            e.printStackTrace();
         }
         finally {
@@ -1105,16 +1106,11 @@ public class TmAuditTlog {
             if (value.getStatus().equals("COMMITTED")){
                if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords 
adding record for trans (" + transid + ") : state is " + value.getStatus());
                cpWrites++;
-               if (forceControlPoint) {
-                  putSingleRecord(transid, value.getCommitId(), 
value.getStatus(), value.getParticipatingRegions(), true);
-               }
-               else {
-                  putSingleRecord(transid, value.getCommitId(), 
value.getStatus(), value.getParticipatingRegions(), false);
-               }
+               putSingleRecord(transid, value.getCommitId(), 
value.getStatus(), value.getParticipatingRegions(), forceControlPoint);
             }
          }
          catch (Exception ex) {
-            LOG.error("formatRecord Exception " + ex);
+            LOG.error("formatRecord Exception ", ex);
             throw ex;
          }
         }
@@ -1177,7 +1173,7 @@ public class TmAuditTlog {
                      deleteEntriesOlderThanASN(agedAsn, ageCommitted);
                   }
                   catch (Exception e){
-                     LOG.error("deleteAgedEntries Exception " + e);
+                     LOG.error("deleteAgedEntries Exception ", e);
                      throw e;
                   }
                }
@@ -1189,12 +1185,12 @@ public class TmAuditTlog {
                }
             }
             catch (IOException e){
-               LOG.error("addControlPoint IOException " + e);
+               LOG.error("addControlPoint IOException ", e);
                throw e;
             }
          }
       } catch (Exception e){
-          LOG.error("addControlPoint Exception " + e);
+          LOG.error("addControlPoint Exception ", e);
           throw e;
       }
       if (LOG.isTraceEnabled()) LOG.trace("addControlPoint returning " + 
lvCtrlPt);
@@ -1371,7 +1367,7 @@ public class TmAuditTlog {
          }
       }
       catch (Exception e2) {
-            LOG.error("getTransactionState Exception2 " + e2);
+            LOG.error("getTransactionState Exception2 ", e2);
             e2.printStackTrace();
       }
       if (LOG.isTraceEnabled()) LOG.trace("getTransactionState end transid: " 
+ ts.getTransactionId());
@@ -1389,7 +1385,7 @@ public class TmAuditTlog {
    * Purpose : Delete transaction records which are no longer needed
    */
    public void deleteEntriesOlderThanASN(final long pv_ASN, final boolean 
pv_ageCommitted) throws IOException {
-      int loopCount = 0;
+      int loopIndex = 0;
       long threadId = Thread.currentThread().getId();
       // This TransactionState object is just a mechanism to keep track of the 
asynch rpc calls
       // send to regions in order to retrience the desired set of transactions
@@ -1406,9 +1402,11 @@ public class TmAuditTlog {
          for (int index = 0; index < tlogNumLogs; index++) {
             String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + 
String.valueOf(this.dtmid) + "_LOG_" + Integer.toHexString(index));
             regionList = 
connection.locateRegions(TableName.valueOf(lv_tLogName), false, false);
-            loopCount++;
+            loopIndex++;
+            int regionIndex = 0;
             // For every region in this table
             for (HRegionLocation location : regionList) {
+               regionIndex++;
                final byte[] regionName = 
location.getRegionInfo().getRegionName();
                compPool.submit(new TlogCallable(transactionState, location, 
connection) {
                   public Integer call() throws IOException {
@@ -1417,28 +1415,25 @@ public class TmAuditTlog {
                      return deleteEntriesOlderThanASNX(regionName, pv_ASN, 
pv_ageCommitted);
                   }
                });
+               
+               try {
+                   int partialResult = compPool.take().get();
+                   if (LOG.isTraceEnabled()) 
LOG.trace("deleteEntriesOlderThanASN partial result: " + partialResult
+                                     + " loopIndex " + loopIndex + " 
regionIndex " + regionIndex);
+                }
+                catch (Exception e2) {
+                   LOG.error("exception retieving reply in 
deleteEntriesOlderThanASN for interval ASN: " + pv_ASN
+                           + " ", e2);
+                   throw new IOException(e2);
+                }
             }
          }
       } catch (Exception e) {
          LOG.error("exception in deleteEntriesOlderThanASN for ASN: "
-                 + pv_ASN + " " + e);
+                 + pv_ASN + " ", e);
          throw new IOException(e);
       }
-      // all requests sent at this point, can record the count
-      if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN tlog 
callable requests sent to "
-                + loopCount + " tlogs in thread " + threadId);
-      int deleteError = 0;
-      try {
-         for (int loopIndex = 0; loopIndex < loopCount; loopIndex ++) {
-            int partialResult = compPool.take().get();
-            if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN 
partial result: " + partialResult + " loopIndex " + loopIndex);
-         }
-      }
-      catch (Exception e2) {
-         LOG.error("exception retieving replys in deleteEntriesOlderThanASN 
for interval ASN: " + pv_ASN
-                 + " " + e2);
-         throw new IOException(e2);
-      }
+
       if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN tlog 
callable requests completed in thread "
             + threadId);
       return;

Reply via email to