Github user prashanth-vasudev commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/993#discussion_r107265851
  
    --- Diff: 
core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
 ---
    @@ -1232,8 +1203,218 @@ public void run() {
                     }
                     if(LOG.isDebugEnabled()) LOG.debug("Exiting recovery 
thread for tm ID: " + tmID);
                 }
    -     }
    +            
    +    private  Map<Long, TransactionState> getTransactionsFromRegions(
    +                           Map<String, byte[]> regions)
    +                           throws IOException, KeeperException,
    +                           DeserializationException
    +    {
    +        if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt 
region size " + regions.size());
    +        for (Map.Entry<String, byte[]> regionEntry : regions.entrySet()) {
    +            Map<Long, TransactionState> transactionStates =
    +                            new HashMap<Long, TransactionState>();
    +            List<Long> TxRecoverList = new ArrayList<Long>();
    +            String hostnamePort = regionEntry.getKey();
    +            byte[] regionBytes = regionEntry.getValue();
    +            if (LOG.isDebugEnabled())
    +                LOG.debug("TRAF RCOV THREAD:Recovery Thread Processing 
region: " + new String(regionBytes));
    +            if (recoveryIterations == 0) {
    +               if(LOG.isWarnEnabled()) {
    +                  //  Let's get the host name
    +                  final byte [] delimiter = ",".getBytes();
    +                  String[] hostname = hostnamePort.split(new 
String(delimiter), 3);
    +                  if (hostname.length < 2) {
    +                     throw new IllegalArgumentException("hostnamePort 
format is incorrect");
    +                  }
    +  
    +                  LOG.warn ("TRAF RCOV THREAD:Starting recovery with " + 
regions.size() +
    +                       " regions to recover.  First region hostname: " + 
hostnamePort +
    +                       " Recovery iterations: " + recoveryIterations);
    +               }
    +            }
    +            else {
    +               if(recoveryIterations % 10 == 0) {
    +                  if(LOG.isWarnEnabled()) {
    +                     //  Let's get the host name
    +                     final byte [] delimiter = ",".getBytes();
    +                     String[] hostname = hostnamePort.split(new 
String(delimiter), 3);
    +                     if (hostname.length < 2) {
    +                        throw new IllegalArgumentException("hostnamePort 
format is incorrect");
    +                     }
    +                     LOG.warn("TRAF RCOV THREAD:Recovery thread 
encountered " + regions.size() +
    +                       " regions to recover.  First region hostname: " + 
hostnamePort +
    +                       " Recovery iterations: " + recoveryIterations);
    +                  }
    +               }
    +            }
    +            try {
    +                TxRecoverList = txnManager.recoveryRequest(hostnamePort, 
regionBytes, tmID);
    +            }
    +            catch (IOException e) {
    +               // For all cases of Exception, we rely on the region to 
redrive the request.
    +               // Likely there is nothing to recover, due to a stale 
region entry, but it is always safe to redrive.
    +               // We log a warning event and delete the ZKNode entry.
    +               LOG.warn("TRAF RCOV THREAD:Exception calling 
txnManager.recoveryRequest. " + "TM: " +
    +                          tmID + " regionBytes: [" + regionBytes + "].  
Deleting zookeeper region entry. \n exception: ", e);
    +               zookeeper.deleteRegionEntry(regionEntry);
    +  
    +               // In the case of NotServingRegionException we will repost 
the ZKNode after refreshing the table.
    +               if ((e instanceof NotServingRegionException) || 
(e.getCause() instanceof NotServingRegionException)){
    +                   // Create a local HTable object using the regionInfo
    +                   HTable table = new HTable(config, 
HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString());
    +                   // Repost a zookeeper entry for all current regions in 
the table
    +                   zookeeper.postAllRegionEntries(table);
    +               }
    +            } // IOException
    +  
    +            if (TxRecoverList != null) {
    +                if (LOG.isDebugEnabled()) LOG.trace("TRAF RCOV THREAD:size 
of TxRecoverList " + TxRecoverList.size());
    +                if (TxRecoverList.size() == 0) {
    +                     // First delete the zookeeper entry
    +                     LOG.warn("TRAF RCOV THREAD:Leftover Znode  calling 
txnManager.recoveryRequest. " + "TM: " +
    +                             tmID + " regionBytes: [" + regionBytes + "].  
Deleting zookeeper region entry. ");
    +                     zookeeper.deleteRegionEntry(regionEntry);
    +               }
    +               for (Long txid : TxRecoverList) {
    +                  TransactionState ts = transactionStates.get(txid);
    +                  if (ts == null) {
    +                     ts = new TransactionState(txid);
    +  
    +                     //Identify if DDL is part of this transaction and 
valid
    +                     if(hbtx.useDDLTrans){
    +                        TmDDL tmDDL = hbtx.getTmDDL();
    +                        StringBuilder state = new StringBuilder ();
    +                        tmDDL.getState(txid,state);
    +                        if(state.toString().equals("VALID"))
    +                           ts.setDDLTx(true);
    +                     }
    +                  }
    +                  this.addRegionToTS(hostnamePort, regionBytes, ts);
    +                  transactionStates.put(txid, ts);
    +               }
    +            }
    +            else if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV 
THREAD:size od TxRecoverList is NULL ");
    +            
    +            return transactionStates;
    +        }
    +        return null;
    +    }
    +              
    +    private  Map<Long, TransactionState> getTransactionsFromTmDDL()
    +                                                throws IOException
    +    {
    +      if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: Checking for 
DDL only recovery");
    +    
    +      //Access TMDDL, return null if not enabled.
    +      if(! hbtx.useDDLTrans)
    +        return null;
    +      
    +      Map<Long, TransactionState> transactionStates = null;
    +      TmDDL tmDDL = hbtx.getTmDDL();
    +      List<Long> txIdList  = tmDDL.getTxIdList(tmID);
    +      
    +      //This list of txID is specific to tmID owner.
    +      //This list may include txId that are:
    +      //1. currently in ACTIVE state. RecoverTransactions() call takes 
care of
    +      //ignoring TxId which are currently actively in progress.
    +      //2. Txids regions which have not yet requested for help(regions 
requesting help
    +      //from zookeeper) , probably will, could be timing. 
    +      //3. Txids regions which have already requested for help.
    +      //4. Txids whose regions have already serviced, but only require 
recovery
    +      //from DDL perspective.
    +      //For 2 and 3 use cases above, those regions will ultimately seek 
help if
    +      //they need help. So no need to handle those regions here. We are 
only
    +      //interested to handle use case 4. If usecase 4 also involves DML 
regions
    +      //it is ok to recover the DDL only here and not dependent on DML 
regions.
    +      //
    +      //Note that recoverTransactions() attempts recovery, its a no-op if 
those
    +      //txids are completed for some reason, some of the regions might 
have completed
    +      //processing, ignoreUnknownTransactionException is enabled.
    +      if(txIdList != null && txIdList.size() > 0)
    +      {
    +        transactionStates = new HashMap<Long, TransactionState>();
    +        for (Long txid : txIdList)
    +        {
    +          //build ts object
    +          TransactionState  ts = new TransactionState(txid);
    +          ts.setDDLTx(true);
    +          transactionStates.put(txid, ts);
    +        }
    +      }
    +      return transactionStates;
    +    }
    +
    +    private void recoverTransactions(Map<Long, TransactionState> 
transactionStates) throws IOException
    +    {
    +        if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt 
transaction size " + transactionStates.size());
    +        
    +        for (Map.Entry<Long, TransactionState> tsEntry : 
transactionStates.entrySet()) {
    +            int isTransactionStillAlive = 0;
    +            TransactionState ts = tsEntry.getValue();
    +            Long txID = ts.getTransactionId();
    +            // TransactionState ts = new TransactionState(txID);
    +            
    +            //It is possible for long prepare situations that involve 
multiple DDL
    +            //operations, multiple prompts from RS is received. Hence 
check to see if there
    +            //is a TS object in main TS list and transaction is still 
active.
    +            //Note that tsEntry is local TS object. 
    +            if (hbtx.mapTransactionStates.get(txID) != null) {
    +              if 
(hbtx.mapTransactionStates.get(txID).getStatus().toString().contains("ACTIVE")) 
{
    +                isTransactionStillAlive = 1;
    +              }
    +              if (LOG.isInfoEnabled()) 
    +              LOG.info("TRAF RCOV THREAD: TID " + txID
    +                        + " still has TS object in TM memory. TS details: "
    +                        + hbtx.mapTransactionStates.get(txID).toString() 
    +                        + " transactionAlive: " + isTransactionStillAlive);
    +              if(isTransactionStillAlive == 1)
    +                continue; //for loop
    +            }
    +           
    +            try {
    +                audit.getTransactionState(ts);
    +                if 
(ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {
    +                    if (LOG.isDebugEnabled())
    +                        LOG.debug("TRAF RCOV THREAD:Redriving commit for " 
+ txID + " number of regions " + ts.getParticipatingRegions().size() +
    +                                " and tolerating 
UnknownTransactionExceptions");
    +                    txnManager.doCommit(ts, true /*ignore 
UnknownTransactionException*/);
    +                    if(useTlog && useForgotten) {
    +                        long nextAsn = 
tLog.getNextAuditSeqNum((int)TransactionState.getNodeId(txID));
    +                        tLog.putSingleRecord(txID, ts.getCommitId(), 
"FORGOTTEN", null, forceForgotten, nextAsn);
    +                    }
    +                } else if 
(ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
    +                    if (LOG.isDebugEnabled())
    +                        LOG.debug("TRAF RCOV THREAD:Redriving abort for " 
+ txID);
    +                    txnManager.abort(ts);
    --- End diff --
    
    abort does not take ignore unknownTransactionException flag. We can add if 
abort starts to support this flag.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to