Repository: incubator-trafodion Updated Branches: refs/heads/master 643f82ba5 -> 67288b3cd
[TRAFODION-2150] and [TRAFODION-2151] TM long prepare and chore thread issues. Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/30c8a83d Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/30c8a83d Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/30c8a83d Branch: refs/heads/master Commit: 30c8a83d1512667f5ecfe2c7384372f646f18ae3 Parents: 624d61e Author: Prashant Vasudev <[email protected]> Authored: Thu Jan 19 00:11:33 2017 +0000 Committer: Prashant Vasudev <[email protected]> Committed: Thu Jan 19 00:11:33 2017 +0000 ---------------------------------------------------------------------- .../transactional/TrxRegionEndpoint.java.tmpl | 49 ++++++++++++++++++-- .../java/org/trafodion/dtm/HBaseTxClient.java | 19 ++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/30c8a83d/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl index f824a4f..403df5f 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl @@ -93,6 +93,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2 +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.ScheduledChore; +#endif import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -309,7 +313,7 @@ CoprocessorService, Coprocessor { static MemoryUsageChore memoryUsageThread = null; Stoppable stoppable = new StoppableImplementation(); static Stoppable stoppable2 = new StoppableImplementation(); - private int cleanTimer = 5000; // Five minutes + private int cleanTimer = 5000; // 5 secs overriden by DEFAULT_SLEEP private int memoryUsageTimer = 60000; // One minute private int regionState = 0; private Path recoveryTrxPath = null; @@ -361,6 +365,7 @@ CoprocessorService, Coprocessor { private boolean configuredEarlyLogging = false; private boolean configuredConflictReinstate = false; private static Object zkRecoveryCheckLock = new Object(); + private static Object choreDetectStaleBranchLock = new Object(); private static ZooKeeperWatcher zkw1 = null; String lv_hostName; int lv_port; @@ -435,6 +440,13 @@ CoprocessorService, Coprocessor { public static final int REGION_STATE_START = 2; public static final String trxkeyEPCPinstance = "EPCPinstance"; + + #ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2 + static ChoreService s_ChoreService = null; + #endif + static int txnChoreServiceThreadPoolSize = 1; + public static final int DEFAULT_TXN_CHORE_SERVICE_THREAD_POOL_SIZE=5; + // TBD Maybe we should just use HashMap to improve the performance, ConcurrentHashMap could be too strict static ConcurrentHashMap<String, Object> transactionsEPCPMap; // TrxRegionService methods @@ -3876,9 +3888,19 @@ CoprocessorService, Coprocessor { + this.suppressOutOfOrderProtocolException); // Start the clean core thread - this.cleanOldTransactionsThread = new CleanOldTransactionsChore(this, cleanTimer, stoppable); +#ifdef HDP2.3 HDP2.4 CDH5.5 CDH5.7 APACHE1.2 + this.txnChoreServiceThreadPoolSize = + tmp_env.getConfiguration().getInt("hbase.regionserver.region.transactional.chore_service_thread_pool_size", + DEFAULT_TXN_CHORE_SERVICE_THREAD_POOL_SIZE); + if (LOG.isTraceEnabled()) LOG.trace("Transactional chore thread pool size setting is " + this.txnChoreServiceThreadPoolSize); + + if (this.cleanOldTransactionsThread != null) { + setupChoreService(); + s_ChoreService.scheduleChore(this.cleanOldTransactionsThread); + } +#else UncaughtExceptionHandler handler = new UncaughtExceptionHandler() { public void uncaughtException(final Thread t, final Throwable e) @@ -3891,6 +3913,7 @@ CoprocessorService, Coprocessor { ChoreThread = new Thread(this.cleanOldTransactionsThread); Threads.setDaemonThreadRunning(ChoreThread, n + ".oldTransactionCleaner", handler); +#endif // Start the memory usage chore thread if the threshold // selected is greater than the default of 100%. @@ -4080,6 +4103,22 @@ CoprocessorService, Coprocessor { } +#ifdef HDP2.3 HDP2.4 CDH5.5 + private synchronized void setupChoreService() { + if (s_ChoreService == null) { + s_ChoreService = new ChoreService("Cleanup ChoreService", this.txnChoreServiceThreadPoolSize); + } + } +#endif + +#ifdef CDH5.7 APACHE1.2 + private synchronized void setupChoreService() { + if (s_ChoreService == null) { + s_ChoreService = new ChoreService("Cleanup ChoreService", this.txnChoreServiceThreadPoolSize, true); + } + } +#endif + // Internal support methods /** * Checks if the region is closing and needs to block all activity @@ -4216,6 +4255,8 @@ CoprocessorService, Coprocessor { public void choreThreadDetectStaleTransactionBranch() { + synchronized(choreDetectStaleBranchLock) { + List<Integer> staleBranchforTMId = new ArrayList<Integer>(); List<TrxTransactionState> commitPendingCopy = new ArrayList<TrxTransactionState>(commitPendingTransactions); Map<Long, List<WALEdit>> indoubtTransactionsMap = new TreeMap<Long, List<WALEdit>>(indoubtTransactionsById); @@ -4252,7 +4293,6 @@ CoprocessorService, Coprocessor { } } } - if (!staleBranchforTMId.isEmpty()) { for (int i = 0; i < staleBranchforTMId.size(); i++) { try { @@ -4270,6 +4310,8 @@ CoprocessorService, Coprocessor { commitPendingCopy.clear(); indoubtTransactionsMap.clear(); staleBranchforTMId.clear(); + + } //synchronized } public void createRecoveryzNode(int node, String encodedName, byte [] data) throws IOException { @@ -6678,6 +6720,7 @@ CoprocessorService, Coprocessor { @Override public void stop(String why) { + LOG.info("Cleanup Chore thread has stopped: Reason:" + why); this.stop = true; } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/30c8a83d/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java index 1b2ebed..0435175 100644 --- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java +++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java @@ -1108,9 +1108,28 @@ public class HBaseTxClient { } if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt transaction size " + transactionStates.size()); for (Map.Entry<Long, TransactionState> tsEntry : transactionStates.entrySet()) { + int isTransactionStillAlive = 0; TransactionState ts = tsEntry.getValue(); Long txID = ts.getTransactionId(); // TransactionState ts = new TransactionState(txID); + + //It is possible for long prepare situations that involve multiple DDL + //operations, multiple prompts from RS is received. Hence check to see if there + //is a TS object in main TS list and transaction is still active. + //Note that tsEntry is local TS object. + if (hbtx.mapTransactionStates.get(txID) != null) { + if (hbtx.mapTransactionStates.get(txID).getStatus().toString().contains("ACTIVE")) { + isTransactionStillAlive = 1; + } + if (LOG.isInfoEnabled()) + LOG.info("TRAF RCOV THREAD: TID " + txID + + " still has ts object in TM memory with state " + + hbtx.mapTransactionStates.get(txID).getStatus().toString() + + " transactionAlive: " + isTransactionStillAlive); + if(isTransactionStillAlive == 1) + continue; //for loop + } + try { audit.getTransactionState(ts); if (ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {
