Repository: incubator-trafodion
Updated Branches:
  refs/heads/master fe92c7b6b -> c2d844331


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7f54aa7c/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto 
b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
index 33e39fa..eea4129 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
@@ -268,6 +268,21 @@ message RecoveryRequestResponse {
   optional bool hasException = 3;
 }
 
+message TlogDeleteRequest{
+  required bytes regionName = 1;
+  required int64 transactionId = 2;
+  required Scan scan = 3;
+  required int64 auditSeqNum = 4;
+  required bool ageCommitted = 5;
+}
+
+message TlogDeleteResponse {
+  repeated Result result = 1;
+  required int64  count = 2;
+  optional string exception = 3;
+  optional bool hasException = 4;
+}
+
 message TransactionalAggregateRequest {
   /** The request passed to the TransactionalAggregateService consists of 
three parts
    *  (1) the (canonical) classname of the ColumnInterpreter implementation
@@ -355,6 +370,8 @@ service TrxRegionService {
     returns(PutMultipleTransactionalResponse);
   rpc recoveryRequest(RecoveryRequestRequest)
     returns(RecoveryRequestResponse);
+  rpc deleteTlogEntries(TlogDeleteRequest)
+    returns(TlogDeleteResponse);
   rpc GetMax (TransactionalAggregateRequest)
     returns (TransactionalAggregateResponse);
   rpc GetMin (TransactionalAggregateRequest) 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7f54aa7c/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
 
b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
index cb3ba94..222dd86 100644
--- 
a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
+++ 
b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.log4j.Logger;
 
+import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.transactional.TransactionManager;
 import org.apache.hadoop.hbase.client.transactional.TransactionState;
@@ -53,6 +55,11 @@ import 
org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
 import 
org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
 import org.apache.hadoop.hbase.client.transactional.TransactionRegionLocation;
 import org.apache.hadoop.hbase.client.transactional.TransState;
+import 
org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest;
+import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse;
+import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -64,8 +71,19 @@ import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
+
 import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -82,9 +100,11 @@ import java.util.StringTokenizer;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -100,6 +120,7 @@ public class TmAuditTlog {
    private static final byte[] ASN_STATE = Bytes.toBytes("as");
    private static final byte[] QUAL_TX_STATE = Bytes.toBytes("tx");
    private static HTable[] table;
+   private static HConnection connection;
    private static HBaseAuditControlPoint tLogControlPoint;
    private static long tLogControlPointNum;
    private static long tLogHashKey;
@@ -139,7 +160,9 @@ public class TmAuditTlog {
    private static boolean forceControlPoint;
    private boolean disableBlockCache;
    private boolean controlPointDeferred;
- 
+   private int TlogRetryDelay;
+   private int TlogRetryCount;
+
    private static AtomicLong asn;  // Audit sequence number is the monotonic 
increasing value of the tLog write
 
    private static Object tlogAuditLock[];        // Lock for synchronizing 
access via regions.
@@ -147,26 +170,153 @@ public class TmAuditTlog {
    private static Object tablePutLock;            // Lock for synchronizing 
table.put operations
                                                   // to avoid 
ArrayIndexOutOfBoundsException
    private static byte filler[];
+   public static final int TLOG_SLEEP = 1000;      // One second
+   public static final int TLOG_SLEEP_INCR = 5000; // Five seconds
+   public static final int TLOG_RETRY_ATTEMPTS = 5;
+   private int RETRY_ATTEMPTS;
+
+   /**
+    * tlogThreadPool - pool of threads for asynchronous requests
+    */
+   ExecutorService tlogThreadPool;
+
+   private abstract class TlogCallable implements Callable<Integer>{
+      TransactionState transactionState;
+      HRegionLocation  location;
+      HTable table;
+      byte[] startKey;
+      byte[] endKey_orig;
+      byte[] endKey;
+
+     TlogCallable(TransactionState txState, HRegionLocation location, 
HConnection connection) {
+        transactionState = txState;
+        this.location = location;
+        try {
+           table = new HTable(location.getRegionInfo().getTable(), connection, 
tlogThreadPool);
+        } catch(IOException e) {
+           LOG.error("Error obtaining HTable instance " + e);
+           table = null;
+        }
+        startKey = location.getRegionInfo().getStartKey();
+        endKey_orig = location.getRegionInfo().getEndKey();
+        endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
+    }
+
+    public Integer deleteEntriesOlderThanASNX(final byte[] regionName, final 
long auditSeqNum, final boolean pv_ageCommitted) throws IOException {
+       long threadId = Thread.currentThread().getId();
+       if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- 
ENTRY auditSeqNum: "
+            + auditSeqNum + ", thread " + threadId);
+       boolean retry = false;
+       boolean refresh = false;
+       final Scan scan = new Scan(startKey, endKey);
+
+       int retryCount = 0;
+       int retrySleep = TLOG_SLEEP;
+       do {
+          try {
+             if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX 
-- ENTRY ASN: " + auditSeqNum);
+             Batch.Call<TrxRegionService, TlogDeleteResponse> callable =
+                new Batch.Call<TrxRegionService, TlogDeleteResponse>() {
+                  ServerRpcController controller = new ServerRpcController();
+                  BlockingRpcCallback<TlogDeleteResponse> rpcCallback =
+                      new BlockingRpcCallback<TlogDeleteResponse>();
+
+                     @Override
+                     public TlogDeleteResponse call(TrxRegionService instance) 
throws IOException {
+                        
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest.Builder
+                        builder = TlogDeleteRequest.newBuilder();
+                        builder.setAuditSeqNum(auditSeqNum);
+                        
builder.setTransactionId(transactionState.getTransactionId());
+                        builder.setScan(ProtobufUtil.toScan(scan));
+                        
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); 
//ByteString.copyFromUtf8(Bytes.toString(regionName)));
+                        builder.setAgeCommitted(pv_ageCommitted); 
+
+                        instance.deleteTlogEntries(controller, 
builder.build(), rpcCallback);
+                        return rpcCallback.get();
+                    }
+                 };
+
+                 Map<byte[], TlogDeleteResponse> result = null;
+                 try {
+                   if (LOG.isTraceEnabled()) 
LOG.trace("deleteEntriesOlderThanASNX -- before coprocessorService ASN: " + 
auditSeqNum
+                         + " startKey: " + new String(startKey, "UTF-8") + " 
endKey: " + new String(endKey, "UTF-8"));
+                   result = table.coprocessorService(TrxRegionService.class, 
startKey, endKey, callable);
+                 } catch (Throwable e) {
+                   String msg = "ERROR occurred while calling 
deleteTlogEntries coprocessor service in deleteEntriesOlderThanASNX";
+                   LOG.error(msg + ":" + e);
+                   throw new Exception(msg);
+                 }
+                 if (LOG.isTraceEnabled()) 
LOG.trace("deleteEntriesOlderThanASNX -- after coprocessorService ASN: " + 
auditSeqNum
+                         + " startKey: " + new String(startKey, "UTF-8") + " 
result size: " + result.size());
+
+                 if(result.size() != 1) {
+                    LOG.error("deleteEntriesOlderThanASNX, received incorrect 
result size: " + result.size() + " ASN: " + auditSeqNum);
+                    throw new Exception("Wrong result size in 
deleteEntriesOlderThanASNX");
+                 }
+                 else {
+                    // size is 1
+                    for (TlogDeleteResponse TD_response : result.values()){
+                       if(TD_response.getHasException()) {
+                          if (LOG.isTraceEnabled()) 
LOG.trace("deleteEntriesOlderThanASNX coprocessor exception: "
+                               + TD_response.getException());
+                          throw new Exception(TD_response.getException());
+                       }
+                       if (LOG.isTraceEnabled()) 
LOG.trace("deleteEntriesOlderThanASNX coprocessor deleted count: "
+                               + TD_response.getCount());
+                    }
+                    retry = false;
+                 }
+              } catch (Exception e) {
+                 LOG.error("deleteEntriesOlderThanASNX retrying due to 
Exception: " + e);
+                 refresh = true;
+                 retry = true;
+              }
+              if (refresh) {
+
+               HRegionLocation lv_hrl = table.getRegionLocation(startKey);
+               HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
+               String          lv_node = lv_hrl.getHostname();
+               int             lv_length = lv_node.indexOf('.');
+
+               if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX 
-- location being refreshed : "
+                    + location.getRegionInfo().getRegionNameAsString() + 
"endKey: "
+                    + 
Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for ASN: " + 
auditSeqNum);
+               if(retryCount == RETRY_ATTEMPTS) {
+                  LOG.error("Exceeded retry attempts (" + retryCount + ") in 
deleteEntriesOlderThanASNX for ASN: " + auditSeqNum);
+                  // We have received our reply in the form of an exception,
+                  // so decrement outstanding count and wake up waiters to 
avoid
+                  // getting hung forever
+                  transactionState.requestPendingCountDec(true);
+                  throw new IOException("Exceeded retry attempts (" + 
retryCount + ") in deleteEntriesOlderThanASNX for ASN: " + auditSeqNum);
+               }
+
+               if (LOG.isWarnEnabled()) LOG.warn("deleteEntriesOlderThanASNX 
-- " + table.toString() + " location being refreshed");
+               if (LOG.isWarnEnabled()) LOG.warn("deleteEntriesOlderThanASNX 
-- lv_hri: " + lv_hri);
+               if (LOG.isWarnEnabled()) LOG.warn("deleteEntriesOlderThanASNX 
-- location.getRegionInfo(): " + location.getRegionInfo());
+               table.getRegionLocation(startKey, true);
+
+               if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX 
-- setting retry, count: " + retryCount);
+               refresh = false;
+            }
+            retryCount++;
+
+            if (retryCount < RETRY_ATTEMPTS && retry == true) {
+               try {
+                  Thread.sleep(retrySleep);
+               } catch(InterruptedException ex) {
+                  Thread.currentThread().interrupt();
+               }
 
-   public static final int TM_TX_STATE_NOTX = 0; //S0 - NOTX
-   public static final int TM_TX_STATE_ACTIVE = 1; //S1 - ACTIVE
-   public static final int TM_TX_STATE_FORGOTTEN = 2; //N/A
-   public static final int TM_TX_STATE_COMMITTED = 3; //N/A
-   public static final int TM_TX_STATE_ABORTING = 4; //S4 - ROLLBACK
-   public static final int TM_TX_STATE_ABORTED = 5; //S4 - ROLLBACK
-   public static final int TM_TX_STATE_COMMITTING = 6; //S3 - PREPARED
-   public static final int TM_TX_STATE_PREPARING = 7; //S2 - IDLE
-   public static final int TM_TX_STATE_FORGETTING = 8; //N/A
-   public static final int TM_TX_STATE_PREPARED = 9; //S3 - PREPARED XARM 
Branches only!
-   public static final int TM_TX_STATE_FORGETTING_HEUR = 10; //S5 - HEURISTIC
-   public static final int TM_TX_STATE_BEGINNING = 11; //S1 - ACTIVE
-   public static final int TM_TX_STATE_HUNGCOMMITTED = 12; //N/A
-   public static final int TM_TX_STATE_HUNGABORTED = 13; //S4 - ROLLBACK
-   public static final int TM_TX_STATE_IDLE = 14; //S2 - IDLE XARM Branches 
only!
-   public static final int TM_TX_STATE_FORGOTTEN_HEUR = 15; //S5 - HEURISTIC - 
Waiting Superior TM xa_forget request
-   public static final int TM_TX_STATE_ABORTING_PART2 = 16; // Internal State
-   public static final int TM_TX_STATE_TERMINATING = 17;
-   public static final int TM_TX_STATE_LAST = 17;
+               retrySleep += TLOG_SLEEP_INCR;
+            }
+         } while (retryCount < RETRY_ATTEMPTS && retry == true);
+       // We have received our reply so decrement outstanding count
+       transactionState.requestPendingCountDec(false);
+
+       if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- EXIT 
ASN: " + auditSeqNum);
+       return 0;
+     } // deleteEntriesOlderThanASNX
+   } // TlogCallable
 
    private class AuditBuffer{
       private ArrayList<Put> buffer;           // Each Put is an audit record
@@ -241,6 +391,12 @@ public class TmAuditTlog {
       if (LOG.isTraceEnabled()) LOG.trace("Enter TmAuditTlog constructor for 
dtmid " + dtmid);
       TLOG_TABLE_NAME = config.get("TLOG_TABLE_NAME");
       int fillerSize = 2;
+      int intThreads = 16;
+      String numThreads = System.getenv("TM_JAVA_THREAD_POOL_SIZE");
+      if (numThreads != null){
+         intThreads = Integer.parseInt(numThreads);
+      }
+      tlogThreadPool = Executors.newFixedThreadPool(intThreads);
       controlPointDeferred = false;
 
       forceControlPoint = false;
@@ -282,7 +438,7 @@ public class TmAuditTlog {
       }
       LOG.info("ageCommitted is " + ageCommitted);
 
-      versions = 5;
+      versions = 10;
       try {
          String maxVersions = System.getenv("TM_TLOG_MAX_VERSIONS");
          if (maxVersions != null){
@@ -293,6 +449,30 @@ public class TmAuditTlog {
          if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_MAX_VERSIONS is not in 
ms.env");
       }
 
+      TlogRetryDelay = 5000; // 5 seconds
+      try {
+         String retryDelayS = System.getenv("TM_TLOG_RETRY_DELAY");
+         if (retryDelayS != null){
+            TlogRetryDelay = (Integer.parseInt(retryDelayS) > TlogRetryDelay ? 
Integer.parseInt(retryDelayS) : TlogRetryDelay);
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_RETRY_DELAY is not in 
ms.env");
+      }
+
+      TlogRetryCount = 60;
+      try {
+         String retryCountS = System.getenv("TM_TLOG_RETRY_COUNT");
+         if (retryCountS != null){
+            TlogRetryCount = (Integer.parseInt(retryCountS) > TlogRetryCount ? 
Integer.parseInt(retryCountS) : TlogRetryCount);
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_RETRY_COUNT is not in 
ms.env");
+      }
+
+      connection = HConnectionManager.createConnection(config);
+
       tlogNumLogs = 1;
       try {
          String numLogs = System.getenv("TM_TLOG_NUM_LOGS");
@@ -419,7 +599,7 @@ public class TmAuditTlog {
          HTableDescriptor desc = new 
HTableDescriptor(TableName.valueOf(lv_tLogName));
          desc.addFamily(hcol);
 
-          if (lvTlogExists == false) {
+         if (lvTlogExists == false) {
             // Need to prime the asn for future writes
             try {
                if (LOG.isTraceEnabled()) LOG.trace("Creating the table " + 
lv_tLogName);
@@ -497,16 +677,14 @@ public class TmAuditTlog {
                tableString.append(name);
             }
          }
-         if (LOG.isTraceEnabled()) LOG.trace("table names: " + 
tableString.toString());
+         if (LOG.isTraceEnabled()) LOG.trace("table names: " + 
tableString.toString() + " in thread " + threadId);
       }
       //Create the Put as directed by the hashed key boolean
-      Put p;
-
       //create our own hashed key
       long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + 
(lvTransid & 0xFFFFFFFF));
       lv_lockIndex = (int)(lvTransid & tLogHashKey);
       if (LOG.isTraceEnabled()) LOG.trace("key: " + key + ", hex: " + 
Long.toHexString(key) + ", transid: " +  lvTransid);
-      p = new Put(Bytes.toBytes(key));
+      Put p = new Put(Bytes.toBytes(key));
 
       if (recoveryASN == -1){
          // This is a normal audit record so we manage the ASN
@@ -516,11 +694,12 @@ public class TmAuditTlog {
          // This is a recovery audit record so use the ASN passed in
          lvAsn = recoveryASN;
       }
-      if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " 
+ lvTxState + " ASN: " + lvAsn);
+      if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " 
+ lvTxState + " ASN: " + lvAsn
+              + " in thread " + threadId);
       p.add(TLOG_FAMILY, ASN_STATE, Bytes.toBytes(String.valueOf(lvAsn) + ","
-                       + transidString + "," + lvTxState
+                       + String.valueOf(lvTransid) + "," + lvTxState
                        + "," + Bytes.toString(filler)
-                       + "," + commitIdString
+                       + "," + String.valueOf(lvCommitId)
                        + "," + tableString.toString()));
 
 
@@ -538,7 +717,6 @@ public class TmAuditTlog {
          catch (Exception e2){
             // create record of the exception
             LOG.error("putSingleRecord Exception in recoveryTable" + e2);
-            e2.printStackTrace();
             throw e2;
          }
          finally {
@@ -727,11 +905,11 @@ public class TmAuditTlog {
             if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " 
state: " + lvTxState);
          }
          catch (IOException e){
-             LOG.error("getRecord IOException");
+             LOG.error("getRecord IOException " + e);
              throw e;
          }
          catch (Exception e){
-             LOG.error("getRecord Exception");
+             LOG.error("getRecord Exception " + e);
              throw e;
          }
       }
@@ -803,10 +981,12 @@ public class TmAuditTlog {
          String lv_tLogName = new String(TLOG_TABLE_NAME + "_LOG_" + 
Integer.toHexString(i));
 //         Connection deleteConnection = 
ConnectionFactory.createConnection(this.config);
 
+         if (LOG.isTraceEnabled()) LOG.trace("delete table is: " + 
lv_tLogName);
          HConnection deleteConnection = 
HConnectionManager.createConnection(this.config);
 
          deleteTable = 
deleteConnection.getTable(TableName.valueOf(lv_tLogName));
          try {
+            boolean scanComplete = false;
             Scan s = new Scan();
             s.setCaching(100);
             s.setCacheBlocks(false);
@@ -816,8 +996,9 @@ public class TmAuditTlog {
             try {
                for (Result r : ss) {
                   for (Cell cell : r.rawCells()) {
-                     String valueString = new 
String(CellUtil.cloneValue(cell));
-                     StringTokenizer st = new StringTokenizer(valueString, 
",");
+                     StringTokenizer st =
+                        new 
StringTokenizer(Bytes.toString(CellUtil.cloneValue(cell)), ",");
+                     if (LOG.isTraceEnabled()) LOG.trace("string tokenizer 
success ");
                      if (st.hasMoreElements()) {
                         String asnToken = st.nextElement().toString() ;
                         String transidToken = st.nextElement().toString() ;
@@ -831,21 +1012,17 @@ public class TmAuditTlog {
                         else if ((Long.parseLong(asnToken) < lvAsn) &&
                                 (stateToken.equals("COMMITTED") || 
stateToken.equals("ABORTED"))) {
                            if (ageCommitted) {
-                              String rowKey = new String(r.getRow());
                               Delete del = new Delete(r.getRow());
                               if (LOG.isTraceEnabled()) LOG.trace("adding 
transid: " + transidToken + " to delete list");
                               deleteList.add(del);
                            }
                            else {
-                              String key = new String(r.getRow());
                               Get get = new Get(r.getRow());
                               get.setMaxVersions(versions);  // will return 
last n versions of row
                               Result lvResult = deleteTable.get(get);
-                             // byte[] b = lvResult.getValue(TLOG_FAMILY, 
ASN_STATE);  // returns current version of value
                               List<Cell> list = 
lvResult.getColumnCells(TLOG_FAMILY, ASN_STATE);  // returns all versions of 
this column
                               for (Cell element : list) {
-                                 String value = new 
String(CellUtil.cloneValue(element));
-                                 StringTokenizer stok = new 
StringTokenizer(value, ",");
+                                 StringTokenizer stok = new 
StringTokenizer(Bytes.toString(CellUtil.cloneValue(element)), ",");
                                  if (stok.hasMoreElements()) {
                                     if (LOG.isTraceEnabled()) 
LOG.trace("Performing secondary search on (" + transidToken + ")");
                                     asnToken = stok.nextElement().toString() ;
@@ -877,9 +1054,11 @@ public class TmAuditTlog {
               throw new RuntimeException(e);
            }
            finally {
+              if (LOG.isTraceEnabled()) LOG.trace("deleteAgedEntries closing 
ResultScanner");
               ss.close();
            }
-           if (LOG.isTraceEnabled()) LOG.trace("attempting to delete list with 
" + deleteList.size() + " elements");
+           if (LOG.isTraceEnabled()) LOG.trace("attempting to delete list with 
" + deleteList.size()
+                   + " elements from table " + lv_tLogName);
            try {
               deleteTable.delete(deleteList);
            }
@@ -889,7 +1068,8 @@ public class TmAuditTlog {
            }
         }
         catch (IOException e) {
-           LOG.error("deleteAgedEntries IOException setting up scan on table 
index " + i);
+           LOG.error("deleteAgedEntries IOException setting up scan on table 
index "
+                   + i + ", Exception: " + e);
            e.printStackTrace();
         }
         finally {
@@ -913,7 +1093,8 @@ public class TmAuditTlog {
       long startTime = System.nanoTime();
       long endTime;
 
-      if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords start with 
map size " + map.size());
+      if (LOG.isTraceEnabled()) LOG.trace("Tlog " + getTlogTableNameBase()
+           + " writeControlPointRecords start with map size " + map.size());
 
       try {
         for (Map.Entry<Long, TransactionState> e : map.entrySet()) {
@@ -934,7 +1115,6 @@ public class TmAuditTlog {
          }
          catch (Exception ex) {
             LOG.error("formatRecord Exception " + ex);
-            ex.printStackTrace();
             throw ex;
          }
         }
@@ -956,7 +1136,6 @@ public class TmAuditTlog {
 
    }
 
-
    public long addControlPoint (final Map<Long, TransactionState> map) throws 
IOException, Exception {
       if (LOG.isTraceEnabled()) LOG.trace("addControlPoint start with map size 
" + map.size());
       long lvCtrlPt = 0L;
@@ -983,6 +1162,7 @@ public class TmAuditTlog {
 
       try {
          lvAsn = asn.getAndIncrement();
+         if (LOG.isTraceEnabled()) LOG.trace("lvAsn reset to: " + lvAsn);
 
          // Write the control point interval and the ASN to the control point 
table
          lvCtrlPt = tLogControlPoint.doControlPoint(lvAsn); 
@@ -990,10 +1170,11 @@ public class TmAuditTlog {
          if ((lvCtrlPt - 5) > 0){  // We'll keep 5 control points of audit
             try {
                agedAsn = tLogControlPoint.getRecord(String.valueOf(lvCtrlPt - 
5));
-               if ((agedAsn > 0) && (lvCtrlPt % 5 == 0)){
+               if (agedAsn > 0){
                   try {
                      if (LOG.isTraceEnabled()) LOG.trace("Attempting to remove 
TLOG writes older than asn " + agedAsn);
-                     deleteAgedEntries(agedAsn);
+//                     deleteAgedEntries(agedAsn);
+                     deleteEntriesOlderThanASN(agedAsn, ageCommitted);
                   }
                   catch (Exception e){
                      LOG.error("deleteAgedEntries Exception " + e);
@@ -1008,14 +1189,12 @@ public class TmAuditTlog {
                }
             }
             catch (IOException e){
-               LOG.error("addControlPoint IOException");
-               e.printStackTrace();
+               LOG.error("addControlPoint IOException " + e);
                throw e;
             }
          }
       } catch (Exception e){
           LOG.error("addControlPoint Exception " + e);
-          e.printStackTrace();
           throw e;
       }
       if (LOG.isTraceEnabled()) LOG.trace("addControlPoint returning " + 
lvCtrlPt);
@@ -1032,6 +1211,8 @@ public class TmAuditTlog {
       int lv_ownerNid = (int)(lvTransid >> 32);
       int lv_lockIndex = (int)(lvTransid & tLogHashKey);
       String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + 
String.valueOf(lv_ownerNid) + "_LOG_" + Integer.toHexString(lv_lockIndex));
+      if (LOG.isTraceEnabled()) LOG.trace("getTransactionState reading from: " 
+ lv_tLogName);
+
       HConnection unknownTableConnection = 
HConnectionManager.createConnection(this.config);
       unknownTransactionTable = 
unknownTableConnection.getTable(TableName.valueOf(lv_tLogName));
 
@@ -1196,6 +1377,73 @@ public class TmAuditTlog {
       }
       if (LOG.isTraceEnabled()) LOG.trace("getTransactionState end transid: " 
+ ts.getTransactionId());
       return;
-   } 
-}
+   }
+
+   public String getTlogTableNameBase(){
+      return TLOG_TABLE_NAME;
+   }
 
+   /**
+   * Method  : deleteEntriesOlderThanASN
+   * Params  : pv_ASN  - ASN before which all audit records will be deleted
+   * Return  : void
+   * Purpose : Delete transaction records which are no longer needed
+   */
+   public void deleteEntriesOlderThanASN(final long pv_ASN, final boolean 
pv_ageCommitted) throws IOException {
+      int loopCount = 0;
+      long threadId = Thread.currentThread().getId();
+      // This TransactionState object is just a mechanism to keep track of the 
asynch rpc calls
+      // sent to regions in order to delete the aged TLOG entries
+      TransactionState transactionState = new TransactionState(0);
+      CompletionService<Integer> compPool = new 
ExecutorCompletionService<Integer>(tlogThreadPool);
+      HConnection targetTableConnection = 
HConnectionManager.createConnection(this.config);
+
+      try {
+         if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN: "
+              + pv_ASN + ", in thread: " + threadId);
+         HTableInterface targetTable;
+         List<HRegionLocation> regionList;
+
+         // For every Tlog table for this node
+         for (int index = 0; index < tlogNumLogs; index++) {
+            String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + 
String.valueOf(this.dtmid) + "_LOG_" + Integer.toHexString(index));
+            regionList = 
targetTableConnection.locateRegions(TableName.valueOf(lv_tLogName), false, 
false);
+            loopCount++;
+            // For every region in this table
+            for (HRegionLocation location : regionList) {
+               final byte[] regionName = 
location.getRegionInfo().getRegionName();
+               compPool.submit(new TlogCallable(transactionState, location, 
connection) {
+                  public Integer call() throws IOException {
+                     if (LOG.isTraceEnabled()) LOG.trace("before 
deleteEntriesOlderThanASNX() ASN: "
+                         + pv_ASN);
+                     return deleteEntriesOlderThanASNX(regionName, pv_ASN, 
pv_ageCommitted);
+                  }
+               });
+            }
+         }
+      } catch (Exception e) {
+         LOG.error("exception in deleteEntriesOlderThanASN for ASN: "
+                 + pv_ASN + " " + e);
+         throw new IOException(e);
+      }
+      // all requests sent at this point, can record the count
+      if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN tlog 
callable requests sent to "
+                + loopCount + " tlogs in thread " + threadId);
+      int deleteError = 0;
+      try {
+         for (int loopIndex = 0; loopIndex < loopCount; loopIndex ++) {
+            int partialResult = compPool.take().get();
+            if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN 
partial result: " + partialResult + " loopIndex " + loopIndex);
+         }
+      }
+      catch (Exception e2) {
+         LOG.error("exception retieving replys in deleteEntriesOlderThanASN 
for interval ASN: " + pv_ASN
+                 + " " + e2);
+         throw new IOException(e2);
+      }
+      HConnectionManager.deleteStaleConnection(targetTableConnection);
+      if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN tlog 
callable requests completed in thread "
+            + threadId);
+      return;
+  }
+}

Reply via email to