Fix to correct loss of updates following a regionServer failure v2

Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/c74e3d62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/c74e3d62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/c74e3d62

Branch: refs/heads/master
Commit: c74e3d62c509c28d890a8329f4346b4b80698064
Parents: 9fc659a
Author: Sean Broeder <[email protected]>
Authored: Fri Jul 1 21:43:38 2016 +0000
Committer: Sean Broeder <[email protected]>
Committed: Wed Jul 6 19:17:17 2016 +0000

----------------------------------------------------------------------
 .../hbase/client/transactional/RMInterface.java |  176 +-
 .../transactional/TransactionManager.java       |   12 +-
 .../client/transactional/TransactionState.java  |   16 +-
 .../transactional/TrxRegionEndpoint.java.tmpl   |   65 +-
 .../generated/SsccRegionProtos.java             |   21 +
 .../generated/TrxRegionProtos.java              | 2418 +++++++++++++-----
 .../hbase-trx/src/main/protobuf/TrxRegion.proto |   21 +-
 7 files changed, 2103 insertions(+), 626 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
index df74a45..fe98284 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
@@ -38,14 +38,19 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.client.transactional.TransactionManager;
 import org.apache.hadoop.hbase.client.transactional.TransactionState;
@@ -68,6 +73,18 @@ import 
org.apache.hadoop.hbase.regionserver.transactional.IdTm;
 import org.apache.hadoop.hbase.regionserver.transactional.IdTmException;
 import org.apache.hadoop.hbase.regionserver.transactional.IdTmId;
 
