Fix for [TRAFODION-1703] Lower overhead in deleting old Tlog entries
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/7f54aa7c Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/7f54aa7c Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/7f54aa7c Branch: refs/heads/master Commit: 7f54aa7cddc5c57d71d3aa5a59ce2046b67865cd Parents: 02c6c32 Author: Sean Broeder <[email protected]> Authored: Tue Jan 26 13:40:38 2016 +0000 Committer: Sean Broeder <[email protected]> Committed: Wed Jan 27 03:57:25 2016 +0000 ---------------------------------------------------------------------- .../transactional/TrxRegionEndpoint.java | 208 +- .../generated/TrxRegionProtos.java | 2182 +++++++++++++++++- .../hbase-trx/src/main/protobuf/TrxRegion.proto | 17 + .../java/org/trafodion/dtm/TmAuditTlog.java | 350 ++- 4 files changed, 2606 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7f54aa7c/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java index 804afdf..e8e6038 100755 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java @@ -61,6 +61,7 @@ import java.util.NavigableSet; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; +import java.util.StringTokenizer; import java.util.TreeMap; import java.util.TreeSet; 
import java.util.UUID;
@@ -207,6 +208,8 @@ import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProt
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateRequest;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateResponse;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
@@ -2145,6 +2148,203 @@ CoprocessorService, Coprocessor {
     done.run(presponse);
   }
 
