Fix for JIRA [TRAFODION-1996] TLOG uses excessive memory when aging old entries
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/6b5da39b Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/6b5da39b Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/6b5da39b Branch: refs/heads/master Commit: 6b5da39b1a2b4359735439ff7095ba719b11786e Parents: 68653b1 Author: Sean Broeder <[email protected]> Authored: Thu Jun 9 13:59:37 2016 +0000 Committer: Sean Broeder <[email protected]> Committed: Thu Jun 9 13:59:37 2016 +0000 ---------------------------------------------------------------------- .../transactional/TrxRegionEndpoint.java.tmpl | 35 ++++--- .../trafodion/dtm/HBaseAuditControlPoint.java | 4 +- .../java/org/trafodion/dtm/TmAuditTlog.java | 101 +++++++++---------- 3 files changed, 71 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/6b5da39b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl index b0e87d3..69ddb68 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl @@ -2184,7 +2184,7 @@ CoprocessorService, Coprocessor { } catch (Exception e){ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Exception in region: " - + regionInfo.getRegionNameAsString() + " getting scanner " + e ); + + regionInfo.getRegionNameAsString() + " getting scanner ", e ); } long count = 0L; @@ -2224,11 +2224,13 @@ CoprocessorService, Coprocessor { StringTokenizer st = new StringTokenizer(valueString, ","); if (st.hasMoreElements()) { String asnToken = st.nextElement().toString(); - String transidToken = st.nextElement().toString(); - String stateToken = st.nextElement().toString(); - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries transidToken: " - + transidToken + " asnToken: " + asnToken); + String transidToken = null; + String stateToken = null; if (Long.parseLong(asnToken) < lvAsn) { + transidToken = st.nextElement().toString(); + stateToken = st.nextElement().toString(); + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries transidToken: " + + transidToken + " asnToken: " + asnToken); if ( (stateToken.contains("FORGOTTEN")) || (stateToken.equals("COMMITTED") && (lvAgeCommitted)) || (stateToken.equals("ABORTED") && (lvAgeCommitted))) { @@ -2238,11 +2240,12 @@ CoprocessorService, Coprocessor { try { Delete d = new Delete(result.getRow()); + d.setDurability(Durability.SKIP_WAL); m_Region.delete(d); } catch (Exception e) { LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries -" - + " txId " + transidToken + ", Executing delete caught an exception " + e); + + " txId " + transidToken + ", Executing delete caught an exception ", e); throw new IOException(e.toString()); } count++; @@ -2288,15 +2291,17 @@ CoprocessorService, Coprocessor { + e.getMessage() + " " + stackTraceToString(e)); t = e; } - if (scanner != null) { - try { - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId " - + transId + ", closing the scanner, region is " + regionInfo.getRegionNameAsString()); - scanner.close(); - } catch(Exception e) { - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - transaction id " - + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e)); - ne = e; + finally { + if (scanner != null) { + try { + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId " + + transId + ", closing the scanner, region is " + regionInfo.getRegionNameAsString()); + scanner.close(); + } catch(Exception e) { + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - transaction id " + + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e)); + ne = e; + } } } } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/6b5da39b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java index 11120ef..e50fe3b 100644 --- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java +++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java @@ -34,6 +34,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; @@ -404,7 +406,7 @@ public class HBaseAuditControlPoint { table.delete(deleteList); } catch (Exception e) { - LOG.error("deleteAgedRecords IOException"); + LOG.error("deleteAgedRecords IOException ", e); }finally { ss.close(); } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/6b5da39b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java index a94aa16..11b17ce 100644 --- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java +++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/TmAuditTlog.java @@ -194,7 +194,7 @@ public class TmAuditTlog { try { table = new HTable(location.getRegionInfo().getTable(), connection, tlogThreadPool); } catch(IOException e) { - LOG.error("Error obtaining HTable instance " + e); + LOG.error("Error obtaining HTable instance ", e); table = null; } startKey = location.getRegionInfo().getStartKey(); @@ -242,8 +242,9 @@ public class TmAuditTlog { + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8")); result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable); } catch (Throwable e) { - String msg = "ERROR occurred while calling deleteTlogEntries coprocessor service in deleteEntriesOlderThanASNX"; - LOG.error(msg + ":" + e); + e.printStackTrace(); + String msg = new String("ERROR occurred while calling deleteTlogEntries coprocessor service in deleteEntriesOlderThanASNX: " + e); + LOG.error(msg); throw new Exception(msg); } if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASNX -- after coprocessorService ASN: " + auditSeqNum @@ -267,7 +268,7 @@ public class TmAuditTlog { retry = false; } } catch (Exception e) { - LOG.error("deleteEntriesOlderThanASNX retrying due to Exception: " + e); + LOG.error("deleteEntriesOlderThanASNX retrying due to Exception: ", e); refresh = true; retry = true; } @@ -309,7 +310,7 @@ public class TmAuditTlog { retrySleep += TLOG_SLEEP_INCR; } - } while (retryCount < RETRY_ATTEMPTS && retry == true); + } while (retryCount < RETRY_ATTEMPTS && retry == true); // We have received our reply so decrement outstanding count transactionState.requestPendingCountDec(false); @@ -334,7 +335,7 @@ public class TmAuditTlog { buffer.add(localPut); } catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying bufferAdd" + e); + if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying bufferAdd", e); throw e; } if (LOG.isTraceEnabled()) LOG.trace("BufferAdd end in thread " + threadId ); @@ -348,7 +349,7 @@ public class TmAuditTlog { lvSize = buffer.size(); } catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying bufferSize" + e); + if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying bufferSize", e); throw e; } if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferSize end; returning " + lvSize + " in thread " @@ -363,7 +364,7 @@ public class TmAuditTlog { buffer.clear(); } catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("Exception trying bufferClear.clear" + e); + if (LOG.isDebugEnabled()) LOG.debug("Exception trying bufferClear.clear", e); throw e; } if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferClear end in thread " + threadId); @@ -522,7 +523,7 @@ public class TmAuditTlog { tLogHashShiftFactor = 59; break; default : { - LOG.error("TM_TLOG_NUM_LOGS must b 1 or a power of 2 in the range 2-32"); + LOG.error("TM_TLOG_NUM_LOGS must be 1 or a power of 2 in the range 2-32"); throw new RuntimeException(); } } @@ -571,7 +572,7 @@ public class TmAuditTlog { tLogControlPoint = new HBaseAuditControlPoint(config); } catch (Exception e) { - LOG.error("Unable to create new HBaseAuditControlPoint object " + e); + LOG.error("Unable to create new HBaseAuditControlPoint object ", e); } tlogAuditLock = new Object[tlogNumLogs]; @@ -615,7 +616,7 @@ public class TmAuditTlog { table[i] = new HTable(config, desc.getName()); } catch(Exception e){ - LOG.error("TmAuditTlog Exception on index " + i + "; " + e); + LOG.error("TmAuditTlog Exception on index " + i + "; ", e); throw new RuntimeException(e); } @@ -718,7 +719,7 @@ public class TmAuditTlog { } catch (Exception e2){ // create record of the exception - LOG.error("putSingleRecord Exception in recoveryTable" + e2); + LOG.error("putSingleRecord Exception in recoveryTable", e2); throw e2; } finally { @@ -733,7 +734,7 @@ public class TmAuditTlog { } } else { - // THis goes to our local TLOG + // This goes to our local TLOG if (LOG.isTraceEnabled()) LOG.trace("TLOG putSingleRecord synchronizing tlogAuditLock[" + lv_lockIndex + "] in thread " + threadId ); startSynch = System.nanoTime(); try { @@ -751,7 +752,7 @@ public class TmAuditTlog { } catch (Exception e2){ // create record of the exception - LOG.error("putSingleRecord Exception " + e2); + LOG.error("putSingleRecord Exception ", e2); e2.printStackTrace(); throw e2; } @@ -759,7 +760,7 @@ public class TmAuditTlog { } catch (Exception e) { // create record of the exception - LOG.error("Synchronizing on tlogAuditLock[" + lv_lockIndex + "] Exception " + e); + LOG.error("Synchronizing on tlogAuditLock[" + lv_lockIndex + "] Exception ", e); e.printStackTrace(); throw e; } @@ -907,16 +908,16 @@ public class TmAuditTlog { if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " + lvTxState); } catch (IOException e){ - LOG.error("getRecord IOException " + e); + LOG.error("getRecord IOException ", e); throw e; } catch (Exception e){ - LOG.error("getRecord Exception " + e); + LOG.error("getRecord Exception ", e); throw e; } } catch (Exception e2) { - LOG.error("getRecord Exception2 " + e2); + LOG.error("getRecord Exception2 ", e2); e2.printStackTrace(); } @@ -944,11 +945,11 @@ public class TmAuditTlog { lvTxState = st.nextElement().toString(); if (LOG.isTraceEnabled()) LOG.trace("transid: " + transidToken + " state: " + lvTxState); } catch (IOException e){ - LOG.error("getRecord IOException"); + LOG.error("getRecord IOException: ", e); throw e; } } catch (Exception e){ - LOG.error("getRecord Exception " + e); + LOG.error("getRecord Exception: ", e); throw e; } if (LOG.isTraceEnabled()) LOG.trace("getRecord end; returning String:" + lvTxState); @@ -970,7 +971,7 @@ public class TmAuditTlog { table[lv_lockIndex].delete(d); } catch (Exception e) { - LOG.error("deleteRecord Exception " + e ); + LOG.error("deleteRecord Exception: ", e ); } if (LOG.isTraceEnabled()) LOG.trace("deleteRecord - exit"); return true; @@ -1051,7 +1052,7 @@ public class TmAuditTlog { } } catch(Exception e){ - LOG.error("deleteAgedEntries Exception getting results for table " + lv_tLogName + "; " + e); + LOG.error("deleteAgedEntries Exception getting results for table " + lv_tLogName + "; ", e); throw new RuntimeException(e); } finally { @@ -1064,13 +1065,13 @@ public class TmAuditTlog { deleteTable.delete(deleteList); } catch(IOException e){ - LOG.error("deleteAgedEntries Exception deleting from table " + lv_tLogName + "; " + e); + LOG.error("deleteAgedEntries Exception deleting from table " + lv_tLogName + "; ", e); throw new RuntimeException(e); } } catch (IOException e) { LOG.error("deleteAgedEntries IOException setting up scan on table " - + lv_tLogName + ", Exception: " + e); + + lv_tLogName + ", Exception: ", e); e.printStackTrace(); } finally { @@ -1105,16 +1106,11 @@ public class TmAuditTlog { if (value.getStatus().equals("COMMITTED")){ if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords adding record for trans (" + transid + ") : state is " + value.getStatus()); cpWrites++; - if (forceControlPoint) { - putSingleRecord(transid, value.getCommitId(), value.getStatus(), value.getParticipatingRegions(), true); - } - else { - putSingleRecord(transid, value.getCommitId(), value.getStatus(), value.getParticipatingRegions(), false); - } + putSingleRecord(transid, value.getCommitId(), value.getStatus(), value.getParticipatingRegions(), forceControlPoint); } } catch (Exception ex) { - LOG.error("formatRecord Exception " + ex); + LOG.error("formatRecord Exception ", ex); throw ex; } } @@ -1177,7 +1173,7 @@ public class TmAuditTlog { deleteEntriesOlderThanASN(agedAsn, ageCommitted); } catch (Exception e){ - LOG.error("deleteAgedEntries Exception " + e); + LOG.error("deleteAgedEntries Exception ", e); throw e; } } @@ -1189,12 +1185,12 @@ public class TmAuditTlog { } } catch (IOException e){ - LOG.error("addControlPoint IOException " + e); + LOG.error("addControlPoint IOException ", e); throw e; } } } catch (Exception e){ - LOG.error("addControlPoint Exception " + e); + LOG.error("addControlPoint Exception ", e); throw e; } if (LOG.isTraceEnabled()) LOG.trace("addControlPoint returning " + lvCtrlPt); @@ -1371,7 +1367,7 @@ public class TmAuditTlog { } } catch (Exception e2) { - LOG.error("getTransactionState Exception2 " + e2); + LOG.error("getTransactionState Exception2 ", e2); e2.printStackTrace(); } if (LOG.isTraceEnabled()) LOG.trace("getTransactionState end transid: " + ts.getTransactionId()); @@ -1389,7 +1385,7 @@ public class TmAuditTlog { * Purpose : Delete transaction records which are no longer needed */ public void deleteEntriesOlderThanASN(final long pv_ASN, final boolean pv_ageCommitted) throws IOException { - int loopCount = 0; + int loopIndex = 0; long threadId = Thread.currentThread().getId(); // This TransactionState object is just a mechanism to keep track of the asynch rpc calls // send to regions in order to retrience the desired set of transactions @@ -1406,9 +1402,11 @@ public class TmAuditTlog { for (int index = 0; index < tlogNumLogs; index++) { String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(this.dtmid) + "_LOG_" + Integer.toHexString(index)); regionList = connection.locateRegions(TableName.valueOf(lv_tLogName), false, false); - loopCount++; + loopIndex++; + int regionIndex = 0; // For every region in this table for (HRegionLocation location : regionList) { + regionIndex++; final byte[] regionName = location.getRegionInfo().getRegionName(); compPool.submit(new TlogCallable(transactionState, location, connection) { public Integer call() throws IOException { @@ -1417,28 +1415,25 @@ public class TmAuditTlog { return deleteEntriesOlderThanASNX(regionName, pv_ASN, pv_ageCommitted); } }); + + try { + int partialResult = compPool.take().get(); + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN partial result: " + partialResult + + " loopIndex " + loopIndex + " regionIndex " + regionIndex); + } + catch (Exception e2) { + LOG.error("exception retieving reply in deleteEntriesOlderThanASN for interval ASN: " + pv_ASN + + " ", e2); + throw new IOException(e2); + } } } } catch (Exception e) { LOG.error("exception in deleteEntriesOlderThanASN for ASN: " - + pv_ASN + " " + e); + + pv_ASN + " ", e); throw new IOException(e); } - // all requests sent at this point, can record the count - if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN tlog callable requests sent to " - + loopCount + " tlogs in thread " + threadId); - int deleteError = 0; - try { - for (int loopIndex = 0; loopIndex < loopCount; loopIndex ++) { - int partialResult = compPool.take().get(); - if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN partial result: " + partialResult + " loopIndex " + loopIndex); - } - } - catch (Exception e2) { - LOG.error("exception retieving replys in deleteEntriesOlderThanASN for interval ASN: " + pv_ASN - + " " + e2); - throw new IOException(e2); - } + if (LOG.isTraceEnabled()) LOG.trace("deleteEntriesOlderThanASN tlog callable requests completed in thread " + threadId); return;