+import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
+import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
+import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
+
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.ByteString;
+
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -75,6 +92,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 
 public class RMInterface {
     static final Log LOG = LogFactory.getLog(RMInterface.class);
@@ -84,6 +107,9 @@ public class RMInterface {
     public AlgorithmType TRANSACTION_ALGORITHM;
     static Map<Long, Set<RMInterface>> mapRMsPerTransaction = new 
HashMap<Long,  Set<RMInterface>>();
     private TransactionalTableClient ttable = null;
+    private ExecutorService threadPool;
+    private CompletionService<Integer> compPool;
+    private int intThreads = 16;
     static {
         System.loadLibrary("stmlib");
     }
@@ -137,6 +163,130 @@ public class RMInterface {
 
     }
 
+    public void pushRegionEpoch (HTableDescriptor desc, final TransactionState 
ts) throws IOException {
+       LOG.info("pushRegionEpoch start; transId: " + ts.getTransactionId());
+
+       TransactionalTable ttable1 = new 
TransactionalTable(Bytes.toBytes(desc.getNameAsString()));
+       HConnection connection = ttable1.getConnection();
+       long lvTransid = ts.getTransactionId();
+       RegionLocator rl = connection.getRegionLocator(desc.getTableName());
+       List<HRegionLocation> regionList = rl.getAllRegionLocations();
+
+       boolean complete = false;
+       int loopCount = 0;
+       int result = 0;
+
+       for (HRegionLocation location : regionList) {
+          final byte[] regionName = location.getRegionInfo().getRegionName();
+          if (compPool == null){
+              LOG.info("pushRegionEpoch compPool is null");
+              threadPool = Executors.newFixedThreadPool(intThreads);
+              compPool = new ExecutorCompletionService<Integer>(threadPool);
+          }
+
+          final HRegionLocation lv_location = location;
+          final HConnection lv_connection = connection;
+          compPool.submit(new RMCallable2(ts, lv_location, lv_connection ) {
+             public Integer call() throws IOException {
+                return pushRegionEpochX(ts, lv_location, lv_connection);
+             }
+          });
+          try {
+            result = compPool.take().get();
+          } catch(Exception ex) {
+            throw new IOException(ex);
+          }
+          if ( result != 0 ){
+             LOG.error("pushRegionEpoch result " + result + " returned from 
region "
+                          + location.getRegionInfo().getRegionName());
+             throw new IOException("pushRegionEpoch result " + result + " 
returned from region "
+                      + location.getRegionInfo().getRegionName());
+          }
+       }
+       if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: " + 
ts.getTransactionId());
+       return;
+    }
+
+    private abstract class RMCallable2 implements Callable<Integer>{
+       TransactionState transactionState;
+       HRegionLocation  location;
+       HConnection connection;
+       HTable table;
+       byte[] startKey;
+       byte[] endKey_orig;
+       byte[] endKey;
+
+       RMCallable2(TransactionState txState, HRegionLocation location, 
HConnection connection) {
+          this.transactionState = txState;
+          this.location = location;
+          this.connection = connection;
+          try {
+             table = new HTable(location.getRegionInfo().getTable(), 
connection);
+          } catch(IOException e) {
+             LOG.error("Error obtaining HTable instance " + e);
+             table = null;
+          }
+          startKey = location.getRegionInfo().getStartKey();
+          endKey_orig = location.getRegionInfo().getEndKey();
+          endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
+
+       }
+
+       public Integer pushRegionEpochX(final TransactionState txState,
+                                  final HRegionLocation location, HConnection 
connection) throws IOException {
+          if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry 
txState: " + txState
+                   + " location: " + location);
+               
+          Batch.Call<TrxRegionService, PushEpochResponse> callable =
+              new Batch.Call<TrxRegionService, PushEpochResponse>() {
+                 ServerRpcController controller = new ServerRpcController();
+                 BlockingRpcCallback<PushEpochResponse> rpcCallback =
+                    new BlockingRpcCallback<PushEpochResponse>();
+
+                 @Override
+                 public PushEpochResponse call(TrxRegionService instance) 
throws IOException {
+                    
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder
+                    builder = PushEpochRequest.newBuilder();
+                    builder.setTransactionId(txState.getTransactionId());
+                    builder.setEpoch(txState.getStartEpoch());
+                    
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName())));
+                    instance.pushOnlineEpoch(controller, builder.build(), 
rpcCallback);
+                    return rpcCallback.get();
+                 }
+              };
+
+              Map<byte[], PushEpochResponse> result = null;
+              try {
+                 if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- 
before coprocessorService: startKey: "
+                     + new String(startKey, "UTF-8") + " endKey: " + new 
String(endKey, "UTF-8"));
+                 result = table.coprocessorService(TrxRegionService.class, 
startKey, endKey, callable);
+              } catch (Throwable e) {
+                 String msg = "ERROR occurred while calling pushRegionEpoch 
coprocessor service in pushRegionEpochX";
+                 LOG.error(msg + ":" + e);
+                 throw new IOException(msg);
+              }
+
+              if(result.size() == 1){
+                 // size is 1
+                 for (PushEpochResponse eresponse : result.values()){
+                   if(eresponse.getHasException()) {
+                     String exceptionString = new String 
(eresponse.getException().toString());
+                     LOG.error("pushRegionEpochX - coprocessor 
exceptionString: " + exceptionString);
+                     throw new IOException(eresponse.getException());
+                   }
+                 }
+              }
+              else {
+                  LOG.error("pushRegionEpochX, received incorrect result size: 
" + result.size() + " txid: "
+                          + txState.getTransactionId() + " location: " + 
location.getRegionInfo().getRegionNameAsString());
+                  return 1;
+              }
+              if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Exit 
txState: " + txState
+                      + " location: " + location);
+              return 0;       
+       }
+    }
+
     public synchronized TransactionState registerTransaction(final long 
transactionID, final byte[] row) throws IOException {
         if (LOG.isTraceEnabled()) LOG.trace("Enter registerTransaction, 
transaction ID: " + transactionID);
         boolean register = false;
@@ -222,12 +372,20 @@ public class RMInterface {
     }
 
     public void createTable(HTableDescriptor desc, byte[][] keys, int 
numSplits, int keyLength, long transID) throws IOException {
-
-        if (LOG.isTraceEnabled()) LOG.trace("createTable ENTER: ");
-            byte[] lv_byte_desc = desc.toByteArray();
-            byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
-            if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc 
bytearray: " + lv_byte_desc + "desc in hex: " + 
Hex.encodeHexString(lv_byte_desc));
-            createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, 
lv_byte_tblname);
+       if (LOG.isTraceEnabled()) LOG.trace("Enter createTable, txid: " + 
transID + " Table: " + desc.getNameAsString());
+        byte[] lv_byte_desc = desc.toByteArray();
+        byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
+        if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc 
bytearray: " + lv_byte_desc + "desc in hex: " + 
Hex.encodeHexString(lv_byte_desc));
+        createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, 
lv_byte_tblname);
+        TransactionState ts = mapTransactionStates.get(transID);
+        if (LOG.isTraceEnabled()) LOG.trace("createTable: pushing epoch into 
regions for : " + desc.getNameAsString());
+        if (ts == null){
+           if (LOG.isTraceEnabled()) LOG.trace("pushing epoch into regions but 
unable to get ts object for transID : " + transID);
+           throw new IOException("createTable push epoch exception for table " 
+ desc.getNameAsString());
+        }
+        pushRegionEpoch(desc, ts);
+        if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into 
regions for : " + desc.getNameAsString());
+        if (LOG.isTraceEnabled()) LOG.trace("Exit createTable, txid: " + 
transID + " Table: " + desc.getNameAsString());
     }
 
     public void truncateTableOnAbort(String tblName, long transID) throws 
IOException {
@@ -261,7 +419,13 @@ public class RMInterface {
     static public synchronized void unregisterTransaction(final long 
transactionID) {
       TransactionState ts = null;
       if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction txid: " 
+ transactionID);
+      try {
         ts = mapTransactionStates.remove(transactionID);
+      } catch (Exception e) {
+        LOG.warn("Ignoring exception. mapTransactionStates.remove for transid 
" + transactionID + 
+                 " failed with exception " + e);
+        return;
+      }
       if (ts == null) {
         LOG.warn("mapTransactionStates.remove did not find transid " + 
transactionID);
       }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
index 9d90ace..9c950d7 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@ -608,9 +608,9 @@ public class TransactionManager {
       * Return  : Commit vote (yes, no, read only)
       * Purpose : Call prepare for a given regionserver
      */
-    public Integer doPrepareX(final byte[] regionName, final long 
transactionId, final long startEpoc, final int participantNum, final 
TransactionRegionLocation location)
+    public Integer doPrepareX(final byte[] regionName, final long 
transactionId, final long startEpoch, final int participantNum, final 
TransactionRegionLocation location)
           throws IOException, CommitUnsuccessfulException {
-       if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- ENTRY txid: " + 
transactionId + " startEpoc " + startEpoc
+       if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- ENTRY txid: " + 
transactionId + " startEpoch " + startEpoch
                                                           + " participantNum " 
+ participantNum + " RegionName " + Bytes.toString(regionName)
                                                        + " TableName " + 
table.toString() + " location " + location );
        int commitStatus = 0;
@@ -632,7 +632,7 @@ public class TransactionManager {
                 public CommitRequestResponse call(TrxRegionService instance) 
throws IOException {
                    
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest.Builder
 builder = CommitRequestRequest.newBuilder();
                    builder.setTransactionId(transactionId);
-                   builder.setStartEpoc(startEpoc);
+                   builder.setStartEpoch(startEpoch);
                    
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName)));
                    builder.setParticipantNum(participantNum);
 
@@ -1581,7 +1581,7 @@ public class TransactionManager {
         //long transactionId =
       if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransaction, txid: " + 
transactionId);
       TransactionState ts = new TransactionState(transactionId);
-      ts.setStartEpoc(EnvironmentEdgeManager.currentTime());
+      ts.setStartEpoch(EnvironmentEdgeManager.currentTime());
       long startIdVal = -1;
 
       // Set the startid
@@ -1698,7 +1698,7 @@ public class TransactionManager {
                         public Integer call() throws 
CommitUnsuccessfulException, IOException {
 
                             return 
doPrepareX(location.getRegionInfo().getRegionName(),
-                                    transactionState.getTransactionId(), 
transactionState.getStartEpoc(), lvParticipantNum,
+                                    transactionState.getTransactionId(), 
transactionState.getStartEpoch(), lvParticipantNum,
                                     location);
                         }
                     });
@@ -1783,7 +1783,7 @@ public class TransactionManager {
 
              compPool.submit(new TransactionManagerCallable(transactionState, 
location, connection) {
                public Integer call() throws IOException, 
CommitUnsuccessfulException {
-                 return doPrepareX(regionName, 
transactionState.getTransactionId(), transactionState.getStartEpoc(), 
lvParticipantNum, myLocation);
+                 return doPrepareX(regionName, 
transactionState.getTransactionId(), transactionState.getStartEpoch(), 
lvParticipantNum, myLocation);
                }
              });
            }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
index 7c837f4..2dfcc93 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
@@ -46,7 +46,7 @@ public class TransactionState {
 
     private final long transactionId;
     private TransState status;
-    private long startEpoc;
+    private long startEpoch;
     private long startId;
     private long commitId;
 
@@ -367,17 +367,17 @@ public class TransactionState {
      * Set the startEpoc.
      *
      */
-    public void setStartEpoc(final long epoc) {
-        this.startEpoc = epoc;
+    public void setStartEpoch(final long epoch) {
+        this.startEpoch = epoch;
     }
 
     /**
-     * Get the startEpoc.
+     * Get the startEpoch.
      *
-     * @return Return the startEpoc.
+     * @return Return the startEpoch.
      */
-    public long getStartEpoc() {
-        return startEpoc;
+    public long getStartEpoch() {
+        return startEpoch;
     }
 
     /**
@@ -420,7 +420,7 @@ public class TransactionState {
     @Override
     public String toString() {
         return "transactionId: " + transactionId + ", startId: " + startId + 
", commitId: " + commitId +
-               ", startEpoc: " + startEpoc + ", participants: " + 
participatingRegions.size()
+               ", startEpoch: " + startEpoch + ", participants: " + 
participatingRegions.size()
                + ", ignoring: " + regionsToIgnore.size() + ", hasDDL: " + 
hasDDLTx()
                + ", state: " + status.toString();
     }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
index e351eb3..0bbca72 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
@@ -210,6 +210,8 @@ import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProt
 import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutTransactionalResponse;
 import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalRequest;
 import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse;
+import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
+import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
 import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest;
 import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse;
 import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest;
@@ -304,7 +306,7 @@ CoprocessorService, Coprocessor {
   private int regionState = 0; 
   private Path recoveryTrxPath = null;
   private int cleanAT = 0;
-  private long onlineEpoc = EnvironmentEdgeManager.currentTime();  
+  private long onlineEpoch = EnvironmentEdgeManager.currentTime();  
   
   private long[] commitCheckTimes   = new long[50];
   private long[] hasConflictTimes   = new long[50];
@@ -789,7 +791,7 @@ CoprocessorService, Coprocessor {
     boolean reply = false;
     long transactionId = request.getTransactionId();
     long commitId = request.getCommitId();
-    long startEpoc = request.getStartEpoc();
+    long startEpoch = request.getStartEpoch();
     final int participantNum = request.getParticipantNum();
     Throwable t = null;
     WrongRegionException wre = null;
@@ -811,7 +813,7 @@ CoprocessorService, Coprocessor {
        // Process local memory
        try {
         if (LOG.isDebugEnabled()) LOG.debug("commitIfPossible - txId "  + 
transactionId + ", regionName, " + regionInfo.getRegionNameAsString() + 
"calling internal commitIfPossible");
-         reply = commitIfPossible(transactionId, startEpoc, commitId, 
participantNum);
+         reply = commitIfPossible(transactionId, startEpoch, commitId, 
participantNum);
        } catch (Throwable e) {
           if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
commitIfPossible - txId " + transactionId + ", Caught exception after internal 
commitIfPossible call "
                    + e.getMessage() + " " + stackTraceToString(e));
@@ -852,12 +854,12 @@ CoprocessorService, Coprocessor {
     Throwable t = null;
     WrongRegionException wre = null;
     long transactionId = request.getTransactionId();
-    long startEpoc = request.getStartEpoc();
+    long startEpoch = request.getStartEpoch();
     int participantNum = request.getParticipantNum();
     boolean dropTableRecorded = request.getDropTableRecorded();
 
     if (LOG.isTraceEnabled()) LOG.trace("commitRequest - txId "
-         + transactionId + ", startEpoc " + startEpoc + ", participantNum " + 
participantNum + ", dropTableRecorded " + dropTableRecorded + 
+         + transactionId + ", startEpoch " + startEpoch + ", participantNum " 
+ participantNum + ", dropTableRecorded " + dropTableRecorded + 
          ", regionName " + regionInfo.getRegionNameAsString());
 
     /*  commenting out for the time being
@@ -876,7 +878,7 @@ CoprocessorService, Coprocessor {
     {
       // Process local memory
       try {
-        status = commitRequest(transactionId, startEpoc, participantNum, 
dropTableRecorded);
+        status = commitRequest(transactionId, startEpoch, participantNum, 
dropTableRecorded);
       } catch (UnknownTransactionException u) {
         if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
commitRequest - txId " + transactionId + ", Caught UnknownTransactionException 
after internal commitRequest call - " + u.toString());
         ute = u;
@@ -935,7 +937,7 @@ CoprocessorService, Coprocessor {
     Throwable t = null;
     WrongRegionException wre = null;
     long transactionId = request.getTransactionId();
-    long startEpoc = request.getStartEpoc();
+    long startEpoch = request.getStartEpoch();
     int i = 0;
     int numOfRegion = request.getRegionNameCount();
     String requestRegionName;
@@ -978,8 +980,8 @@ CoprocessorService, Coprocessor {
                  commitRequestMultipleResponseBuilder.setException(i, 
BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString());
               }
               else {
-                 if (i == (numOfRegion - 1)) {status = 
regionEPCP.commitRequest(transactionId, startEpoc, i, true);} // only the last 
region flush
-                 else {status = regionEPCP.commitRequest(transactionId, 
startEpoc, i, false);}
+                 if (i == (numOfRegion - 1)) {status = 
regionEPCP.commitRequest(transactionId, startEpoch, i, true);} // only the last 
region flush
+                 else {status = regionEPCP.commitRequest(transactionId, 
startEpoch, i, false);}
               }
               if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint 
commitRequestMultiple ends");
              //status = commitRequest(transactionId);
@@ -5099,16 +5101,16 @@ CoprocessorService, Coprocessor {
    * @return TransactionRegionInterface commit code
    * @throws IOException
    */
-  public int commitRequest(final long transactionId, final long startEpoc, 
final int participantNum) throws IOException, UnknownTransactionException {
-     return commitRequest(transactionId, startEpoc, participantNum, true, 
false);
+  public int commitRequest(final long transactionId, final long startEpoch, 
final int participantNum) throws IOException, UnknownTransactionException {
+     return commitRequest(transactionId, startEpoch, participantNum, true, 
false);
   }
 
-  public int commitRequest(final long transactionId, final long startEpoc, 
final int participantNum, final boolean dropTableRecorded)
+  public int commitRequest(final long transactionId, final long startEpoch, 
final int participantNum, final boolean dropTableRecorded)
                                                     throws IOException, 
UnknownTransactionException {
-      return commitRequest(transactionId, startEpoc, participantNum, true, 
dropTableRecorded);
+      return commitRequest(transactionId, startEpoch, participantNum, true, 
dropTableRecorded);
   }
   
-  public int commitRequest(final long transactionId, final long startEpoc, 
final int participantNum, boolean flushHLOG,
+  public int commitRequest(final long transactionId, final long startEpoch, 
final int participantNum, boolean flushHLOG,
                                                                boolean 
dropTableRecorded) throws IOException,
                                                                                
 UnknownTransactionException {
     long txid = 0;
@@ -5119,9 +5121,9 @@ CoprocessorService, Coprocessor {
     checkBlockNonPhase2(transactionId);
 
     TrxTransactionState state;
-    if (startEpoc < onlineEpoc) {
+    if (startEpoch < onlineEpoch) {
         LOG.info("commitRequest txId: "
-               + transactionId + " startEpoc " + startEpoc + " is less than 
region's onlineEpoc " + onlineEpoc
+               + transactionId + " startEpoch " + startEpoch + " is less than 
region's onlineEpoch " + onlineEpoch
                + " for regionName " + lv_regionName
                + " must return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR ");
         return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
@@ -5527,7 +5529,7 @@ CoprocessorService, Coprocessor {
    * @return boolean
    * @throws IOException
    */
-  public boolean commitIfPossible(final long transactionId, final long 
startEpoc, final long commitId, final int participantNum)
+  public boolean commitIfPossible(final long transactionId, final long 
startEpoch, final long commitId, final int participantNum)
     throws IOException {
 
     if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: 
commitIfPossible -- ENTRY txId: "
@@ -5535,7 +5537,7 @@ CoprocessorService, Coprocessor {
 
     checkBlockNonPhase2(transactionId);
 
-    int status = commitRequest(transactionId, startEpoc, participantNum);
+    int status = commitRequest(transactionId, startEpoch, participantNum);
   
     if (status == COMMIT_OK) {
 
@@ -6050,6 +6052,27 @@ CoprocessorService, Coprocessor {
       }
     }
   }
+  
+  public void pushOnlineEpoch(RpcController controller,
+          PushEpochRequest request, RpcCallback<PushEpochResponse> done) {
+
+    org.apache.hadoop.hbase.client.Result result = null;
+    long transId  = request.getTransactionId();
+    long tmpEpoch = request.getEpoch();
+
+    if (LOG.isTraceEnabled()) LOG.trace("pushOnlineEpoch ENTRY.  Epoch " + 
tmpEpoch
+        + " in region: " + regionInfo.getRegionNameAsString());
+
+    this.onlineEpoch = tmpEpoch;
+    
+    
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse.Builder
 pushEpochResponseBuilder = PushEpochResponse.newBuilder();
+    pushEpochResponseBuilder.setHasException(false);
+
+    PushEpochResponse epochResponse = pushEpochResponseBuilder.build();
+    done.run(epochResponse);
+ }
+  
+  
   public void flushToFS(Path flushPath) throws IOException {
 
    if(LOG.isTraceEnabled()) LOG.trace("flushToFS -- ENTRY, Path: " + 
flushPath.toString());
@@ -6089,7 +6112,7 @@ CoprocessorService, Coprocessor {
       }
     }
     txnPersistBuilder.setNextSeqId(nextSequenceId.get());
-    txnPersistBuilder.setOnlineEpoc(this.onlineEpoc);
+    txnPersistBuilder.setOnlineEpoch(this.onlineEpoch);
 
     ByteArrayOutputStream output = new ByteArrayOutputStream();
 
@@ -6251,8 +6274,8 @@ CoprocessorService, Coprocessor {
               }
               
               this.nextSequenceId = new 
AtomicLong(txnPersistMsg.getNextSeqId());
-              this.onlineEpoc = txnPersistMsg.getOnlineEpoc();
-              LOG.info("Setting onlineEpoc after split to " + this.onlineEpoc);
+              this.onlineEpoch = txnPersistMsg.getOnlineEpoch();
+              LOG.info("Setting onlineEpoch after split to " + 
this.onlineEpoch);
             } catch(IOException e) {
                 StringWriter sw = new StringWriter();
                 PrintWriter pw = new PrintWriter(sw);

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
index 79a687b..71a8a49 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
@@ -1,3 +1,24 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
 // Generated by the protocol buffer compiler.  DO NOT EDIT!
 // source: SsccRegion.proto
 

Reply via email to