GitHub user prashanth-vasudev commented on a diff in the pull request:
https://github.com/apache/incubator-trafodion/pull/993#discussion_r107258795
--- Diff:
core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
---
@@ -1232,8 +1203,218 @@ public void run() {
}
if(LOG.isDebugEnabled()) LOG.debug("Exiting recovery
thread for tm ID: " + tmID);
}
- }
+
+ private Map<Long, TransactionState> getTransactionsFromRegions(
+ Map<String, byte[]> regions)
+ throws IOException, KeeperException,
+ DeserializationException
+ {
+ if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt
region size " + regions.size());
+ for (Map.Entry<String, byte[]> regionEntry : regions.entrySet()) {
+ Map<Long, TransactionState> transactionStates =
+ new HashMap<Long, TransactionState>();
+ List<Long> TxRecoverList = new ArrayList<Long>();
+ String hostnamePort = regionEntry.getKey();
+ byte[] regionBytes = regionEntry.getValue();
+ if (LOG.isDebugEnabled())
+ LOG.debug("TRAF RCOV THREAD:Recovery Thread Processing
region: " + new String(regionBytes));
+ if (recoveryIterations == 0) {
+ if(LOG.isWarnEnabled()) {
+ // Let's get the host name
+ final byte [] delimiter = ",".getBytes();
+ String[] hostname = hostnamePort.split(new
String(delimiter), 3);
+ if (hostname.length < 2) {
+ throw new IllegalArgumentException("hostnamePort
format is incorrect");
+ }
+
+ LOG.warn ("TRAF RCOV THREAD:Starting recovery with " +
regions.size() +
+ " regions to recover. First region hostname: " +
hostnamePort +
+ " Recovery iterations: " + recoveryIterations);
+ }
+ }
+ else {
+ if(recoveryIterations % 10 == 0) {
+ if(LOG.isWarnEnabled()) {
+ // Let's get the host name
+ final byte [] delimiter = ",".getBytes();
+ String[] hostname = hostnamePort.split(new
String(delimiter), 3);
+ if (hostname.length < 2) {
+ throw new IllegalArgumentException("hostnamePort
format is incorrect");
+ }
+ LOG.warn("TRAF RCOV THREAD:Recovery thread
encountered " + regions.size() +
+ " regions to recover. First region hostname: " +
hostnamePort +
+ " Recovery iterations: " + recoveryIterations);
+ }
+ }
+ }
+ try {
+ TxRecoverList = txnManager.recoveryRequest(hostnamePort,
regionBytes, tmID);
+ }
+ catch (IOException e) {
+ // For all cases of Exception, we rely on the region to
redrive the request.
+ // Likely there is nothing to recover, due to a stale
region entry, but it is always safe to redrive.
+ // We log a warning event and delete the ZKNode entry.
+ LOG.warn("TRAF RCOV THREAD:Exception calling
txnManager.recoveryRequest. " + "TM: " +
+ tmID + " regionBytes: [" + regionBytes + "].
Deleting zookeeper region entry. \n exception: ", e);
+ zookeeper.deleteRegionEntry(regionEntry);
+
+ // In the case of NotServingRegionException we will repost
the ZKNode after refreshing the table.
+ if ((e instanceof NotServingRegionException) ||
(e.getCause() instanceof NotServingRegionException)){
+ // Create a local HTable object using the regionInfo
+ HTable table = new HTable(config,
HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString());
+ // Repost a zookeeper entry for all current regions in
the table
+ zookeeper.postAllRegionEntries(table);
+ }
+ } // IOException
+
+ if (TxRecoverList != null) {
+ if (LOG.isDebugEnabled()) LOG.trace("TRAF RCOV THREAD:size
of TxRecoverList " + TxRecoverList.size());
+ if (TxRecoverList.size() == 0) {
+ // First delete the zookeeper entry
+ LOG.warn("TRAF RCOV THREAD:Leftover Znode calling
txnManager.recoveryRequest. " + "TM: " +
+ tmID + " regionBytes: [" + regionBytes + "].
Deleting zookeeper region entry. ");
+ zookeeper.deleteRegionEntry(regionEntry);
+ }
+ for (Long txid : TxRecoverList) {
+ TransactionState ts = transactionStates.get(txid);
+ if (ts == null) {
+ ts = new TransactionState(txid);
+
+ //Identify if DDL is part of this transaction and
valid
+ if(hbtx.useDDLTrans){
+ TmDDL tmDDL = hbtx.getTmDDL();
+ StringBuilder state = new StringBuilder ();
+ tmDDL.getState(txid,state);
+ if(state.toString().equals("VALID"))
+ ts.setDDLTx(true);
+ }
+ }
+ this.addRegionToTS(hostnamePort, regionBytes, ts);
+ transactionStates.put(txid, ts);
+ }
+ }
+ else if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV
THREAD:size od TxRecoverList is NULL ");
+
+ return transactionStates;
+ }
+ return null;
+ }
+
+ private Map<Long, TransactionState> getTransactionsFromTmDDL()
+ throws IOException
+ {
+ if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: Checking for
DDL only recovery");
+
+ //Access TMDDL, return null if not enabled.
+ if(! hbtx.useDDLTrans)
+ return null;
+
+ Map<Long, TransactionState> transactionStates = null;
+ TmDDL tmDDL = hbtx.getTmDDL();
+ List<Long> txIdList = tmDDL.getTxIdList(tmID);
+
+ //This list of txID is specific to tmID owner.
+ //This list may include txId that are:
+ //1. currently in ACTIVE state. RecoverTransactions() call takes
care of
+ //ignoring TxId which are currently actively in progress.
+ //2. Txids regions which have not yet requested for help(regions
requesting help
+ //from zookeeper) , probably will, could be timing.
+ //3. Txids regions which have already requested for help.
+ //4. Txids whose regions have already serviced, but only require
recovery
+ //from DDL perspective.
+ //For 2 and 3 use cases above, those regions will ultimately seek
help if
+ //they need help. So no need to handle those regions here. We are
only
+ //interested to handle use case 4. If usecase 4 also involves DML
regions
+ //it is ok to recover the DDL only here and not dependent on DML
regions.
+ //
+ //Note that recoverTransactions() attempts recovery, its a no-op if
those
+ //txids are completed for some reason, some of the regions might
have completed
+ //processing, ignoreUnknownTransactionException is enabled.
+ if(txIdList != null && txIdList.size() > 0)
+ {
+ transactionStates = new HashMap<Long, TransactionState>();
+ for (Long txid : txIdList)
+ {
+ //build ts object
+ TransactionState ts = new TransactionState(txid);
+ ts.setDDLTx(true);
+ transactionStates.put(txid, ts);
+ }
+ }
+ return transactionStates;
+ }
+
+ private void recoverTransactions(Map<Long, TransactionState>
transactionStates) throws IOException
+ {
+ if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt
transaction size " + transactionStates.size());
+
+ for (Map.Entry<Long, TransactionState> tsEntry :
transactionStates.entrySet()) {
+ int isTransactionStillAlive = 0;
+ TransactionState ts = tsEntry.getValue();
+ Long txID = ts.getTransactionId();
+ // TransactionState ts = new TransactionState(txID);
+
+ //It is possible for long prepare situations that involve
multiple DDL
+ //operations, multiple prompts from RS is received. Hence
check to see if there
+ //is a TS object in main TS list and transaction is still
active.
+ //Note that tsEntry is local TS object.
+ if (hbtx.mapTransactionStates.get(txID) != null) {
+ if
(hbtx.mapTransactionStates.get(txID).getStatus().toString().contains("ACTIVE"))
{
+ isTransactionStillAlive = 1;
+ }
+ if (LOG.isInfoEnabled())
+ LOG.info("TRAF RCOV THREAD: TID " + txID
+ + " still has TS object in TM memory. TS details: "
+ + hbtx.mapTransactionStates.get(txID).toString()
+ + " transactionAlive: " + isTransactionStillAlive);
+ if(isTransactionStillAlive == 1)
+ continue; //for loop
+ }
+
+ try {
+ audit.getTransactionState(ts);
+ if
(ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("TRAF RCOV THREAD:Redriving commit for "
+ txID + " number of regions " + ts.getParticipatingRegions().size() +
+ " and tolerating
UnknownTransactionExceptions");
+ txnManager.doCommit(ts, true /*ignore
UnknownTransactionException*/);
+ if(useTlog && useForgotten) {
+ long nextAsn =
tLog.getNextAuditSeqNum((int)TransactionState.getNodeId(txID));
+ tLog.putSingleRecord(txID, ts.getCommitId(),
"FORGOTTEN", null, forceForgotten, nextAsn);
+ }
+ } else if
(ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("TRAF RCOV THREAD:Redriving abort for "
+ txID);
+ txnManager.abort(ts);
--- End diff --
@sbroeder: Should abort also be called with true /*ignore UnknownTransactionException*/,
as doCommit is above?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---