Repository: incubator-trafodion
Updated Branches:
  refs/heads/master ff4b7dd73 -> ed98ed849


TRAFODION-1648


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/2b46c9c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/2b46c9c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/2b46c9c0

Branch: refs/heads/master
Commit: 2b46c9c008d00bab539ecbf7f52cf32c173bf561
Parents: 4550081
Author: Trina Krug <[email protected]>
Authored: Thu Jan 21 21:46:34 2016 +0000
Committer: Trina Krug <[email protected]>
Committed: Thu Jan 21 21:46:34 2016 +0000

----------------------------------------------------------------------
 .../transactional/TrxRegionEndpoint.java        | 318 +++++++++++++------
 .../transactional/TrxRegionObserver.java        |  72 ++++-
 2 files changed, 283 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/2b46c9c0/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
index 583fca2..804afdf 100755
--- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
@@ -328,7 +328,9 @@ CoprocessorService, Coprocessor {
   private FileSystem fs = null;
   private RegionCoprocessorHost rch = null;
   private HLog tHLog = null;
-  private AtomicBoolean closing = new AtomicBoolean(false);
+  private AtomicBoolean blockAll = new AtomicBoolean(false);
+  private AtomicBoolean blockNonPhase2 = new AtomicBoolean(false);
+  private AtomicBoolean blockNewTrans = new AtomicBoolean(false);
   private boolean configuredEarlyLogging = false;
   private boolean configuredConflictReinstate = false;
   private static Object zkRecoveryCheckLock = new Object();
@@ -1063,7 +1065,7 @@ CoprocessorService, Coprocessor {
       }
 
       // Process in local memory
-      if (delete != null && t == null)
+      if ((delete != null) && (t == null))
       {
         if (request.hasRow()) {
 
@@ -1187,7 +1189,7 @@ CoprocessorService, Coprocessor {
       }
 
       // Process in local memory
-      if (put != null)
+      if ((put != null) && (t == null))
       {
         if (request.hasRow()) {
           row = request.getRow();
@@ -1308,6 +1310,8 @@ CoprocessorService, Coprocessor {
 
     if (oop == null) {
       try {
+         // we want to allow closing scanners and remove operations up until 
the very end.
+         checkBlockAll(transId);
          scanner = removeScanner(scannerId);
 
          if (scanner != null) { 
@@ -1425,6 +1429,7 @@ CoprocessorService, Coprocessor {
          if (type == MutationType.DELETE && proto.hasRow())
          {
            try {
+               checkBlockNonPhase2(transactionId); // throws IOException
                delete = ProtobufUtil.toDelete(proto);
            } catch (Throwable e) {
              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor:deleteMultiple - txId " + transactionId + ", Caught exception after 
protobuf conversion delete"
@@ -1433,7 +1438,7 @@ CoprocessorService, Coprocessor {
            }
 
            // Process in local memory
-           if (delete != null)
+           if ((delete != null) && (t == null))
            {
              try {
                delete(transactionId, delete);
@@ -1523,6 +1528,7 @@ CoprocessorService, Coprocessor {
     else
     {
       try {
+          checkBlockNonPhase2(transactionId); // throws IOException
           delete = ProtobufUtil.toDelete(proto); 
       } catch (Throwable e) {
         if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor:delete - txId " + transactionId + ", Caught exception " + 
e.getMessage() + " " + stackTraceToString(e));
@@ -1530,17 +1536,19 @@ CoprocessorService, Coprocessor {
       }
 
       // Process in local memory
-      try {
-        delete(transactionId, delete);
-      } catch (Throwable e) {
-        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor:delete - txId " + transactionId + ", Caught exception after 
internal delete - "
-             + e.getMessage() + " " + stackTraceToString(e));
-        t = e;
-      }
+     if ((delete != null) && (t == null))
+     {
+          try {
+            delete(transactionId, delete);
+          } catch (Throwable e) {
+            if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor:delete - txId " + transactionId + ", Caught exception after 
internal delete - "
+                 + e.getMessage() + " " + stackTraceToString(e));
+            t = e;
+          }
 
-      if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
delete - txId "  + transactionId + ", regionName " + 
regionInfo.getRegionNameAsString() + ", type " + type + ", row " + 
Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + 
Hex.encodeHexString(proto.getRow().toByteArray()));
+          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
delete - txId "  + transactionId + ", regionName " + 
regionInfo.getRegionNameAsString() + ", type " + type + ", row " + 
Bytes.toStringBinary(proto.getRow().toByteArray()) + ", row in hex " + 
Hex.encodeHexString(proto.getRow().toByteArray()));
+        }
     }
-
     
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.DeleteTransactionalResponse.Builder
 deleteTransactionalResponseBuilder = DeleteTransactionalResponse.newBuilder();
 
     deleteTransactionalResponseBuilder.setHasException(false);
@@ -1584,6 +1592,7 @@ CoprocessorService, Coprocessor {
     WrongRegionException wre = null;
     org.apache.hadoop.hbase.client.Result result2 = null;
     long transactionId = request.getTransactionId();
+    boolean exceptionThrown = false;
 
     /* commenting it out for the time-being
     java.lang.String name = ((com.google.protobuf.ByteString) 
request.getRegionName()).toStringUtf8();
@@ -1605,59 +1614,65 @@ CoprocessorService, Coprocessor {
         else {
           if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
get - performing memoryPercentage " + memoryPercentage + ", generating memory 
usage exceeds indicated percentage exception");
           mue = new MemoryUsageException("get memory usage exceeds " + 
memoryUsageThreshold + " percent, trxId is " + transactionId);
+          exceptionThrown = true;
         }
       }
       else
       {
         try {
+          checkBlockNonPhase2(transactionId); // throws IOException
           get = ProtobufUtil.toGet(proto);
         } catch (Throwable e) {
           if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor:get - txId " + transactionId + ", Caught exception " + 
e.getMessage() + " " + stackTraceToString(e));
           t = e;
+          exceptionThrown = true;
         }
 
-        Scan scan = new Scan(get);
-        List<Cell> results = new ArrayList<Cell>();
+        if (exceptionThrown == false) 
+        {
+            Scan scan = new Scan(get);
+            List<Cell> results = new ArrayList<Cell>();
 
-        try {
+            try {
         
-          if (LOG.isTraceEnabled()) {
-            byte[] row = proto.getRow().toByteArray();
-            byte[] getrow = get.getRow();
-            String rowKey = Bytes.toString(row);
-            String getRowKey = Bytes.toString(getrow);
+              if (LOG.isTraceEnabled()) {
+                byte[] row = proto.getRow().toByteArray();
+                byte[] getrow = get.getRow();
+                String rowKey = Bytes.toString(row);
+                String getRowKey = Bytes.toString(getrow);
 
-            LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + 
transactionId + ", Calling getScanner for regionName " + 
regionInfo.getRegionNameAsString() + ", row = " + Bytes.toStringBinary(row) + 
", row in hex " + Hex.encodeHexString(row) + ", getrow = " + 
Bytes.toStringBinary(getrow) + ", getrow in hex " + 
Hex.encodeHexString(getrow));
-          }
+                LOG.trace("TrxRegionEndpoint coprocessor: get - txId " + 
transactionId + ", Calling getScanner for regionName " + 
regionInfo.getRegionNameAsString() + ", row = " + Bytes.toStringBinary(row) + 
", row in hex " + Hex.encodeHexString(row) + ", getrow = " + 
Bytes.toStringBinary(getrow) + ", getrow in hex " + 
Hex.encodeHexString(getrow));
+              }
 
-          scanner = getScanner(transactionId, scan);
+              scanner = getScanner(transactionId, scan);
 
-          if (scanner != null)
-            scanner.next(results);
+              if (scanner != null)
+                scanner.next(results);
          
-          result2 = Result.create(results);
-
-          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
get - txId " + transactionId + ", getScanner result2 isEmpty is " 
+              result2 = Result.create(results);
+  
+              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: get - txId " + transactionId + ", getScanner result2 isEmpty is " 
                   + result2.isEmpty() 
                   + ", row " 
                   + Bytes.toStringBinary(result2.getRow())
                   + " result length: "
                   + result2.size()); 
 
-        } catch(Throwable e) {
-          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
get - txId " + transactionId + ", Caught exception " + e.getMessage() + " " + 
stackTraceToString(e));
-          t = e;
-        }
-        finally {
-          if (scanner != null) {
-            try {
-              scanner.close();
-            } catch(Exception e) {
+            } catch(Throwable e) {
               if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: get - txId " + transactionId + ", Caught exception " + 
e.getMessage() + " " + stackTraceToString(e));
-              ge = e;
+              t = e;
             }
-          }
-        }
+            finally {
+              if (scanner != null) {
+                try {
+                  scanner.close();
+                } catch(Exception e) {
+                  if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: get - txId " + transactionId + ", Caught exception " + 
e.getMessage() + " " + stackTraceToString(e));
+                  ge = e;
+                }
+              }
+            }
+         } // ExceptionThrown
       } // End of MemoryUsageCheck
   //}  // End of WrongRegionCheck
 
@@ -1822,6 +1837,7 @@ CoprocessorService, Coprocessor {
           } catch (IOException e) {
             LOG.error("TrxRegionEndpoint coprocessor: openScanner - txId " + 
transId + ", getScanner Error opening scanner, " + e.toString());
             exceptionThrown = true;
+            ioe = e;
           }
         }
 
@@ -1898,6 +1914,8 @@ CoprocessorService, Coprocessor {
     boolean shouldContinue = true;
     TransactionalRegionScannerHolder rsh = null;
 
+    boolean exceptionThrown = false;
+
     if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
performScan - txId " + transId + ", scanner id " + scannerId + ", numberOfRows 
" + numberOfRows + ", nextCallSeq " + nextCallSeq + ", closeScanner is " + 
closeScanner + ", region is " + regionInfo.getRegionNameAsString());
 
     /* commenting it out for the time-being
@@ -1976,12 +1994,15 @@ CoprocessorService, Coprocessor {
        } catch(OutOfOrderScannerNextException ooone) {
          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
performScan - txId " + transId + ", scanner id " + scannerId + " Caught 
OutOfOrderScannerNextException  " + ooone.getMessage() + " " + 
stackTraceToString(ooone));
          ooo = ooone;
+         exceptionThrown = true;
        } catch(ScannerTimeoutException cste) {
          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
performScan - txId " + transId + ", scanner id " + scannerId + " Caught 
ScannerTimeoutException  " + cste.getMessage() + " " + 
stackTraceToString(cste));
          ste = cste;
+         exceptionThrown = true;
        } catch(Throwable e) {
          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
performScan - txId " + transId + ", scanner id " + scannerId + " Caught 
throwable exception " + e.getMessage() + " " + stackTraceToString(e));
          t = e;
+         exceptionThrown = true;
        }
        finally {
          if (scanner != null) {
@@ -2002,43 +2023,46 @@ CoprocessorService, Coprocessor {
            } catch(Exception e) {
              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: performScan -  transaction id " + transId + ", Caught general 
exception " + e.getMessage() + " " + stackTraceToString(e));
              ne = e;
+             exceptionThrown = true;
            }
          }
        }
 
-       rsh = scanners.get(scannerId);
+       if (exceptionThrown == false)
+       {
+           rsh = scanners.get(scannerId);
 
-       nextCallSeq++;
+           nextCallSeq++;
  
-       if (rsh == null)
-       {
-        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
performScan rsh is null");
-          use =  new UnknownScannerException(
-            "ScannerId: " + scannerId + ", already closed?");
-       }
-       else
-       {
-         rsh.nextCallSeq = nextCallSeq;
+           if (rsh == null)
+           {
+            if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: performScan rsh is null");
+              use =  new UnknownScannerException(
+                "ScannerId: " + scannerId + ", already closed?");
+           }
+           else
+           {
+             rsh.nextCallSeq = nextCallSeq;
 
-       if (rsh == null)
-       {
-        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
performScan - txId " + transId + ", performScan rsh is null, 
UnknownScannerException for scannerId: " + scannerId + ", nextCallSeq was " + 
nextCallSeq + ", for region " + regionInfo.getRegionNameAsString());
-          use =  new UnknownScannerException(
-             "ScannerId: " + scannerId + ", was scanner already closed?, 
transaction id " + transId + ", nextCallSeq was " + nextCallSeq + ", for region 
" + regionInfo.getRegionNameAsString());
-       }
-       else
-       {
-         rsh.nextCallSeq = nextCallSeq;
+           if (rsh == null)
+           {
+            if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: performScan - txId " + transId + ", performScan rsh is null, 
UnknownScannerException for scannerId: " + scannerId + ", nextCallSeq was " + 
nextCallSeq + ", for region " + regionInfo.getRegionNameAsString());
+              use =  new UnknownScannerException(
+                 "ScannerId: " + scannerId + ", was scanner already closed?, 
transaction id " + transId + ", nextCallSeq was " + nextCallSeq + ", for region 
" + regionInfo.getRegionNameAsString());
+           }
+           else
+           {
+             rsh.nextCallSeq = nextCallSeq;
 
-         if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
performScan - txId " + transId + ", scanner id " + scannerId + ", regionName " 
+ regionInfo.getRegionNameAsString() +
+             if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
coprocessor: performScan - txId " + transId + ", scanner id " + scannerId + ", 
regionName " + regionInfo.getRegionNameAsString() +
 ", nextCallSeq " + nextCallSeq + ", rsh.nextCallSeq " + rsh.nextCallSeq + ", 
close scanner is " + closeScanner);
 
-      }
+          }
+         }
+        }
+       }
      }
-    }
    }
-     }
-
    
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PerformScanResponse.Builder
 performResponseBuilder = PerformScanResponse.newBuilder();
    performResponseBuilder.setHasMore(hasMore);
    performResponseBuilder.setNextCallSeq(nextCallSeq);
@@ -2167,7 +2191,7 @@ CoprocessorService, Coprocessor {
           t = e;
         }
 
-      if (mue == null && type == MutationType.PUT && proto.hasRow())
+      if ((mue == null && type == MutationType.PUT && proto.hasRow()) && (put 
!= null))
       {
         // Process in local memory
         try {   
@@ -2438,6 +2462,7 @@ CoprocessorService, Coprocessor {
     long transactionId = request.getTransactionId();
     T max = null;
     try {
+      checkBlockNonPhase2(transactionId); // throws IOException
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
@@ -2493,6 +2518,7 @@ CoprocessorService, Coprocessor {
     long transactionId = request.getTransactionId();
     T min = null;
     try {
+      checkBlockNonPhase2(transactionId); // throws IOException
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
@@ -2546,6 +2572,7 @@ CoprocessorService, Coprocessor {
     long sum = 0l;
     long transactionId = request.getTransactionId();
     try {
+      checkBlockNonPhase2(transactionId); // throws IOException
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       S sumVal = null;
       T temp;
@@ -2600,6 +2627,7 @@ CoprocessorService, Coprocessor {
     RegionScanner scanner = null;
     long transactionId = 0L;
     try {
+      checkBlockNonPhase2(transactionId); // throws IOException
       Scan scan = ProtobufUtil.toScan(request.getScan());
       byte[][] colFamilies = scan.getFamilies();
       byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
@@ -3109,14 +3137,35 @@ CoprocessorService, Coprocessor {
                                   this.transactionsById);
     }
 
-    AtomicBoolean closingCheck = (AtomicBoolean)transactionsByIdTestz
-            
.get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyClosingVar);
-    if(closingCheck != null) {
-        this.closing = closingCheck;
+    AtomicBoolean blockAllCheck = (AtomicBoolean)transactionsByIdTestz
+            
.get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockAllVar);
+    if(blockAllCheck != null) {
+        this.blockAll = blockAllCheck;
     }
     else {
-        
transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyClosingVar,
-                                  this.closing);
+        
transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockAllVar,
+                                  this.blockAll);
+    }
+   
+
+    AtomicBoolean blockNonPhase2Check = (AtomicBoolean)transactionsByIdTestz
+            
.get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNonPhase2Var);
+    if(blockNonPhase2Check != null) {
+        this.blockNonPhase2 = blockNonPhase2Check;
+    }
+    else {
+        
transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNonPhase2Var,
+                                  this.blockNonPhase2);
+    }
+   
+    AtomicBoolean newTransCheck = (AtomicBoolean)transactionsByIdTestz
+            
.get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNewTransVar);
+    if(newTransCheck != null) {
+        this.blockNewTrans = newTransCheck;
+    }
+    else {
+        
transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyCheckBlockNewTransVar,
+                                  this.blockNewTrans);
     }
     ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck =
         
(ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsByIdTestz
@@ -3146,18 +3195,49 @@ CoprocessorService, Coprocessor {
   }
 
   // Internal support methods
+   /**
+   * Checks if the region is closing and needs to block all activity
+   * @param long transactionId
+   * @return String 
+   * @throws IOException 
+   */
+  private void checkBlockAll(final long transactionId) throws IOException {
+    if (blockAll.get()) {
+      if(LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint coprocessor: 
checkBlockAll - txId " + transactionId + ", No Transactional activity 
allowed.");
+      throw new IOException("closing region, no more transactional activity 
allowed. Region: " + regionInfo.getRegionNameAsString());
+    }
+  }
 
   /**
-   * Checks if the region is closing
+   * Checks if the region is closing and needs to block non phase 2 activity
    * @param long transactionId
    * @return String 
    * @throws IOException 
    */
-  private void checkClosing(final long transactionId) throws IOException {
-    if (closing.get()) {
-      if(LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint coprocessor: 
checkClosing - txId " + transactionId + ", Trafodion Recovery: Raising 
exception. no more new transactions allowed.");
-      throw new IOException("closing region, no more new transactions allowed. 
Region: " + regionInfo.getRegionNameAsString());
+  private void checkBlockNonPhase2(final long transactionId) throws 
IOException {
+    if (blockNonPhase2.get()) {
+      if(LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint coprocessor: 
checkBlockNonPhase2 - txId " + transactionId + ", No Transactional activity 
allowed.");
+      throw new IOException("closing region, no more non phase 2 transactional 
activity allowed. Region: " + regionInfo.getRegionNameAsString());
     }
+    
+    // sometimes we only set the most severe flag, in which case we always need to 
check the higher up levels
+    checkBlockAll(transactionId);
+  }
+
+  /**
+   * Checks if new transactions are disabled
+   * @param long transactionId
+   * @return String 
+   * @throws IOException 
+   */
+  private void checkBlockNewTrans(final long transactionId) throws IOException 
{
+    if (blockNewTrans.get()) {
+      if(LOG.isWarnEnabled()) LOG.warn("TrxRegionEndpoint coprocessor: 
checkNewTrans - txId " + transactionId + ", No more new transactions allowed.");
+       throw new IOException("closing region, no more new transactions 
allowed. Region: " + regionInfo.getRegionNameAsString());
+     }
+
+    // sometimes we only set the most severe flag, in which case we always need to 
check the higher up levels
+    checkBlockNonPhase2(transactionId);
   }
 
   /**
@@ -3784,6 +3864,9 @@ CoprocessorService, Coprocessor {
   public void delete(final long transactionId, final Delete delete)
     throws IOException {
     if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: delete 
-- ENTRY txId: " + transactionId);
+  
+    checkBlockNonPhase2(transactionId);
+
     if(this.checkRowBelongs)
       checkRow(delete.getRow(), "Delete");
     TrxTransactionState state = this.beginTransIfNotExist(transactionId);
@@ -3965,6 +4048,8 @@ CoprocessorService, Coprocessor {
     }
     */
 
+    checkBlockNonPhase2(transactionId);
+
     Scan scan = new Scan(get);
     List<Cell> results = new ArrayList<Cell>();
 
@@ -4078,6 +4163,9 @@ CoprocessorService, Coprocessor {
   public void put(final long transactionId, final Put put)
     throws IOException {
     if (LOG.isTraceEnabled()) LOG.trace("Enter TrxRegionEndpoint coprocessor: 
put, txid: " + transactionId);
+
+    checkBlockNonPhase2(transactionId);
+
     if(this.checkRowBelongs)
       checkRow(put.getRow(),"Put");
     TrxTransactionState state = this.beginTransIfNotExist(transactionId);
@@ -4213,6 +4301,8 @@ CoprocessorService, Coprocessor {
 
     if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
beginTransaction -- ENTRY txId: " + transactionId);
 
+    checkBlockNonPhase2(transactionId);
+
     // TBD until integration with recovery 
     if (reconstructIndoubts == 0) {
        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
RECOV beginTransaction -- ENTRY txId: " + transactionId);
@@ -4329,7 +4419,7 @@ CoprocessorService, Coprocessor {
     if (LOG.isTraceEnabled()) LOG.trace("Enter TrxRegionEndpoint coprocessor: 
beginTransIfNotExist, txid: "
             + transactionId + ", regionName " + 
regionInfo.getRegionNameAsString()
             + " transactionsById size: " + transactionsById.size());
-    checkClosing(transactionId);
+    checkBlockNewTrans(transactionId);
 
     String key = getTransactionalUniqueId(transactionId);
     synchronized (transactionsById) {
@@ -4366,6 +4456,9 @@ CoprocessorService, Coprocessor {
               " ignoreUnknownTransaction: " + 
ignoreUnknownTransactionException);
     CommitProgress commitStatus = CommitProgress.NONE;
     TrxTransactionState state;
+
+    checkBlockAll(transactionId);
+
     try {
       state = getTransactionState(transactionId);
     } catch (UnknownTransactionException e) {
@@ -4433,6 +4526,9 @@ CoprocessorService, Coprocessor {
     String lv_regionName = new 
String(m_Region.getRegionInfo().getRegionNameAsString());
     if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: 
commitRequest -- ENTRY txId: "
                + transactionId + " participantNum " + participantNum + ", 
regionName " + lv_regionName);
+
+    checkBlockNonPhase2(transactionId);
+
     TrxTransactionState state;
 
     int lv_totalCommits = 0;
@@ -4713,6 +4809,8 @@ CoprocessorService, Coprocessor {
 
     if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: abort 
transactionId: " + transactionId + " " + 
m_Region.getRegionInfo().getRegionNameAsString());
 
+    checkBlockNonPhase2(transactionId);
+
     TrxTransactionState state;
     try {
       state = getTransactionState(transactionId);
@@ -4830,6 +4928,9 @@ CoprocessorService, Coprocessor {
 
     if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
commitIfPossible -- ENTRY txId: "
                + transactionId);
+
+    checkBlockNonPhase2(transactionId);
+
     int status = commitRequest(transactionId, participantNum);
   
     if (status == COMMIT_OK) {
@@ -5346,6 +5447,12 @@ CoprocessorService, Coprocessor {
     }
   }
   public void flushToFS(Path flushPath) throws IOException {
+
+   if(LOG.isTraceEnabled()) LOG.trace("flushToFS -- ENTRY, Path: " + 
flushPath.toString());
+
+   if(LOG.isTraceEnabled()) LOG.trace("flushToFS -- transactionsById (" + 
transactionsById.size() + "), commitedTransactionsBySequenceNumber (" + 
commitedTransactionsBySequenceNumber.size() + ")");
+
+
     TransactionPersist.Builder txnPersistBuilder = 
TransactionPersist.newBuilder();
     fs.delete(flushPath, true);
 
@@ -5356,16 +5463,22 @@ CoprocessorService, Coprocessor {
 
     Map<Long, TrxTransactionState> transactionMap = new HashMap<Long, 
TrxTransactionState>();
 
-    for(TrxTransactionState ts : transactionsById.values()) {
-      transactionMap.put(ts.getTransactionId(), ts);
-      txnPersistBuilder.addTxById(ts.getTransactionId());
+    synchronized (transactionsById) {
+        for(TrxTransactionState ts : transactionsById.values()) {
+            transactionMap.put(ts.getTransactionId(), ts);
+            txnPersistBuilder.addTxById(ts.getTransactionId());
+        }
     }
-    for(Map.Entry<Long, TrxTransactionState> entry :
-        commitedTransactionsBySequenceNumber.entrySet()) {
-      transactionMap.put(entry.getValue().getTransactionId(), 
entry.getValue());
-      txnPersistBuilder.addSeqNoListSeq(entry.getKey());
-      txnPersistBuilder.addSeqNoListTxn(entry.getValue().getTransactionId());
+
+    synchronized (commitedTransactionsBySequenceNumber) {
+        for(Map.Entry<Long, TrxTransactionState> entry :
+            commitedTransactionsBySequenceNumber.entrySet()) {
+                transactionMap.put(entry.getValue().getTransactionId(), 
entry.getValue());
+                txnPersistBuilder.addSeqNoListSeq(entry.getKey());
+                
txnPersistBuilder.addSeqNoListTxn(entry.getValue().getTransactionId());
+         }
     }
+
     for(TrxTransactionState ts : transactionMap.values()) {
       for(TrxTransactionState ts2 : ts.getTransactionsToCheck()) {
         transactionMap.put(ts.getTransactionId(), ts);
@@ -5505,7 +5618,13 @@ CoprocessorService, Coprocessor {
                     }
                   }
                   transactionsById.put(key, ts);
-                  transactionLeases.createLease(key, transactionLeaseTimeout, 
new TransactionLeaseListener(txid));
+
+                  try {
+                      transactionLeases.createLease(key, 
transactionLeaseTimeout, new TransactionLeaseListener(txid));
+                  } catch (LeaseStillHeldException e) {
+                      transactionLeases.renewLease(key);
+                  }
+
                 }
                 else {
                   TrxTransactionState tsEntry = new TrxTransactionState(txid,
@@ -5536,8 +5655,27 @@ CoprocessorService, Coprocessor {
 
   }
 
-  public void setClosing(boolean value) {
-    closing.set(value);
+  public void setBlockAll(boolean value) {
+
+    blockAll.set(value);
+
+    // for safety
+    if (value == true) {
+        blockNewTrans.set(value);
+        blockNonPhase2.set(value);
+    }
+ }
+
+  public void setBlockNonPhase2(boolean value) {
+    blockNonPhase2.set(value);
+
+    // for safety
+    if (value == true)
+        blockNewTrans.set(value);
   }
+
+  public void setNewTrans(boolean value) {
+    blockNewTrans.set(value);
+   }
 }
 //1}

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/2b46c9c0/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
index ddf4f4e..6b0dd78 100644
--- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java
@@ -92,7 +92,9 @@ public static final String 
trxkeycommitedTransactionsBySequenceNumber = "commite
 public static final String trxkeycommitPendingTransactions = 
"commitPendingTransactions";
 public static final String trxkeypendingTransactionsById = 
"pendingTransactionsById";
 public static final String trxkeyindoubtTransactionsCountByTmid = 
"indoubtTransactionsCountByTmid";
-public static final String trxkeyClosingVar = "checkClosingVariable";
+public static final String trxkeyCheckBlockAllVar = "checkBlockAllVar";
+public static final String trxkeyCheckBlockNonPhase2Var = 
"checkBlockNonPhase2Var";
+public static final String trxkeyCheckBlockNewTransVar = 
"checkBlockNewTransVar";
 public static final String trxkeyScanners = "trxScanners";
 
 public static final String SPLIT_DELAY_NOFLUSH = 
"hbase.transaction.split.delay.noflush";
@@ -127,7 +129,9 @@ static ConcurrentHashMap<String, Object> trxRegionMap;
 
 private ConcurrentHashMap<String, TrxTransactionState> transactionsById = new 
ConcurrentHashMap<String, TrxTransactionState>();
 private Set<TrxTransactionState> commitPendingTransactions = 
Collections.synchronizedSet(new HashSet<TrxTransactionState>());
-private AtomicBoolean closing = new AtomicBoolean(false);
+private AtomicBoolean blockAll = new AtomicBoolean(false);
+private AtomicBoolean blockNonPhase2 = new AtomicBoolean(false);
+private AtomicBoolean blockNewTrans = new AtomicBoolean(false);
 private boolean hasClosed = false;
 private boolean hasFlushed = false;
 
@@ -255,15 +259,32 @@ public void start(CoprocessorEnvironment e) throws 
IOException {
                               this.commitPendingTransactions);
    }
 
-   AtomicBoolean closingCheck = (AtomicBoolean)transactionsRefMap
-                                               
.get(regionName+trxkeyClosingVar);
-   if(closingCheck != null) {
-       this.closing = closingCheck;
+   AtomicBoolean blockAllCheck = (AtomicBoolean)transactionsRefMap
+                                               
.get(regionName+trxkeyCheckBlockAllVar);
+   if(blockAllCheck != null) {
+       this.blockAll = blockAllCheck;
+    }
+    else {
+       transactionsRefMap.put(regionName+trxkeyCheckBlockAllVar, 
this.blockAll);
+    }
+   AtomicBoolean blockNonPhase2Check = (AtomicBoolean)transactionsRefMap
+                                               
.get(regionName+trxkeyCheckBlockNonPhase2Var);
+   if(blockNonPhase2Check != null) {
+       this.blockNonPhase2 = blockNonPhase2Check;
    }
    else {
-       transactionsRefMap.put(regionName+trxkeyClosingVar, this.closing);
+       transactionsRefMap.put(regionName+trxkeyCheckBlockNonPhase2Var, 
this.blockNonPhase2);
    }
 
+    AtomicBoolean blockNewTransCheck = (AtomicBoolean)transactionsRefMap
+                                                 
.get(regionName+trxkeyCheckBlockNewTransVar);
+    if(blockNewTransCheck != null) {
+        this.blockNewTrans = blockNewTransCheck;
+    }
+    else {
+        
transactionsRefMap.put(regionName+trxkeyCheckBlockNewTransVar,this.blockNewTrans);
+    }
+
    @SuppressWarnings("unchecked")
    ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck =
        
(ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsRefMap
@@ -621,25 +642,31 @@ public void createRecoveryzNode(int node, String 
encodedName, byte [] data) thro
     public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, 
byte[] splitRow) throws IOException {
         if(LOG.isTraceEnabled()) LOG.trace("preSplit -- ENTRY region: " + 
regionInfo.getRegionNameAsString());
 
+        if(LOG.isTraceEnabled()) LOG.trace("preSplit -- transactionsById (" + 
transactionsById.size() + " ), commitPendingTransactions (" + 
commitPendingTransactions.size() +"), scanners (" + scanners.size() + ")");
+
+        blockNewTrans.set(true);
+
         if(splitDelayNoFlush) {
             if(!this.earlyDrain)
-              sbHelper.activeWait(transactionsById, activeDelayLen, 
splitDelayLimit);
-            closing.set(true);
-            sbHelper.pendingWait(commitPendingTransactions, pendingDelayLen);
+               sbHelper.activeWait(transactionsById, activeDelayLen, 
splitDelayLimit);
+             sbHelper.pendingWait(commitPendingTransactions, pendingDelayLen); 
 
         }
         else {
+            blockNonPhase2.set(true);
             sbHelper.pendingAndScannersWait(commitPendingTransactions, 
scanners, pendingDelayLen);
-            closing.set(true);
 
             sbHelper.setSplit();
         }
 
+        blockAll.set(true);
         if(LOG.isTraceEnabled()) LOG.trace("preSplit -- EXIT region: " + 
regionInfo.getRegionNameAsString());
     }
 
     @Override
     public void        postSplit(ObserverContext<RegionCoprocessorEnvironment> 
e, HRegion l, HRegion r) {
 
+        if(LOG.isTraceEnabled()) LOG.trace("postSplit -- ENTRY");
+
         if(splitDelayNoFlush)
           return;
 
@@ -652,15 +679,20 @@ public void createRecoveryzNode(int node, String 
encodedName, byte [] data) thro
           }
           else {
           try {
-             treL.setClosing(true);
-             treR.setClosing(true);
+             // don't need to set NewTrans flag because blockNonPhase2 will 
catch up
+             treL.setBlockAll(true);
+             treR.setBlockAll(true);
              Thread readThread = new Thread(new TxnReadThread(treL, 
sbHelper.getPath(), true));
              readThread.start();
              //treL.readTxnInfo(sbHelper.getPath(), true);
              treR.readTxnInfo(sbHelper.getPath(), true);
              readThread.join();
-             treL.setClosing(false);
-             treR.setClosing(false);
+             treL.setBlockAll(false);
+             treR.setBlockAll(false);
+             treL.setBlockNonPhase2(false);
+             treR.setBlockNonPhase2(false);
+             treL.setNewTrans(false);
+             treR.setNewTrans(false);
              sbHelper.clearSplit();
           } catch (IOException ioe) {
              if(LOG.isErrorEnabled()) LOG.error("Unable to read Transaction 
Info for transactional split coordination: " + ioe);
@@ -679,7 +711,11 @@ public void createRecoveryzNode(int node, String 
encodedName, byte [] data) thro
             c.getEnvironment().getRegionServerServices().isStopped())
             return;
 
+        if(LOG.isTraceEnabled()) LOG.trace("preClose -- 
commitPendingTransactions (" + commitPendingTransactions.size() +")");
+
         if (!hasClosed) {
+                blockNonPhase2.set(true);
+
                if(LOG.isInfoEnabled()) {
                    HRegion region = c.getEnvironment().getRegion();
                    LOG.debug("preClose -- setting close var to true on: " + 
region.getRegionNameAsString());
@@ -689,7 +725,7 @@ public void createRecoveryzNode(int node, String 
encodedName, byte [] data) thro
                } catch(IOException ioe) {
                  LOG.error("Encountered exception when calling 
pendingAndScannersWait(): " + ioe);
                }
-               closing.set(true);
+               blockAll.set(true);
                hasClosed = true;
         }
 
@@ -728,7 +764,9 @@ public void createRecoveryzNode(int node, String 
encodedName, byte [] data) thro
         
transactionsRefMap.remove(regionName+trxkeyindoubtTransactionsCountByTmid);
         transactionsRefMap.remove(regionName+trxkeytransactionsById);
         transactionsRefMap.remove(regionName+trxkeycommitPendingTransactions);
-        transactionsRefMap.remove(regionName+trxkeyClosingVar);
+        transactionsRefMap.remove(regionName+trxkeyCheckBlockAllVar);
+        transactionsRefMap.remove(regionName+trxkeyCheckBlockNonPhase2Var);
+        transactionsRefMap.remove(regionName+trxkeyCheckBlockNewTransVar);
         transactionsRefMap.remove(regionName+trxkeyScanners);
     }
 


Reply via email to