Repository: incubator-trafodion Updated Branches: refs/heads/master fe92c7b6b -> c2d844331
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7f54aa7c/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto index 33e39fa..eea4129 100755 --- a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto +++ b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto @@ -268,6 +268,21 @@ message RecoveryRequestResponse { optional bool hasException = 3; } +message TlogDeleteRequest{ + required bytes regionName = 1; + required int64 transactionId = 2; + required Scan scan = 3; + required int64 auditSeqNum = 4; + required bool ageCommitted = 5; +} + +message TlogDeleteResponse { + repeated Result result = 1; + required int64 count = 2; + optional string exception = 3; + optional bool hasException = 4; +} + message TransactionalAggregateRequest { /** The request passed to the TransactionalAggregateService consists of three parts * (1) the (canonical) classname of the ColumnInterpreter implementation @@ -355,6 +370,8 @@ service TrxRegionService { returns(PutMultipleTransactionalResponse); rpc recoveryRequest(RecoveryRequestRequest) returns(RecoveryRequestResponse); + rpc deleteTlogEntries(TlogDeleteRequest) + returns(TlogDeleteResponse); rpc GetMax (TransactionalAggregateRequest) returns (TransactionalAggregateResponse); rpc GetMin (TransactionalAggregateRequest) http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7f54aa7c/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java index cb3ba94..222dd86 100644 --- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java +++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java @@ -28,6 +28,7 @@ import java.io.IOException; import org.apache.log4j.PropertyConfigurator; import org.apache.log4j.Logger; +import org.apache.commons.codec.binary.Hex; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.transactional.TransactionManager; import org.apache.hadoop.hbase.client.transactional.TransactionState; @@ -53,6 +55,11 @@ import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException; import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger; import org.apache.hadoop.hbase.client.transactional.TransactionRegionLocation; import org.apache.hadoop.hbase.client.transactional.TransState; +import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest; +import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse; +import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -64,8 +71,19 @@ import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; + +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; + import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy; +import com.google.protobuf.ByteString; +import com.google.protobuf.HBaseZeroCopyByteString; + import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; @@ -82,9 +100,11 @@ import java.util.StringTokenizer; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -100,6 +120,7 @@ public class TmAuditTlog { private static final byte[] ASN_STATE = Bytes.toBytes("as"); private static final byte[] QUAL_TX_STATE = Bytes.toBytes("tx"); private static HTable[] table; + private static HConnection connection; private static HBaseAuditControlPoint tLogControlPoint; private static long tLogControlPointNum; private static long tLogHashKey; @@ -139,7 +160,9 @@ public class TmAuditTlog { private static boolean forceControlPoint; private boolean disableBlockCache; private boolean controlPointDeferred; - + private int TlogRetryDelay; + private int TlogRetryCount; + private static AtomicLong asn; // Audit sequence number is the monotonic increasing value of the tLog write private static Object tlogAuditLock[]; // Lock for synchronizing access via regions. @@ -147,26 +170,153 @@ public class TmAuditTlog { private static Object tablePutLock; // Lock for synchronizing table.put operations // to avoid ArrayIndexOutOfBoundsException private static byte filler[]; + public static final int TLOG_SLEEP = 1000; // One second + public static final int TLOG_SLEEP_INCR = 5000; // Five seconds + public static final int TLOG_RETRY_ATTEMPTS = 5; + private int RETRY_ATTEMPTS; + + /** + * tlogThreadPool - pool of thread for asynchronous requests + */ + ExecutorService tlogThreadPool; + + private abstract class TlogCallable implements Callable<Integer>{ + TransactionState transactionState; + HRegionLocation location; + HTable table; + byte[] startKey; + byte[] endKey_orig; + byte[] endKey; + + TlogCallable(TransactionState txState, HRegionLocation location, HConnection connection) { + transactionState = txState; + this.location = location; + try { + table = new HTable(location.getRegionInfo().getTable(), connection, tlogThreadPool); + } catch(IOException e) { + LOG.error("Error obtaining HTable instance " + e); + table = null; + } + startKey = location.getRegionInfo().getStartKey(); + endKey_orig = location.getRegionInfo().getEndKey(); + endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1); + } + + public Integer deleteEntriesOlderThanASNX(final byte[] regionName, final long auditSeqNum, final boolean pv_ageCommitted) throws IOException { + long threadId = Thread.currentThread().getId(); + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- ENTRY auditSeqNum: " + + auditSeqNum + ", thread " + threadId); + boolean retry = false; + boolean refresh = false; + final Scan scan = new Scan(startKey, endKey); + + int retryCount = 0; + int retrySleep = TLOG_SLEEP; + do { + try { + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- ENTRY ASN: " + auditSeqNum); + Batch.Call<TrxRegionService, TlogDeleteResponse> callable = + new Batch.Call<TrxRegionService, TlogDeleteResponse>() { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<TlogDeleteResponse> rpcCallback = + new BlockingRpcCallback<TlogDeleteResponse>(); + + @Override + public TlogDeleteResponse call(TrxRegionService instance) throws IOException { + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest.Builder + builder = TlogDeleteRequest.newBuilder(); + builder.setAuditSeqNum(auditSeqNum); + builder.setTransactionId(transactionState.getTransactionId()); + builder.setScan(ProtobufUtil.toScan(scan)); + builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); //ByteString.copyFromUtf8(Bytes.toString(regionName))); + builder.setAgeCommitted(pv_ageCommitted); + + instance.deleteTlogEntries(controller, builder.build(), rpcCallback); + return rpcCallback.get(); + } + }; + + Map<byte[], TlogDeleteResponse> result = null; + try { + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- before coprocessorService ASN: " + auditSeqNum + + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8")); + result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable); + } catch (Throwable e) { + String msg = "ERROR occurred while calling deleteTlogEntries coprocessor service in deleteEntriesOlderThanASNX"; + LOG.error(msg + ":" + e); + throw new Exception(msg); + } + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- after coprocessorService ASN: " + auditSeqNum + + " startKey: " + new String(startKey, "UTF-8") + " result size: " + result.size()); + + if(result.size() != 1) { + LOG.error("deleteEntriesOlderThanASNX, received incorrect result size: " + result.size() + " ASN: " + auditSeqNum); + throw new Exception("Wrong result size in deleteEntriesOlderThanASNX"); + } + else { + // size is 1 + for (TlogDeleteResponse TD_response : result.values()){ + if(TD_response.getHasException()) { + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX coprocessor exception: " + + TD_response.getException()); + throw new Exception(TD_response.getException()); + } + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX coprocessor deleted count: " + + TD_response.getCount()); + } + retry = false; + } + } catch (Exception e) { + LOG.error("deleteEntriesOlderThanASNX retrying due to Exception: " + e); + refresh = true; + retry = true; + } + if (refresh) { + + HRegionLocation lv_hrl = table.getRegionLocation(startKey); + HRegionInfo lv_hri = lv_hrl.getRegionInfo(); + String lv_node = lv_hrl.getHostname(); + int lv_length = lv_node.indexOf('.'); + + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- location being refreshed : " + + location.getRegionInfo().getRegionNameAsString() + "endKey: " + + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for ASN: " + auditSeqNum); + if(retryCount == RETRY_ATTEMPTS) { + LOG.error("Exceeded retry attempts (" + retryCount + ") in deleteEntriesOlderThanASNX for ASN: " + auditSeqNum); + // We have received our reply in the form of an exception, + // so decrement outstanding count and wake up waiters to avoid + // getting hung forever + transactionState.requestPendingCountDec(true); + throw new IOException("Exceeded retry attempts (" + retryCount + ") in deleteEntriesOlderThanASNX for ASN: " + auditSeqNum); + } + + if (LOG.isWarnEnabled()) LOG.warn("deleteEntriesOlderThanASNX -- " + table.toString() + " location being refreshed"); + if (LOG.isWarnEnabled()) LOG.warn("deleteEntriesOlderThanASNX -- lv_hri: " + lv_hri); + if (LOG.isWarnEnabled()) LOG.warn("deleteEntriesOlderThanASNX -- location.getRegionInfo(): " + location.getRegionInfo()); + table.getRegionLocation(startKey, true); + + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- setting retry, count: " + retryCount); + refresh = false; + } + retryCount++; + + if (retryCount < RETRY_ATTEMPTS && retry == true) { + try { + Thread.sleep(retrySleep); + } catch(InterruptedException ex) { + Thread.currentThread().interrupt(); + } - public static final int TM_TX_STATE_NOTX = 0; //S0 - NOTX - public static final int TM_TX_STATE_ACTIVE = 1; //S1 - ACTIVE - public static final int TM_TX_STATE_FORGOTTEN = 2; //N/A - public static final int TM_TX_STATE_COMMITTED = 3; //N/A - public static final int TM_TX_STATE_ABORTING = 4; //S4 - ROLLBACK - public static final int TM_TX_STATE_ABORTED = 5; //S4 - ROLLBACK - public static final int TM_TX_STATE_COMMITTING = 6; //S3 - PREPARED - public static final int TM_TX_STATE_PREPARING = 7; //S2 - IDLE - public static final int TM_TX_STATE_FORGETTING = 8; //N/A - public static final int TM_TX_STATE_PREPARED = 9; //S3 - PREPARED XARM Branches only! - public static final int TM_TX_STATE_FORGETTING_HEUR = 10; //S5 - HEURISTIC - public static final int TM_TX_STATE_BEGINNING = 11; //S1 - ACTIVE - public static final int TM_TX_STATE_HUNGCOMMITTED = 12; //N/A - public static final int TM_TX_STATE_HUNGABORTED = 13; //S4 - ROLLBACK - public static final int TM_TX_STATE_IDLE = 14; //S2 - IDLE XARM Branches only! - public static final int TM_TX_STATE_FORGOTTEN_HEUR = 15; //S5 - HEURISTIC - Waiting Superior TM xa_forget request - public static final int TM_TX_STATE_ABORTING_PART2 = 16; // Internal State - public static final int TM_TX_STATE_TERMINATING = 17; - public static final int TM_TX_STATE_LAST = 17; + retrySleep += TLOG_SLEEP_INCR; + } + } while (retryCount < RETRY_ATTEMPTS && retry == true); + // We have received our reply so decrement outstanding count + transactionState.requestPendingCountDec(false); + + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- EXIT ASN: " + auditSeqNum); + return 0; + } //getTransactionStatesFromIntervalX + } // TlogCallable private class AuditBuffer{ private ArrayList<Put> buffer; // Each Put is an audit record @@ -241,6 +391,12 @@ public class TmAuditTlog { if (LOG.isTraceEnabled()) LOG.trace("Enter TmAuditTlog constructor for dtmid " + dtmid); TLOG_TABLE_NAME = config.get("TLOG_TABLE_NAME"); int fillerSize = 2; + int intThreads = 16; + String numThreads = System.getenv("TM_JAVA_THREAD_POOL_SIZE"); + if (numThreads != null){ + intThreads = Integer.parseInt(numThreads); + } + tlogThreadPool = Executors.newFixedThreadPool(intThreads); controlPointDeferred = false; forceControlPoint = false; @@ -282,7 +438,7 @@ public class TmAuditTlog { } LOG.info("ageCommitted is " + ageCommitted); - versions = 5; + versions = 10; try { String maxVersions = System.getenv("TM_TLOG_MAX_VERSIONS"); if (maxVersions != null){ @@ -293,6 +449,30 @@ public class TmAuditTlog { if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_MAX_VERSIONS is not in ms.env"); } + TlogRetryDelay = 5000; // 3 seconds + try { + String retryDelayS = System.getenv("TM_TLOG_RETRY_DELAY"); + if (retryDelayS != null){ + TlogRetryDelay = (Integer.parseInt(retryDelayS) > TlogRetryDelay ? Integer.parseInt(retryDelayS) : TlogRetryDelay); + } + } + catch (Exception e) { + if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_RETRY_DELAY is not in ms.env"); + } + + TlogRetryCount = 60; + try { + String retryCountS = System.getenv("TM_TLOG_RETRY_COUNT"); + if (retryCountS != null){ + TlogRetryCount = (Integer.parseInt(retryCountS) > TlogRetryCount ? Integer.parseInt(retryCountS) : TlogRetryCount); + } + } + catch (Exception e) { + if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_RETRY_COUNT is not in ms.env"); + } + + connection = HConnectionManager.createConnection(config); + tlogNumLogs = 1; try { String numLogs = System.getenv("TM_TLOG_NUM_LOGS"); @@ -419,7 +599,7 @@ public class TmAuditTlog { HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(lv_tLogName)); desc.addFamily(hcol); - if (lvTlogExists == false) { + if (lvTlogExists == false) { // Need to prime the asn for future writes try { if (LOG.isTraceEnabled()) LOG.trace("Creating the table " + lv_tLogName); @@ -497,16 +677,14 @@ public class TmAuditTlog { tableString.append(name); } } - if (LOG.isTraceEnabled()) LOG.trace("table names: " + tableString.toString()); + if (LOG.isTraceEnabled()) LOG.trace("table names: " + tableString.toString() + " in thread " + threadId); } //Create the Put as directed by the hashed key boolean - Put p; - //create our own hashed key long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF)); lv_lockIndex = (int)(lvTransid & tLogHashKey); if (LOG.isTraceEnabled()) LOG.trace("key: " + key + ", hex: " + Long.toHexString(key) + ", transid: " + lvTransid); - p = new Put(Bytes.toBytes(key)); + Put p = new Put(Bytes.toBytes(key)); if (recoveryASN == -1){ // This is a normal audit record so we manage the ASN @@ -516,11 +694,12 @@ public class TmAuditTlog { // This is a recovery audit record so use the ASN passed in lvAsn = recoveryASN; } - if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " + lvTxState + " ASN: " + lvAsn); + if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " + lvTxState + " ASN: " + lvAsn + + " in thread " + threadId); p.add(TLOG_FAMILY, ASN_STATE, Bytes.toBytes(String.valueOf(lvAsn) + "," - + transidString + "," + lvTxState + + String.valueOf(lvTransid) + "," + lvTxState + "," + Bytes.toString(filler) - + "," + commitIdString + + "," + String.valueOf(lvCommitId) + "," + tableString.toString())); @@ -538,7 +717,6 @@ public class TmAuditTlog { catch (Exception e2){ // create record of the exception LOG.error("putSingleRecord Exception in recoveryTable" + e2); - e2.printStackTrace(); throw e2; } finally { @@ -727,11 +905,11 @@ public class TmAuditTlog { if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " + lvTxState); } catch (IOException e){ - LOG.error("getRecord IOException"); + LOG.error("getRecord IOException " + e); throw e; } catch (Exception e){ - LOG.error("getRecord Exception"); + LOG.error("getRecord Exception " + e); throw e; } } @@ -803,10 +981,12 @@ public class TmAuditTlog { String lv_tLogName = new String(TLOG_TABLE_NAME + "_LOG_" + Integer.toHexString(i)); // Connection deleteConnection = ConnectionFactory.createConnection(this.config); + if (LOG.isTraceEnabled()) LOG.trace("delete table is: " + lv_tLogName); HConnection deleteConnection = HConnectionManager.createConnection(this.config); deleteTable = deleteConnection.getTable(TableName.valueOf(lv_tLogName)); try { + boolean scanComplete = false; Scan s = new Scan(); s.setCaching(100); s.setCacheBlocks(false); @@ -816,8 +996,9 @@ public class TmAuditTlog { try { for (Result r : ss) { for (Cell cell : r.rawCells()) { - String valueString = new String(CellUtil.cloneValue(cell)); - StringTokenizer st = new StringTokenizer(valueString, ","); + StringTokenizer st = + new StringTokenizer(Bytes.toString(CellUtil.cloneValue(cell)), ","); + if (LOG.isTraceEnabled()) LOG.trace("string tokenizer success "); if (st.hasMoreElements()) { String asnToken = st.nextElement().toString() ; String transidToken = st.nextElement().toString() ; @@ -831,21 +1012,17 @@ public class TmAuditTlog { else if ((Long.parseLong(asnToken) < lvAsn) && (stateToken.equals("COMMITTED") || stateToken.equals("ABORTED"))) { if (ageCommitted) { - String rowKey = new String(r.getRow()); Delete del = new Delete(r.getRow()); if (LOG.isTraceEnabled()) LOG.trace("adding transid: " + transidToken + " to delete list"); deleteList.add(del); } else { - String key = new String(r.getRow()); Get get = new Get(r.getRow()); get.setMaxVersions(versions); // will return last n versions of row Result lvResult = deleteTable.get(get); - // byte[] b = lvResult.getValue(TLOG_FAMILY, ASN_STATE); // returns current version of value List<Cell> list = lvResult.getColumnCells(TLOG_FAMILY, ASN_STATE); // returns all versions of this column for (Cell element : list) { - String value = new String(CellUtil.cloneValue(element)); - StringTokenizer stok = new StringTokenizer(value, ","); + StringTokenizer stok = new StringTokenizer(Bytes.toString(CellUtil.cloneValue(element)), ","); if (stok.hasMoreElements()) { if (LOG.isTraceEnabled()) LOG.trace("Performing secondary search on (" + transidToken + ")"); asnToken = stok.nextElement().toString() ; @@ -877,9 +1054,11 @@ public class TmAuditTlog { throw new RuntimeException(e); } finally { + if (LOG.isTraceEnabled()) LOG.trace("deleteAgedEntries closing ResultScanner"); ss.close(); } - if (LOG.isTraceEnabled()) LOG.trace("attempting to delete list with " + deleteList.size() + " elements"); + if (LOG.isTraceEnabled()) LOG.trace("attempting to delete list with " + deleteList.size() + + " elements from table " + lv_tLogName); try { deleteTable.delete(deleteList); } @@ -889,7 +1068,8 @@ public class TmAuditTlog { } } catch (IOException e) { - LOG.error("deleteAgedEntries IOException setting up scan on table index " + i); + LOG.error("deleteAgedEntries IOException setting up scan on table index " + + i + ", Exception: " + e); e.printStackTrace(); } finally { @@ -913,7 +1093,8 @@ public class TmAuditTlog { long startTime = System.nanoTime(); long endTime; - if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords start with map size " + map.size()); + if (LOG.isTraceEnabled()) LOG.trace("Tlog " + getTlogTableNameBase() + + " writeControlPointRecords start with map size " + map.size()); try { for (Map.Entry<Long, TransactionState> e : map.entrySet()) { @@ -934,7 +1115,6 @@ public class TmAuditTlog { } catch (Exception ex) { LOG.error("formatRecord Exception " + ex); - ex.printStackTrace(); throw ex; } } @@ -956,7 +1136,6 @@ public class TmAuditTlog { } - public long addControlPoint (final Map<Long, TransactionState> map) throws IOException, Exception { if (LOG.isTraceEnabled()) LOG.trace("addControlPoint start with map size " + map.size()); long lvCtrlPt = 0L; @@ -983,6 +1162,7 @@ public class TmAuditTlog { try { lvAsn = asn.getAndIncrement(); + if (LOG.isTraceEnabled()) LOG.trace("lvAsn reset to: " + lvAsn); // Write the control point interval and the ASN to the control point table lvCtrlPt = tLogControlPoint.doControlPoint(lvAsn); @@ -990,10 +1170,11 @@ public class TmAuditTlog { if ((lvCtrlPt - 5) > 0){ // We'll keep 5 control points of audit try { agedAsn = tLogControlPoint.getRecord(String.valueOf(lvCtrlPt - 5)); - if ((agedAsn > 0) && (lvCtrlPt % 5 == 0)){ + if (agedAsn > 0){ try { if (LOG.isTraceEnabled()) LOG.trace("Attempting to remove TLOG writes older than asn " + agedAsn); - deleteAgedEntries(agedAsn); +// deleteAgedEntries(agedAsn); + deleteEntriesOlderThanASN(agedAsn, ageCommitted); } catch (Exception e){ LOG.error("deleteAgedEntries Exception " + e); @@ -1008,14 +1189,12 @@ public class TmAuditTlog { } } catch (IOException e){ - LOG.error("addControlPoint IOException"); - e.printStackTrace(); + LOG.error("addControlPoint IOException " + e); throw e; } } } catch (Exception e){ LOG.error("addControlPoint Exception " + e); - e.printStackTrace(); throw e; } if (LOG.isTraceEnabled()) LOG.trace("addControlPoint returning " + lvCtrlPt); @@ -1032,6 +1211,8 @@ public class TmAuditTlog { int lv_ownerNid = (int)(lvTransid >> 32); int lv_lockIndex = (int)(lvTransid & tLogHashKey); String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(lv_ownerNid) + "_LOG_" + Integer.toHexString(lv_lockIndex)); + if (LOG.isTraceEnabled()) LOG.trace("getTransactionState reading from: " + lv_tLogName); + HConnection unknownTableConnection = HConnectionManager.createConnection(this.config); unknownTransactionTable = unknownTableConnection.getTable(TableName.valueOf(lv_tLogName)); @@ -1196,6 +1377,73 @@ public class TmAuditTlog { } if (LOG.isTraceEnabled()) LOG.trace("getTransactionState end transid: " + ts.getTransactionId()); return; - } -} + } + + public String getTlogTableNameBase(){ + return TLOG_TABLE_NAME; + } + /** + * Method : deleteEntriesOlderThanASN + * Params : pv_ASN - ASN before which all audit records will be deleted + * Return : void + * Purpose : Delete transaction records which are no longer needed + */ + public void deleteEntriesOlderThanASN(final long pv_ASN, final boolean pv_ageCommitted) throws IOException { + int loopCount = 0; + long threadId = Thread.currentThread().getId(); + // This TransactionState object is just a mechanism to keep track of the asynch rpc calls + // send to regions in order to retrience the desired set of transactions + TransactionState transactionState = new TransactionState(0); + CompletionService<Integer> compPool = new ExecutorCompletionService<Integer>(tlogThreadPool); + HConnection targetTableConnection = HConnectionManager.createConnection(this.config); + + try { + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN: " + + pv_ASN + ", in thread: " + threadId); + HTableInterface targetTable; + List<HRegionLocation> regionList; + + // For every Tlog table for this node + for (int index = 0; index < tlogNumLogs; index++) { + String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(this.dtmid) + "_LOG_" + Integer.toHexString(index)); + regionList = targetTableConnection.locateRegions(TableName.valueOf(lv_tLogName), false, false); + loopCount++; + // For every region in this table + for (HRegionLocation location : regionList) { + final byte[] regionName = location.getRegionInfo().getRegionName(); + compPool.submit(new TlogCallable(transactionState, location, connection) { + public Integer call() throws IOException { + if (LOG.isTraceEnabled()) LOG.trace("before deleteEntriesOlderThanASNX() ASN: " + + pv_ASN); + return deleteEntriesOlderThanASNX(regionName, pv_ASN, pv_ageCommitted); + } + }); + } + } + } catch (Exception e) { + LOG.error("exception in deleteEntriesOlderThanASN for ASN: " + + pv_ASN + " " + e); + throw new IOException(e); + } + // all requests sent at this point, can record the count + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN tlog callable requests sent to " + + loopCount + " tlogs in thread " + threadId); + int deleteError = 0; + try { + for (int loopIndex = 0; loopIndex < loopCount; loopIndex ++) { + int partialResult = compPool.take().get(); + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN partial result: " + partialResult + " loopIndex " + loopIndex); + } + } + catch (Exception e2) { + LOG.error("exception retieving replys in deleteEntriesOlderThanASN for interval ASN: " + pv_ASN + + " " + e2); + throw new IOException(e2); + } + HConnectionManager.deleteStaleConnection(targetTableConnection); + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN tlog callable requests completed in thread " + + threadId); + return; + } +}
