Repository: incubator-trafodion Updated Branches: refs/heads/master ff4b7dd73 -> ed98ed849
TRAFODION-1648 Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/2b46c9c0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/2b46c9c0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/2b46c9c0 Branch: refs/heads/master Commit: 2b46c9c008d00bab539ecbf7f52cf32c173bf561 Parents: 4550081 Author: Trina Krug <[email protected]> Authored: Thu Jan 21 21:46:34 2016 +0000 Committer: Trina Krug <[email protected]> Committed: Thu Jan 21 21:46:34 2016 +0000 ---------------------------------------------------------------------- .../transactional/TrxRegionEndpoint.java | 318 +++++++++++++------ .../transactional/TrxRegionObserver.java | 72 ++++- 2 files changed, 283 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/2b46c9c0/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java index 583fca2..804afdf 100755 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java @@ -328,7 +328,9 @@ CoprocessorService, Coprocessor { private FileSystem fs = null; private RegionCoprocessorHost rch = null; private HLog tHLog = null; - private AtomicBoolean closing = new AtomicBoolean(false); + private AtomicBoolean blockAll = new AtomicBoolean(false); + private AtomicBoolean blockNonPhase2 = new AtomicBoolean(false); + private AtomicBoolean blockNewTrans = new AtomicBoolean(false); private boolean configuredEarlyLogging = false; private boolean configuredConflictReinstate = false; private static Object zkRecoveryCheckLock = new Object(); @@ -1063,7 +1065,7 @@ CoprocessorService, Coprocessor { } // Process in local memory - if (delete != null && t == null) + if ((delete != null) && (t == null)) { if (request.hasRow()) { @@ -1187,7 +1189,7 @@ CoprocessorService, Coprocessor { } // Process in local memory - if (put != null) + if ((put != null) && (t == null)) { if (request.hasRow()) { row = request.getRow(); @@ -1308,6 +1310,8 @@ CoprocessorService, Coprocessor { if (oop == null) { try { + // we want to allow closing scaners and remove operations up until the very end. + checkBlockAll(transId); scanner = removeScanner(scannerId); if (scanner != null) { @@ -1425,6 +1429,7 @@ CoprocessorService, Coprocessor { if (type == MutationType.DELETE && proto.hasRow()) { try { + checkBlockNonPhase2(transactionId); // throws IOException delete = ProtobufUtil.toDelete(proto); } catch (Throwable e) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:deleteMultiple - txId " + transactionId + ", Caught exception after protobuf conversion delete" @@ -1433,7 +1438,7 @@ CoprocessorService, Coprocessor { } // Process in local memory - if (delete != null) + if ((delete != null) && (t == null)) { try { delete(transactionId, delete); @@ -1523,6 +1528,7 @@ CoprocessorService, Coprocessor { else { try { + checkBlockNonPhase2(transactionId); // throws IOException delete = ProtobufUtil.toDelete(proto); } catch (Throwable e) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:delete - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e)); @@ -1530,17 +1536,19 @@ CoprocessorService, Coprocessor { } // Process in local memory - try { - delete(transactionId, delete); - } catch (Throwable e) { - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:delete - txId " + transactionId + ", Caught exception after internal delete - " - + e.getMessage() + " " + stackTraceToString(e)); - t = e; - } + if ((delete != null) && (t == null)) + { + try { + delete(transactionId, delete); + } catch (Throwable e) { + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:delete - txId " + transactionId + ", Caught exception after internal delete - " + + e.getMessage() + " " + stackTraceToString(e)); + t = e; + } - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: delete - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray())); + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: delete - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString() + ", type " + type + ", row " + Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + Hex.encodeHexString(proto.getRow().toByteArray())); + } } - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteTransactionalResponse.Builder deleteTransactionalResponseBuilder = DeleteTransactionalResponse.newBuilder(); deleteTransactionalResponseBuilder.setHasException(false); @@ -1584,6 +1592,7 @@ CoprocessorService, Coprocessor { WrongRegionException wre = null; org.apache.hadoop.hbase.client.Result result2 = null; long transactionId = request.getTransactionId(); + boolean exceptionThrown = false; /* commenting it out for the time-being java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8(); @@ -1605,59 +1614,65 @@ CoprocessorService, Coprocessor { else { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - performing memoryPercentage " + memoryPercentage + ", generating memory usage exceeds indicated percentage exception"); mue = new MemoryUsageException("get memory usage exceeds " + memoryUsageThreshold + " percent, trxId is " + transactionId); + exceptionThrown = true; } } else { try { + checkBlockNonPhase2(transactionId); // throws IOException get = ProtobufUtil.toGet(proto); } catch (Throwable e) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor:get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e)); t = e; + exceptionThrown = true; } - Scan scan = new Scan(get); - List<Cell> results = new ArrayList<Cell>(); + if (exceptionThrown == false) + { + Scan scan = new Scan(get); + List<Cell> results = new ArrayList<Cell>(); - try { + try { - if (LOG.isTraceEnabled()) { - byte[] row = proto.getRow().toByteArray(); - byte[] getrow = get.getRow(); - String rowKey = Bytes.toString(row); - String getRowKey = Bytes.toString(getrow); + if (LOG.isTraceEnabled()) { + byte[] row = proto.getRow().toByteArray(); + byte[] getrow = get.getRow(); + String rowKey = Bytes.toString(row); + String getRowKey = Bytes.toString(getrow); - LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", Calling getScanner for regionName " + regionInfo.getRegionNameAsString() + ", row = " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row) + ", getrow = " + Bytes.toStringBinary(getrow) + ", getrow in hex " + Hex.encodeHexString(getrow)); - } + LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", Calling getScanner for regionName " + regionInfo.getRegionNameAsString() + ", row = " + Bytes.toStringBinary(row) + ", row in hex " + Hex.encodeHexString(row) + ", getrow = " + Bytes.toStringBinary(getrow) + ", getrow in hex " + Hex.encodeHexString(getrow)); + } - scanner = getScanner(transactionId, scan); + scanner = getScanner(transactionId, scan); - if (scanner != null) - scanner.next(results); + if (scanner != null) + scanner.next(results); - result2 = Result.create(results); - - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", getScanner result2 isEmpty is " + result2 = Result.create(results); + + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", getScanner result2 isEmpty is " + result2.isEmpty() + ", row " + Bytes.toStringBinary(result2.getRow()) + " result length: " + result2.size()); - } catch(Throwable e) { - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e)); - t = e; - } - finally { - if (scanner != null) { - try { - scanner.close(); - } catch(Exception e) { + } catch(Throwable e) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e)); - ge = e; + t = e; } - } - } + finally { + if (scanner != null) { + try { + scanner.close(); + } catch(Exception e) { + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + stackTraceToString(e)); + ge = e; + } + } + } + } // ExceptionThrown } // End of MemoryUsageCheck //} // End of WrongRegionCheck @@ -1822,6 +1837,7 @@ CoprocessorService, Coprocessor { } catch (IOException e) { LOG.error("TrxRegionEndpoint coprocessor: openScanner - txId " + transId + ", getScanner Error opening scanner, " + e.toString()); exceptionThrown = true; + ioe = e; } } @@ -1898,6 +1914,8 @@ CoprocessorService, Coprocessor { boolean shouldContinue = true; TransactionalRegionScannerHolder rsh = null; + boolean exceptionThrown = false; + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", numberOfRows " + numberOfRows + ", nextCallSeq " + nextCallSeq + ", closeScanner is " + closeScanner + ", region is " + regionInfo.getRegionNameAsString()); /* commenting it out for the time-being @@ -1976,12 +1994,15 @@ CoprocessorService, Coprocessor { } catch(OutOfOrderScannerNextException ooone) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + " Caught OutOfOrderScannerNextException " + ooone.getMessage() + " " + stackTraceToString(ooone)); ooo = ooone; + exceptionThrown = true; } catch(ScannerTimeoutException cste) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + " Caught ScannerTimeoutException " + cste.getMessage() + " " + stackTraceToString(cste)); ste = cste; + exceptionThrown = true; } catch(Throwable e) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + " Caught throwable exception " + e.getMessage() + " " + stackTraceToString(e)); t = e; + exceptionThrown = true; } finally { if (scanner != null) { @@ -2002,43 +2023,46 @@ CoprocessorService, Coprocessor { } catch(Exception e) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - transaction id " + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e)); ne = e; + exceptionThrown = true; } } } - rsh = scanners.get(scannerId); + if (exceptionThrown == false) + { + rsh = scanners.get(scannerId); - nextCallSeq++; + nextCallSeq++; - if (rsh == null) - { - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan rsh is null"); - use = new UnknownScannerException( - "ScannerId: " + scannerId + ", already closed?"); - } - else - { - rsh.nextCallSeq = nextCallSeq; + if (rsh == null) + { + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan rsh is null"); + use = new UnknownScannerException( + "ScannerId: " + scannerId + ", already closed?"); + } + else + { + rsh.nextCallSeq = nextCallSeq; - if (rsh == null) - { - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", performScan rsh is null, UnknownScannerException for scannerId: " + scannerId + ", nextCallSeq was " + nextCallSeq + ", for region " + regionInfo.getRegionNameAsString()); - use = new UnknownScannerException( - "ScannerId: " + scannerId + ", was scanner already closed?, transaction id " + transId + ", nextCallSeq was " + nextCallSeq + ", for region " + regionInfo.getRegionNameAsString()); - } - else - { - rsh.nextCallSeq = nextCallSeq; + if (rsh == null) + { + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", performScan rsh is null, UnknownScannerException for scannerId: " + scannerId + ", nextCallSeq was " + nextCallSeq + ", for region " + regionInfo.getRegionNameAsString()); + use = new UnknownScannerException( + "ScannerId: " + scannerId + ", was scanner already closed?, transaction id " + transId + ", nextCallSeq was " + nextCallSeq + ", for region " + regionInfo.getRegionNameAsString()); + } + else + { + rsh.nextCallSeq = nextCallSeq; - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", regionName " + regionInfo.getRegionNameAsString() + + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", regionName " + regionInfo.getRegionNameAsString() + ", nextCallSeq " + nextCallSeq + ", rsh.nextCallSeq " + rsh.nextCallSeq + ", close scanner is " + closeScanner); - } + } + } + } + } } - } } - } - org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PerformScanResponse.Builder performResponseBuilder = PerformScanResponse.newBuilder(); performResponseBuilder.setHasMore(hasMore); performResponseBuilder.setNextCallSeq(nextCallSeq); @@ -2167,7 +2191,7 @@ CoprocessorService, Coprocessor { t = e; } - if (mue == null && type == MutationType.PUT && proto.hasRow()) + if ((mue == null && type == MutationType.PUT && proto.hasRow()) && (put != null)) { // Process in local memory try { @@ -2438,6 +2462,7 @@ CoprocessorService, Coprocessor { long transactionId = request.getTransactionId(); T max = null; try { + checkBlockNonPhase2(transactionId); // throws IOException ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); T temp; Scan scan = ProtobufUtil.toScan(request.getScan()); @@ -2493,6 +2518,7 @@ CoprocessorService, Coprocessor { long transactionId = request.getTransactionId(); T min = null; try { + checkBlockNonPhase2(transactionId); // throws IOException ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); T temp; Scan scan = ProtobufUtil.toScan(request.getScan()); @@ -2546,6 +2572,7 @@ CoprocessorService, Coprocessor { long sum = 0l; long transactionId = request.getTransactionId(); try { + checkBlockNonPhase2(transactionId); // throws IOException ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); S sumVal = null; T temp; @@ -2600,6 +2627,7 @@ CoprocessorService, Coprocessor { RegionScanner scanner = null; long transactionId = 0L; try { + checkBlockNonPhase2(transactionId); // throws IOException Scan scan = ProtobufUtil.toScan(request.getScan()); byte[][] colFamilies = scan.getFamilies(); byte[] colFamily = colFamilies != null ? colFamilies[0] : null; @@ -3109,14 +3137,35 @@ CoprocessorService, Coprocessor { this.transactionsById); } - AtomicBoolean closingCheck = (AtomicBoolean)transactionsByIdTestz - .get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyClosingVar); - if(closingCheck != null) { - this.closing = closingCheck; + AtomicBoolean blockAllCheck = (AtomicBoolean)transactionsByIdTestz + .get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockAllVar); + if(blockAllCheck != null) { + this.blockAll = blockAllCheck; } else { - transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyClosingVar, - this.closing); + transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockAllVar, + this.blockAll); + } + + + AtomicBoolean blockNonPhase2Check = (AtomicBoolean)transactionsByIdTestz + .get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNonPhase2Var); + if(blockNonPhase2Check != null) { + this.blockNonPhase2 = blockNonPhase2Check; + } + else { + transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNonPhase2Var, + this.blockNonPhase2); + } + + AtomicBoolean newTransCheck = (AtomicBoolean)transactionsByIdTestz + .get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNewTransVar); + if(newTransCheck != null) { + this.blockNewTrans = newTransCheck; + } + else { + transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNewTransVar, + this.blockNewTrans); } ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck = (ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsByIdTestz @@ -3146,18 +3195,49 @@ CoprocessorService, Coprocessor { } // Internal support methods + /** + * Checks if the region is closing and needs to block all activity + * @param long transactionId + * @return String + * @throws IOException + */ + private void checkBlockAll(final long transactionId) throws IOException { + if (blockAll.get()) { + if(LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint coprocessor: checkBlockAll - txId " + transactionId + ", No Transactional activity allowed."); + throw new IOException("closing region, no more transactional activity allowed. Region: " + regionInfo.getRegionNameAsString()); + } + } /** - * Checks if the region is closing + * Checks if the region is closing and needs to block non phase 2 activity * @param long transactionId * @return String * @throws IOException */ - private void checkClosing(final long transactionId) throws IOException { - if (closing.get()) { - if(LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint coprocessor: checkClosing - txId " + transactionId + ", Trafodion Recovery: Raising exception. no more new transactions allowed."); - throw new IOException("closing region, no more new transactions allowed. Region: " + regionInfo.getRegionNameAsString()); + private void checkBlockNonPhase2(final long transactionId) throws IOException { + if (blockNonPhase2.get()) { + if(LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint coprocessor: checkBlockNonPhase2 - txId " + transactionId + ", No Transactional activity allowed."); + throw new IOException("closing region, no more non phase 2 transactional activity allowed. Region: " + regionInfo.getRegionNameAsString()); } + + // sometimes we only set the most sever in which case we always need to check the higher up levels + checkBlockAll(transactionId); + } + + /** + * Checks if new transactions are disabled + * @param long transactionId + * @return String + * @throws IOException + */ + private void checkBlockNewTrans(final long transactionId) throws IOException { + if (blockNewTrans.get()) { + if(LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint coprocessor: checkNewTrans - txId " + transactionId + ", No more new transactions allowed."); + throw new IOException("closing region, no more new transactions allowed. Region: " + regionInfo.getRegionNameAsString()); + } + + // sometimes we only set the most sever in which case we always need to check the higher up levels + checkBlockNonPhase2(transactionId); } /** @@ -3784,6 +3864,9 @@ CoprocessorService, Coprocessor { public void delete(final long transactionId, final Delete delete) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: delete -- ENTRY txId: " + transactionId); + + checkBlockNonPhase2(transactionId); + if(this.checkRowBelongs) checkRow(delete.getRow(), "Delete"); TrxTransactionState state = this.beginTransIfNotExist(transactionId); @@ -3965,6 +4048,8 @@ CoprocessorService, Coprocessor { } */ + checkBlockNonPhase2(transactionId); + Scan scan = new Scan(get); List<Cell> results = new ArrayList<Cell>(); @@ -4078,6 +4163,9 @@ CoprocessorService, Coprocessor { public void put(final long transactionId, final Put put) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("Enter TrxRegionEndpoint coprocessor: put, txid: " + transactionId); + + checkBlockNonPhase2(transactionId); + if(this.checkRowBelongs) checkRow(put.getRow(),"Put"); TrxTransactionState state = this.beginTransIfNotExist(transactionId); @@ -4213,6 +4301,8 @@ CoprocessorService, Coprocessor { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: beginTransaction -- ENTRY txId: " + transactionId); + checkBlockNonPhase2(transactionId); + // TBD until integration with recovery if (reconstructIndoubts == 0) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: RECOV beginTransaction -- ENTRY txId: " + transactionId); @@ -4329,7 +4419,7 @@ CoprocessorService, Coprocessor { if (LOG.isTraceEnabled()) LOG.trace("Enter TrxRegionEndpoint coprocessor: beginTransIfNotExist, txid: " + transactionId + ", regionName " + regionInfo.getRegionNameAsString() + " transactionsById size: " + transactionsById.size()); - checkClosing(transactionId); + checkBlockNewTrans(transactionId); String key = getTransactionalUniqueId(transactionId); synchronized (transactionsById) { @@ -4366,6 +4456,9 @@ CoprocessorService, Coprocessor { " ignoreUnknownTransaction: " + ignoreUnknownTransactionException); CommitProgress commitStatus = CommitProgress.NONE; TrxTransactionState state; + + checkBlockAll(transactionId); + try { state = getTransactionState(transactionId); } catch (UnknownTransactionException e) { @@ -4433,6 +4526,9 @@ CoprocessorService, Coprocessor { String lv_regionName = new String(m_Region.getRegionInfo().getRegionNameAsString()); if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest -- ENTRY txId: " + transactionId + " participantNum " + participantNum + ", regionName " + lv_regionName); + + checkBlockNonPhase2(transactionId); + TrxTransactionState state; int lv_totalCommits = 0; @@ -4713,6 +4809,8 @@ CoprocessorService, Coprocessor { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: abort transactionId: " + transactionId + " " + m_Region.getRegionInfo().getRegionNameAsString()); + checkBlockNonPhase2(transactionId); + TrxTransactionState state; try { state = getTransactionState(transactionId); @@ -4830,6 +4928,9 @@ CoprocessorService, Coprocessor { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: " + transactionId); + + checkBlockNonPhase2(transactionId); + int status = commitRequest(transactionId, participantNum); if (status == COMMIT_OK) { @@ -5346,6 +5447,12 @@ CoprocessorService, Coprocessor { } } public void flushToFS(Path flushPath) throws IOException { + + if(LOG.isTraceEnabled()) LOG.trace("flushToFS -- ENTRY, Path: " + flushPath.toString()); + + if(LOG.isTraceEnabled()) LOG.trace("flushToFS -- transactionsById (" + transactionsById.size() + "), commitedTransactionsBySequenceNumber (" + commitedTransactionsBySequenceNumber.size() + ")"); + + TransactionPersist.Builder txnPersistBuilder = TransactionPersist.newBuilder(); fs.delete(flushPath, true); @@ -5356,16 +5463,22 @@ CoprocessorService, Coprocessor { Map<Long, TrxTransactionState> transactionMap = new HashMap<Long, TrxTransactionState>(); - for(TrxTransactionState ts : transactionsById.values()) { - transactionMap.put(ts.getTransactionId(), ts); - txnPersistBuilder.addTxById(ts.getTransactionId()); + synchronized (transactionsById) { + for(TrxTransactionState ts : transactionsById.values()) { + transactionMap.put(ts.getTransactionId(), ts); + txnPersistBuilder.addTxById(ts.getTransactionId()); + } } - for(Map.Entry<Long, TrxTransactionState> entry : - commitedTransactionsBySequenceNumber.entrySet()) { - transactionMap.put(entry.getValue().getTransactionId(), entry.getValue()); - txnPersistBuilder.addSeqNoListSeq(entry.getKey()); - txnPersistBuilder.addSeqNoListTxn(entry.getValue().getTransactionId()); + + synchronized (commitedTransactionsBySequenceNumber) { + for(Map.Entry<Long, TrxTransactionState> entry : + commitedTransactionsBySequenceNumber.entrySet()) { + transactionMap.put(entry.getValue().getTransactionId(), entry.getValue()); + txnPersistBuilder.addSeqNoListSeq(entry.getKey()); + txnPersistBuilder.addSeqNoListTxn(entry.getValue().getTransactionId()); + } } + for(TrxTransactionState ts : transactionMap.values()) { for(TrxTransactionState ts2 : ts.getTransactionsToCheck()) { transactionMap.put(ts.getTransactionId(), ts); @@ -5505,7 +5618,13 @@ CoprocessorService, Coprocessor { } } transactionsById.put(key, ts); - transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(txid)); + + try { + transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(txid)); + } catch (LeaseStillHeldException e) { + transactionLeases.renewLease(key); + } + } else { TrxTransactionState tsEntry = new TrxTransactionState(txid, @@ -5536,8 +5655,27 @@ CoprocessorService, Coprocessor { } - public void setClosing(boolean value) { - closing.set(value); + public void setBlockAll(boolean value) { + + blockAll.set(value); + + // for safety + if (value == true) { + blockNewTrans.set(value); + blockNonPhase2.set(value); + } + } + + public void setBlockNonPhase2(boolean value) { + blockNonPhase2.set(value); + + // for safety + if (value == true) + blockNewTrans.set(value); } + + public void setNewTrans(boolean value) { + blockNewTrans.set(value); + } } //1} http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/2b46c9c0/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java index ddf4f4e..6b0dd78 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java @@ -92,7 +92,9 @@ public static final String trxkeycommitedTransactionsBySequenceNumber = "commite public static final String trxkeycommitPendingTransactions = "commitPendingTransactions"; public static final String trxkeypendingTransactionsById = "pendingTransactionsById"; public static final String trxkeyindoubtTransactionsCountByTmid = "indoubtTransactionsCountByTmid"; -public static final String trxkeyClosingVar = "checkClosingVariable"; +public static final String trxkeyCheckBlockAllVar = "checkBlockAllVar"; +public static final String trxkeyCheckBlockNonPhase2Var = "checkBlockNonPhase2Var"; +public static final String trxkeyCheckBlockNewTransVar = "checkBlockNewTransVar"; public static final String trxkeyScanners = "trxScanners"; public static final String SPLIT_DELAY_NOFLUSH = "hbase.transaction.split.delay.noflush"; @@ -127,7 +129,9 @@ static ConcurrentHashMap<String, Object> trxRegionMap; private ConcurrentHashMap<String, TrxTransactionState> transactionsById = new ConcurrentHashMap<String, TrxTransactionState>(); private Set<TrxTransactionState> commitPendingTransactions = Collections.synchronizedSet(new HashSet<TrxTransactionState>()); -private AtomicBoolean closing = new AtomicBoolean(false); +private AtomicBoolean blockAll = new AtomicBoolean(false); +private AtomicBoolean blockNonPhase2 = new AtomicBoolean(false); +private AtomicBoolean blockNewTrans = new AtomicBoolean(false); private boolean hasClosed = false; private boolean hasFlushed = false; @@ -255,15 +259,32 @@ public void start(CoprocessorEnvironment e) throws IOException { this.commitPendingTransactions); } - AtomicBoolean closingCheck = (AtomicBoolean)transactionsRefMap - .get(regionName+trxkeyClosingVar); - if(closingCheck != null) { - this.closing = closingCheck; + AtomicBoolean blockAllCheck = (AtomicBoolean)transactionsRefMap + .get(regionName+trxkeyCheckBlockAllVar); + if(blockAllCheck != null) { + this.blockAll = blockAllCheck; + } + else { + transactionsRefMap.put(regionName+trxkeyCheckBlockAllVar, this.blockAll); + } + AtomicBoolean blockNonPhase2Check = (AtomicBoolean)transactionsRefMap + .get(regionName+trxkeyCheckBlockNonPhase2Var); + if(blockNonPhase2Check != null) { + this.blockNonPhase2 = blockNonPhase2Check; } else { - transactionsRefMap.put(regionName+trxkeyClosingVar, this.closing); + transactionsRefMap.put(regionName+trxkeyCheckBlockNonPhase2Var, this.blockNonPhase2); } + AtomicBoolean blockNewTransCheck = (AtomicBoolean)transactionsRefMap + .get(regionName+trxkeyCheckBlockNewTransVar); + if(blockNewTransCheck != null) { + this.blockNewTrans = blockNewTransCheck; + } + else { + transactionsRefMap.put(regionName+trxkeyCheckBlockNewTransVar,this.blockNewTrans); + } + @SuppressWarnings("unchecked") ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck = (ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsRefMap @@ -621,25 +642,31 @@ public void createRecoveryzNode(int node, String encodedName, byte [] data) thro public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) throws IOException { if(LOG.isTraceEnabled()) LOG.trace("preSplit -- ENTRY region: " + regionInfo.getRegionNameAsString()); + if(LOG.isTraceEnabled()) LOG.trace("preSplit -- transactionsById (" + transactionsById.size() + " ), commitPendingTransactions (" + commitPendingTransactions.size() +"), scanners (" + scanners.size() + ")"); + + blockNewTrans.set(true); + if(splitDelayNoFlush) { if(!this.earlyDrain) - sbHelper.activeWait(transactionsById, activeDelayLen, splitDelayLimit); - closing.set(true); - sbHelper.pendingWait(commitPendingTransactions, pendingDelayLen); + sbHelper.activeWait(transactionsById, activeDelayLen, splitDelayLimit); + sbHelper.pendingWait(commitPendingTransactions, pendingDelayLen); } else { + blockNonPhase2.set(true); sbHelper.pendingAndScannersWait(commitPendingTransactions, scanners, pendingDelayLen); - closing.set(true); sbHelper.setSplit(); } + blockAll.set(true); if(LOG.isTraceEnabled()) LOG.trace("preSplit -- EXIT region: " + regionInfo.getRegionNameAsString()); } @Override public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r) { + if(LOG.isTraceEnabled()) LOG.trace("postSplit -- ENTRY"); + if(splitDelayNoFlush) return; @@ -652,15 +679,20 @@ public void createRecoveryzNode(int node, String encodedName, byte [] data) thro } else { try { - treL.setClosing(true); - treR.setClosing(true); + // don't need to set NewTrans flag because blockNonPhase2 will catch up + treL.setBlockAll(true); + treR.setBlockAll(true); Thread readThread = new Thread(new TxnReadThread(treL, sbHelper.getPath(), true)); readThread.start(); //treL.readTxnInfo(sbHelper.getPath(), true); treR.readTxnInfo(sbHelper.getPath(), true); readThread.join(); - treL.setClosing(false); - treR.setClosing(false); + treL.setBlockAll(false); + treR.setBlockAll(false); + treL.setBlockNonPhase2(false); + treR.setBlockNonPhase2(false); + treL.setNewTrans(false); + treR.setNewTrans(false); sbHelper.clearSplit(); } catch (IOException ioe) { if(LOG.isErrorEnabled()) LOG.error("Unable to read Transaction Info for transactional split coordination: " + ioe); @@ -679,7 +711,11 @@ public void createRecoveryzNode(int node, String encodedName, byte [] data) thro c.getEnvironment().getRegionServerServices().isStopped()) return; + if(LOG.isTraceEnabled()) LOG.trace("preClose -- commitPendingTransactions (" + commitPendingTransactions.size() +")"); + if (!hasClosed) { + blockNonPhase2.set(true); + if(LOG.isInfoEnabled()) { HRegion region = c.getEnvironment().getRegion(); LOG.debug("preClose -- setting close var to true on: " + region.getRegionNameAsString()); @@ -689,7 +725,7 @@ public void createRecoveryzNode(int node, String encodedName, byte [] data) thro } catch(IOException ioe) { LOG.error("Encountered exception when calling pendingAndScannersWait(): " + ioe); } - closing.set(true); + blockAll.set(true); hasClosed = true; } @@ -728,7 +764,9 @@ public void createRecoveryzNode(int node, String encodedName, byte [] data) thro transactionsRefMap.remove(regionName+trxkeyindoubtTransactionsCountByTmid); transactionsRefMap.remove(regionName+trxkeytransactionsById); transactionsRefMap.remove(regionName+trxkeycommitPendingTransactions); - transactionsRefMap.remove(regionName+trxkeyClosingVar); + transactionsRefMap.remove(regionName+trxkeyCheckBlockAllVar); + transactionsRefMap.remove(regionName+trxkeyCheckBlockNonPhase2Var); + transactionsRefMap.remove(regionName+trxkeyCheckBlockNewTransVar); transactionsRefMap.remove(regionName+trxkeyScanners); }