+  /**
+   * Deletes aged TLOG entries from this region for a TLOG cleanup request.
+   *
+   * The request carries a transaction id (which must already have an open scanner
+   * in this region, see openScanner), a serialized Scan, an audit sequence number
+   * (ASN) and an ageCommitted flag. Each cell value is parsed as "asn,transid,state".
+   * A row whose ASN is below the requested ASN is deleted when its state contains
+   * FORGOTTEN, or when it is COMMITTED or ABORTED and ageCommitted is set. The scan
+   * stops at the first entry whose ASN is not below the requested ASN.
+   *
+   * The response carries the number of rows deleted; on failure hasException is set
+   * and exception holds the text of the reported failure.
+   */
+  public void deleteTlogEntries(RpcController controller,
+                                TlogDeleteRequest request,
+                                RpcCallback<TlogDeleteResponse> done) {
+    boolean hasMore = true;
+    RegionScanner scanner = null;
+    Throwable t = null;
+    ScannerTimeoutException ste = null;
+    OutOfOrderProtocolException oop = null;
+    OutOfOrderScannerNextException ooo = null;
+    Exception ne = null;
+    List<Cell> cellResults = new ArrayList<Cell>();
+    org.apache.hadoop.hbase.client.Result result = null;
+    long transId = request.getTransactionId();
+    long lvAsn = request.getAuditSeqNum();
+    boolean lvAgeCommitted = request.getAgeCommitted();
+    long count = 0L;
+
+    try {
+      Scan scan = ProtobufUtil.toScan(request.getScan());
+      prepareScanner(scan);
+      scanner = getScanner(transId, scan);
+    }
+    catch (Exception e) {
+      // Report this to the caller; silently returning a zero count hides the failure.
+      LOG.warn("deleteTlogEntries Exception in region: "
+             + regionInfo.getRegionNameAsString() + " getting scanner " + e);
+      t = e;
+    }
+
+    if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries ENTRY.  Records older than " + lvAsn
+             + " will be deleted in region: " + regionInfo.getRegionNameAsString());
+
+    // There should be a matching key in the transactionsById map
+    // associated with this transaction id.  If there is not
+    // one, then the initial openScanner call for the transaction
+    // id was not called.  This is a protocol error requiring
+    // openScanner, performScan followed by a closeScanner.
+    String key = getTransactionalUniqueId(transId);
+    if (!transactionsById.containsKey(key)) {
+      if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - Unknown transaction ["
+             + transId + "] in region [" + m_Region.getRegionInfo().getRegionNameAsString()
+             + "], will create an OutOfOrderProtocol exception ");
+      oop = new OutOfOrderProtocolException("deleteTlogEntries does not have an active transaction with an open scanner, txId: " + transId);
+    }
+
+    try {
+      if (oop == null && scanner != null) {
+        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+               + transId + ", scanner is not null");
+        boolean shouldContinue = true;
+        while (shouldContinue) {
+          hasMore = scanner.next(cellResults);
+          result = Result.create(cellResults);
+          if (!result.isEmpty()) {
+            for (Cell cell : result.rawCells()) {
+              String valueString = new String(CellUtil.cloneValue(cell), "UTF-8");
+              StringTokenizer st = new StringTokenizer(valueString, ",");
+              // Need at least asn, transid and state; skip anything shorter rather
+              // than failing the whole request with NoSuchElementException.
+              if (st.countTokens() < 3) {
+                LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries - skipping malformed entry ["
+                       + valueString + "] in region: " + m_Region.getRegionInfo().getRegionNameAsString());
+                continue;
+              }
+              String asnToken = st.nextToken();
+              String transidToken = st.nextToken();
+              String stateToken = st.nextToken();
+              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries transidToken: "
+                     + transidToken + " asnToken: " + asnToken);
+              if (Long.parseLong(asnToken) >= lvAsn) {
+                // Stop at the first entry that is not older than the requested ASN.
+                // NOTE(review): this assumes entries are ordered by ASN within the region.
+                if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Ending scan at asn: " + asnToken
+                       + ", transid: " + transidToken
+                       + " because it is not less than the comparator: " + lvAsn
+                       + " in region: " + m_Region.getRegionInfo().getRegionNameAsString());
+                shouldContinue = false;
+                break;
+              }
+              boolean deletable = stateToken.contains("FORGOTTEN")
+                     || (lvAgeCommitted && (stateToken.equals("COMMITTED") || stateToken.equals("ABORTED")));
+              if (deletable) {
+                if (LOG.isTraceEnabled()) LOG.trace("Deleting transid: " + transidToken
+                       + " from region: " + m_Region.getRegionInfo().getRegionNameAsString() + " with state: " + stateToken);
+                try {
+                  m_Region.delete(new Delete(result.getRow()));
+                }
+                catch (Exception e) {
+                  LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries -"
+                         + " txId " + transidToken + ", Executing delete caught an exception " + e);
+                  throw new IOException(e.toString(), e);
+                }
+                count++;
+                // The row is gone; its remaining cells would only trigger redundant
+                // deletes and over-count.
+                break;
+              }
+            } // for (Cell cell : result.rawCells())
+          } // if (!result.isEmpty())
+          cellResults.clear();
+
+          if (!hasMore) {
+            shouldContinue = false;
+          }
+        } // while (shouldContinue)
+        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+               + transId + ", count is " + count + ", hasMore is " + hasMore
+               + ", result " + result.isEmpty());
+      }
+      else if (oop == null) {
+        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+               + transId + ", scanner is null");
+      }
+    } catch (OutOfOrderScannerNextException ooone) {
+      if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+             + transId + " Caught OutOfOrderScannerNextException  "
+             + ooone.getMessage() + " " + stackTraceToString(ooone));
+      ooo = ooone;
+    } catch (ScannerTimeoutException cste) {
+      if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+             + transId + " Caught ScannerTimeoutException  "
+             + cste.getMessage() + " " + stackTraceToString(cste));
+      ste = cste;
+    } catch (Throwable e) {
+      if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+             + transId + " Caught throwable exception "
+             + e.getMessage() + " " + stackTraceToString(e));
+      t = e;
+    } finally {
+      // Close the scanner on every path; the unknown-transaction path used to leak it.
+      if (scanner != null) {
+        try {
+          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+                 + transId + ", closing the scanner, region is " + regionInfo.getRegionNameAsString());
+          scanner.close();
+        } catch (Exception e) {
+          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - transaction id "
+                 + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e));
+          ne = e;
+        }
+      }
+    }
+
+    TlogDeleteResponse.Builder deleteResponseBuilder = TlogDeleteResponse.newBuilder();
+    deleteResponseBuilder.setCount(count);
+    deleteResponseBuilder.setHasException(false);
+
+    // When several failures were recorded, the last one set below is reported.
+    if (t != null) {
+      deleteResponseBuilder.setHasException(true);
+      deleteResponseBuilder.setException(t.toString());
+    }
+
+    if (ste != null) {
+      deleteResponseBuilder.setHasException(true);
+      deleteResponseBuilder.setException(ste.toString());
+    }
+
+    if (ne != null) {
+      deleteResponseBuilder.setHasException(true);
+      deleteResponseBuilder.setException(ne.toString());
+    }
+
+    if (ooo != null) {
+      deleteResponseBuilder.setHasException(true);
+      deleteResponseBuilder.setException(ooo.toString());
+    }
+
+    if (oop != null) {
+      if (!this.suppressOutOfOrderProtocolException) {
+        deleteResponseBuilder.setHasException(true);
+        deleteResponseBuilder.setException(oop.toString());
+        LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries - OutOfOrderProtocolException, transaction was not found, txId: "
+               + transId + ", return exception" + ", regionName " + regionInfo.getRegionNameAsString());
+      }
+      else {
+        LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries - suppressing OutOfOrderProtocolException, transaction was not found, txId: "
+               + transId + ", regionName " + regionInfo.getRegionNameAsString());
+      }
+    }
+
+    done.run(deleteResponseBuilder.build());
+  }
+
   @Override
   public void put(RpcController controller,
                   PutTransactionalRequest request,
@@ -4607,7 +4807,7 @@ CoprocessorService, Coprocessor {
           state.setSequenceNumber(nextSequenceId.getAndIncrement());
           commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(), state);
           if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: Transaction " + transactionId
-                  + " found in region " + m_Region.getRegionInfo().getRegionNameAsString()
+                  + " found in region " + lv_regionName
                   + ". Adding to commitedTransactionsBySequenceNumber for sequence number " + state.getSequenceNumber());
       }
       commitCheckEndTime = putBySequenceEndTime = System.nanoTime();
@@ -4767,7 +4967,8 @@ CoprocessorService, Coprocessor {
       if(state.getSplitRetry())
         return COMMIT_RESEND;
 
       retireTransaction(state, true);
-      if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest READ ONLY -- EXIT txId: " + transactionId);
+      if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest READ ONLY for participant "
+               + participantNum + " -- EXIT txId: " + transactionId + ", regionName " + regionInfo.getRegionNameAsString());
       return COMMIT_OK_READ_ONLY;
     }
