Fix loss of updates following a RegionServer failure (v2)
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/c74e3d62 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/c74e3d62 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/c74e3d62 Branch: refs/heads/master Commit: c74e3d62c509c28d890a8329f4346b4b80698064 Parents: 9fc659a Author: Sean Broeder <[email protected]> Authored: Fri Jul 1 21:43:38 2016 +0000 Committer: Sean Broeder <[email protected]> Committed: Wed Jul 6 19:17:17 2016 +0000 ---------------------------------------------------------------------- .../hbase/client/transactional/RMInterface.java | 176 +- .../transactional/TransactionManager.java | 12 +- .../client/transactional/TransactionState.java | 16 +- .../transactional/TrxRegionEndpoint.java.tmpl | 65 +- .../generated/SsccRegionProtos.java | 21 + .../generated/TrxRegionProtos.java | 2418 +++++++++++++----- .../hbase-trx/src/main/protobuf/TrxRegion.proto | 21 +- 7 files changed, 2103 insertions(+), 626 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java index df74a45..fe98284 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java @@ -38,14 +38,19 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import 
org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.client.transactional.TransactionManager; import org.apache.hadoop.hbase.client.transactional.TransactionState; @@ -68,6 +73,18 @@ import org.apache.hadoop.hbase.regionserver.transactional.IdTm; import org.apache.hadoop.hbase.regionserver.transactional.IdTmException; import org.apache.hadoop.hbase.regionserver.transactional.IdTmId; +import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService; +import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest; +import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse; + +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; + +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.ByteString; + +import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.Iterator; @@ -75,6 +92,12 @@ import java.util.List; import java.util.Set; import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; 
+import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; public class RMInterface { static final Log LOG = LogFactory.getLog(RMInterface.class); @@ -84,6 +107,9 @@ public class RMInterface { public AlgorithmType TRANSACTION_ALGORITHM; static Map<Long, Set<RMInterface>> mapRMsPerTransaction = new HashMap<Long, Set<RMInterface>>(); private TransactionalTableClient ttable = null; + private ExecutorService threadPool; + private CompletionService<Integer> compPool; + private int intThreads = 16; static { System.loadLibrary("stmlib"); } @@ -137,6 +163,130 @@ public class RMInterface { } + public void pushRegionEpoch (HTableDescriptor desc, final TransactionState ts) throws IOException { + LOG.info("pushRegionEpoch start; transId: " + ts.getTransactionId()); + + TransactionalTable ttable1 = new TransactionalTable(Bytes.toBytes(desc.getNameAsString())); + HConnection connection = ttable1.getConnection(); + long lvTransid = ts.getTransactionId(); + RegionLocator rl = connection.getRegionLocator(desc.getTableName()); + List<HRegionLocation> regionList = rl.getAllRegionLocations(); + + boolean complete = false; + int loopCount = 0; + int result = 0; + + for (HRegionLocation location : regionList) { + final byte[] regionName = location.getRegionInfo().getRegionName(); + if (compPool == null){ + LOG.info("pushRegionEpoch compPool is null"); + threadPool = Executors.newFixedThreadPool(intThreads); + compPool = new ExecutorCompletionService<Integer>(threadPool); + } + + final HRegionLocation lv_location = location; + final HConnection lv_connection = connection; + compPool.submit(new RMCallable2(ts, lv_location, lv_connection ) { + public Integer call() throws IOException { + return pushRegionEpochX(ts, lv_location, lv_connection); + } + }); + try { + result = compPool.take().get(); + } catch(Exception ex) { + 
throw new IOException(ex); + } + if ( result != 0 ){ + LOG.error("pushRegionEpoch result " + result + " returned from region " + + location.getRegionInfo().getRegionName()); + throw new IOException("pushRegionEpoch result " + result + " returned from region " + + location.getRegionInfo().getRegionName()); + } + } + if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: " + ts.getTransactionId()); + return; + } + + private abstract class RMCallable2 implements Callable<Integer>{ + TransactionState transactionState; + HRegionLocation location; + HConnection connection; + HTable table; + byte[] startKey; + byte[] endKey_orig; + byte[] endKey; + + RMCallable2(TransactionState txState, HRegionLocation location, HConnection connection) { + this.transactionState = txState; + this.location = location; + this.connection = connection; + try { + table = new HTable(location.getRegionInfo().getTable(), connection); + } catch(IOException e) { + LOG.error("Error obtaining HTable instance " + e); + table = null; + } + startKey = location.getRegionInfo().getStartKey(); + endKey_orig = location.getRegionInfo().getEndKey(); + endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1); + + } + + public Integer pushRegionEpochX(final TransactionState txState, + final HRegionLocation location, HConnection connection) throws IOException { + if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry txState: " + txState + + " location: " + location); + + Batch.Call<TrxRegionService, PushEpochResponse> callable = + new Batch.Call<TrxRegionService, PushEpochResponse>() { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<PushEpochResponse> rpcCallback = + new BlockingRpcCallback<PushEpochResponse>(); + + @Override + public PushEpochResponse call(TrxRegionService instance) throws IOException { + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder + builder = PushEpochRequest.newBuilder(); + 
builder.setTransactionId(txState.getTransactionId()); + builder.setEpoch(txState.getStartEpoch()); + builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName()))); + instance.pushOnlineEpoch(controller, builder.build(), rpcCallback); + return rpcCallback.get(); + } + }; + + Map<byte[], PushEpochResponse> result = null; + try { + if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- before coprocessorService: startKey: " + + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8")); + result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable); + } catch (Throwable e) { + String msg = "ERROR occurred while calling pushRegionEpoch coprocessor service in pushRegionEpochX"; + LOG.error(msg + ":" + e); + throw new IOException(msg); + } + + if(result.size() == 1){ + // size is 1 + for (PushEpochResponse eresponse : result.values()){ + if(eresponse.getHasException()) { + String exceptionString = new String (eresponse.getException().toString()); + LOG.error("pushRegionEpochX - coprocessor exceptionString: " + exceptionString); + throw new IOException(eresponse.getException()); + } + } + } + else { + LOG.error("pushRegionEpochX, received incorrect result size: " + result.size() + " txid: " + + txState.getTransactionId() + " location: " + location.getRegionInfo().getRegionNameAsString()); + return 1; + } + if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Exit txState: " + txState + + " location: " + location); + return 0; + } + } + public synchronized TransactionState registerTransaction(final long transactionID, final byte[] row) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("Enter registerTransaction, transaction ID: " + transactionID); boolean register = false; @@ -222,12 +372,20 @@ public class RMInterface { } public void createTable(HTableDescriptor desc, byte[][] keys, int numSplits, int keyLength, long transID) throws IOException { - - if (LOG.isTraceEnabled()) 
LOG.trace("createTable ENTER: "); - byte[] lv_byte_desc = desc.toByteArray(); - byte[] lv_byte_tblname = desc.getNameAsString().getBytes(); - if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc)); - createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname); + if (LOG.isTraceEnabled()) LOG.trace("Enter createTable, txid: " + transID + " Table: " + desc.getNameAsString()); + byte[] lv_byte_desc = desc.toByteArray(); + byte[] lv_byte_tblname = desc.getNameAsString().getBytes(); + if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc)); + createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname); + TransactionState ts = mapTransactionStates.get(transID); + if (LOG.isTraceEnabled()) LOG.trace("createTable: pushing epoch into regions for : " + desc.getNameAsString()); + if (ts == null){ + if (LOG.isTraceEnabled()) LOG.trace("pushing epoch into regions but unable to get ts object for transID : " + transID); + throw new IOException("createTable push epoch exception for table " + desc.getNameAsString()); + } + pushRegionEpoch(desc, ts); + if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into regions for : " + desc.getNameAsString()); + if (LOG.isTraceEnabled()) LOG.trace("Exit createTable, txid: " + transID + " Table: " + desc.getNameAsString()); } public void truncateTableOnAbort(String tblName, long transID) throws IOException { @@ -261,7 +419,13 @@ public class RMInterface { static public synchronized void unregisterTransaction(final long transactionID) { TransactionState ts = null; if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction txid: " + transactionID); + try { ts = mapTransactionStates.remove(transactionID); + } catch (Exception e) { + LOG.warn("Ignoring exception. 
mapTransactionStates.remove for transid " + transactionID + + " failed with exception " + e); + return; + } if (ts == null) { LOG.warn("mapTransactionStates.remove did not find transid " + transactionID); } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java index 9d90ace..9c950d7 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java @@ -608,9 +608,9 @@ public class TransactionManager { * Return : Commit vote (yes, no, read only) * Purpose : Call prepare for a given regionserver */ - public Integer doPrepareX(final byte[] regionName, final long transactionId, final long startEpoc, final int participantNum, final TransactionRegionLocation location) + public Integer doPrepareX(final byte[] regionName, final long transactionId, final long startEpoch, final int participantNum, final TransactionRegionLocation location) throws IOException, CommitUnsuccessfulException { - if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- ENTRY txid: " + transactionId + " startEpoc " + startEpoc + if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- ENTRY txid: " + transactionId + " startEpoch " + startEpoch + " participantNum " + participantNum + " RegionName " + Bytes.toString(regionName) + " TableName " + table.toString() + " location " + location ); int commitStatus = 0; @@ -632,7 +632,7 @@ public class TransactionManager { public CommitRequestResponse 
call(TrxRegionService instance) throws IOException { org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest.Builder builder = CommitRequestRequest.newBuilder(); builder.setTransactionId(transactionId); - builder.setStartEpoc(startEpoc); + builder.setStartEpoch(startEpoch); builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); builder.setParticipantNum(participantNum); @@ -1581,7 +1581,7 @@ public class TransactionManager { //long transactionId = if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransaction, txid: " + transactionId); TransactionState ts = new TransactionState(transactionId); - ts.setStartEpoc(EnvironmentEdgeManager.currentTime()); + ts.setStartEpoch(EnvironmentEdgeManager.currentTime()); long startIdVal = -1; // Set the startid @@ -1698,7 +1698,7 @@ public class TransactionManager { public Integer call() throws CommitUnsuccessfulException, IOException { return doPrepareX(location.getRegionInfo().getRegionName(), - transactionState.getTransactionId(), transactionState.getStartEpoc(), lvParticipantNum, + transactionState.getTransactionId(), transactionState.getStartEpoch(), lvParticipantNum, location); } }); @@ -1783,7 +1783,7 @@ public class TransactionManager { compPool.submit(new TransactionManagerCallable(transactionState, location, connection) { public Integer call() throws IOException, CommitUnsuccessfulException { - return doPrepareX(regionName, transactionState.getTransactionId(), transactionState.getStartEpoc(), lvParticipantNum, myLocation); + return doPrepareX(regionName, transactionState.getTransactionId(), transactionState.getStartEpoch(), lvParticipantNum, myLocation); } }); } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java ---------------------------------------------------------------------- diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java index 7c837f4..2dfcc93 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java @@ -46,7 +46,7 @@ public class TransactionState { private final long transactionId; private TransState status; - private long startEpoc; + private long startEpoch; private long startId; private long commitId; @@ -367,17 +367,17 @@ public class TransactionState { * Set the startEpoc. * */ - public void setStartEpoc(final long epoc) { - this.startEpoc = epoc; + public void setStartEpoch(final long epoch) { + this.startEpoch = epoch; } /** - * Get the startEpoc. + * Get the startEpoch. * - * @return Return the startEpoc. + * @return Return the startEpoch. 
*/ - public long getStartEpoc() { - return startEpoc; + public long getStartEpoch() { + return startEpoch; } /** @@ -420,7 +420,7 @@ public class TransactionState { @Override public String toString() { return "transactionId: " + transactionId + ", startId: " + startId + ", commitId: " + commitId + - ", startEpoc: " + startEpoc + ", participants: " + participatingRegions.size() + ", startEpoch: " + startEpoch + ", participants: " + participatingRegions.size() + ", ignoring: " + regionsToIgnore.size() + ", hasDDL: " + hasDDLTx() + ", state: " + status.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl index e351eb3..0bbca72 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl @@ -210,6 +210,8 @@ import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProt import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutTransactionalResponse; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalRequest; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse; +import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest; +import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest; @@ -304,7 +306,7 @@ CoprocessorService, Coprocessor { private int regionState = 0; private Path recoveryTrxPath = null; private int cleanAT = 0; - private long onlineEpoc = EnvironmentEdgeManager.currentTime(); + private long onlineEpoch = EnvironmentEdgeManager.currentTime(); private long[] commitCheckTimes = new long[50]; private long[] hasConflictTimes = new long[50]; @@ -789,7 +791,7 @@ CoprocessorService, Coprocessor { boolean reply = false; long transactionId = request.getTransactionId(); long commitId = request.getCommitId(); - long startEpoc = request.getStartEpoc(); + long startEpoch = request.getStartEpoch(); final int participantNum = request.getParticipantNum(); Throwable t = null; WrongRegionException wre = null; @@ -811,7 +813,7 @@ CoprocessorService, Coprocessor { // Process local memory try { if (LOG.isDebugEnabled()) LOG.debug("commitIfPossible - txId " + transactionId + ", regionName, " + regionInfo.getRegionNameAsString() + "calling internal commitIfPossible"); - reply = commitIfPossible(transactionId, startEpoc, commitId, participantNum); + reply = commitIfPossible(transactionId, startEpoch, commitId, participantNum); } catch (Throwable e) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", Caught exception after internal commitIfPossible call " + e.getMessage() + " " + stackTraceToString(e)); @@ -852,12 +854,12 @@ CoprocessorService, Coprocessor { Throwable t = null; WrongRegionException wre = null; long transactionId = request.getTransactionId(); - long startEpoc = 
request.getStartEpoc(); + long startEpoch = request.getStartEpoch(); int participantNum = request.getParticipantNum(); boolean dropTableRecorded = request.getDropTableRecorded(); if (LOG.isTraceEnabled()) LOG.trace("commitRequest - txId " - + transactionId + ", startEpoc " + startEpoc + ", participantNum " + participantNum + ", dropTableRecorded " + dropTableRecorded + + + transactionId + ", startEpoch " + startEpoch + ", participantNum " + participantNum + ", dropTableRecorded " + dropTableRecorded + ", regionName " + regionInfo.getRegionNameAsString()); /* commenting out for the time being @@ -876,7 +878,7 @@ CoprocessorService, Coprocessor { { // Process local memory try { - status = commitRequest(transactionId, startEpoc, participantNum, dropTableRecorded); + status = commitRequest(transactionId, startEpoch, participantNum, dropTableRecorded); } catch (UnknownTransactionException u) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString()); ute = u; @@ -935,7 +937,7 @@ CoprocessorService, Coprocessor { Throwable t = null; WrongRegionException wre = null; long transactionId = request.getTransactionId(); - long startEpoc = request.getStartEpoc(); + long startEpoch = request.getStartEpoch(); int i = 0; int numOfRegion = request.getRegionNameCount(); String requestRegionName; @@ -978,8 +980,8 @@ CoprocessorService, Coprocessor { commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString()); } else { - if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, startEpoc, i, true);} // only the last region flush - else {status = regionEPCP.commitRequest(transactionId, startEpoc, i, false);} + if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, startEpoch, i, true);} // only the last region flush + else {status = 
regionEPCP.commitRequest(transactionId, startEpoch, i, false);} } if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitRequestMultiple ends"); //status = commitRequest(transactionId); @@ -5099,16 +5101,16 @@ CoprocessorService, Coprocessor { * @return TransactionRegionInterface commit code * @throws IOException */ - public int commitRequest(final long transactionId, final long startEpoc, final int participantNum) throws IOException, UnknownTransactionException { - return commitRequest(transactionId, startEpoc, participantNum, true, false); + public int commitRequest(final long transactionId, final long startEpoch, final int participantNum) throws IOException, UnknownTransactionException { + return commitRequest(transactionId, startEpoch, participantNum, true, false); } - public int commitRequest(final long transactionId, final long startEpoc, final int participantNum, final boolean dropTableRecorded) + public int commitRequest(final long transactionId, final long startEpoch, final int participantNum, final boolean dropTableRecorded) throws IOException, UnknownTransactionException { - return commitRequest(transactionId, startEpoc, participantNum, true, dropTableRecorded); + return commitRequest(transactionId, startEpoch, participantNum, true, dropTableRecorded); } - public int commitRequest(final long transactionId, final long startEpoc, final int participantNum, boolean flushHLOG, + public int commitRequest(final long transactionId, final long startEpoch, final int participantNum, boolean flushHLOG, boolean dropTableRecorded) throws IOException, UnknownTransactionException { long txid = 0; @@ -5119,9 +5121,9 @@ CoprocessorService, Coprocessor { checkBlockNonPhase2(transactionId); TrxTransactionState state; - if (startEpoc < onlineEpoc) { + if (startEpoch < onlineEpoch) { LOG.info("commitRequest txId: " - + transactionId + " startEpoc " + startEpoc + " is less than region's onlineEpoc " + onlineEpoc + + transactionId + " startEpoch " + startEpoch + " is less 
than region's onlineEpoch " + onlineEpoch + " for regionName " + lv_regionName + " must return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR "); return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR; @@ -5527,7 +5529,7 @@ CoprocessorService, Coprocessor { * @return boolean * @throws IOException */ - public boolean commitIfPossible(final long transactionId, final long startEpoc, final long commitId, final int participantNum) + public boolean commitIfPossible(final long transactionId, final long startEpoch, final long commitId, final int participantNum) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: " @@ -5535,7 +5537,7 @@ CoprocessorService, Coprocessor { checkBlockNonPhase2(transactionId); - int status = commitRequest(transactionId, startEpoc, participantNum); + int status = commitRequest(transactionId, startEpoch, participantNum); if (status == COMMIT_OK) { @@ -6050,6 +6052,27 @@ CoprocessorService, Coprocessor { } } } + + public void pushOnlineEpoch(RpcController controller, + PushEpochRequest request, RpcCallback<PushEpochResponse> done) { + + org.apache.hadoop.hbase.client.Result result = null; + long transId = request.getTransactionId(); + long tmpEpoch = request.getEpoch(); + + if (LOG.isTraceEnabled()) LOG.trace("pushOnlineEpoch ENTRY. 
Epoch " + tmpEpoch + + " in region: " + regionInfo.getRegionNameAsString()); + + this.onlineEpoch = tmpEpoch; + + org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse.Builder pushEpochResponseBuilder = PushEpochResponse.newBuilder(); + pushEpochResponseBuilder.setHasException(false); + + PushEpochResponse epochResponse = pushEpochResponseBuilder.build(); + done.run(epochResponse); + } + + public void flushToFS(Path flushPath) throws IOException { if(LOG.isTraceEnabled()) LOG.trace("flushToFS -- ENTRY, Path: " + flushPath.toString()); @@ -6089,7 +6112,7 @@ CoprocessorService, Coprocessor { } } txnPersistBuilder.setNextSeqId(nextSequenceId.get()); - txnPersistBuilder.setOnlineEpoc(this.onlineEpoc); + txnPersistBuilder.setOnlineEpoch(this.onlineEpoch); ByteArrayOutputStream output = new ByteArrayOutputStream(); @@ -6251,8 +6274,8 @@ CoprocessorService, Coprocessor { } this.nextSequenceId = new AtomicLong(txnPersistMsg.getNextSeqId()); - this.onlineEpoc = txnPersistMsg.getOnlineEpoc(); - LOG.info("Setting onlineEpoc after split to " + this.onlineEpoc); + this.onlineEpoch = txnPersistMsg.getOnlineEpoch(); + LOG.info("Setting onlineEpoch after split to " + this.onlineEpoch); } catch(IOException e) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java index 79a687b..71a8a49 100644 --- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java @@ -1,3 +1,24 @@ +// @@@ START COPYRIGHT @@@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// @@@ END COPYRIGHT @@@ + // Generated by the protocol buffer compiler. DO NOT EDIT! // source: SsccRegion.proto
