[TRAFODION-1663] Prepare, Commit, and Abort requests from TransactionManager now carry a participant number that helps track the specific request in the RegionEndpointCoprocessor causing a problem when an UnknownTransactionException is thrown.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/84af9603 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/84af9603 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/84af9603 Branch: refs/heads/master Commit: 84af9603c6ddb5f935ba8e1a535ba4dc28d1988b Parents: fbfab1e Author: Sean Broeder <[email protected]> Authored: Mon Dec 7 17:03:32 2015 +0000 Committer: Sean Broeder <[email protected]> Committed: Mon Dec 7 17:03:32 2015 +0000 ---------------------------------------------------------------------- core/sqf/src/seatrans/hbase-trx/pom.xml.apache | 4 + .../transactional/TransactionManager.java | 313 +++--- .../transactional/TrxRegionEndpoint.java | 70 +- .../generated/SsccRegionProtos.java | 131 +-- .../generated/TrxRegionProtos.java | 1020 ++++++++++++++---- .../hbase-trx/src/main/protobuf/TrxRegion.proto | 11 +- 6 files changed, 1118 insertions(+), 431 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/84af9603/core/sqf/src/seatrans/hbase-trx/pom.xml.apache ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/pom.xml.apache b/core/sqf/src/seatrans/hbase-trx/pom.xml.apache index c717479..d98e561 100755 --- a/core/sqf/src/seatrans/hbase-trx/pom.xml.apache +++ b/core/sqf/src/seatrans/hbase-trx/pom.xml.apache @@ -63,6 +63,8 @@ <!--<hbase.version>0.98.3-hadoop2</hbase.version>--> <!--<hbase.version>0.98.3-hadoop1</hbase.version>--> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <protobuf.version>2.5.0</protobuf.version> + <protocVersion>2.5.0</protocVersion> <java.version>1.7</java.version> <trx-suffix>apache</trx-suffix> </properties> @@ -338,6 +340,8 @@ if we can combine these profiles somehow --> <goal>protoc</goal> </goals> <configuration> + 
<protobuf.version>2.5.0</protobuf.version> + <protocVersion>2.5.0</protocVersion> <imports> <param>${basedir}/src/main/protobuf</param> <param>${basedir}/hbase-protocol/src/main/protobuf</param> http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/84af9603/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java index f8f5eaf..b9832a1 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java @@ -223,7 +223,6 @@ public class TransactionManager { return g_TransactionManager; } - /* increment/deincrement for positive value */ /* This method copied from o.a.h.h.utils.Bytes */ public static byte [] binaryIncrementPos(byte [] value, long amount) { @@ -257,7 +256,7 @@ public class TransactionManager { hbadmin = new HBaseAdmin(config); } catch(Exception e) { - System.out.println("ERROR: Unable to obtain HBase accessors, Exiting"); + System.out.println("ERROR: Unable to obtain HBase accessors, Exiting " + e); e.printStackTrace(); System.exit(1); } @@ -281,7 +280,7 @@ public class TransactionManager { table = new HTable(location.getRegionInfo().getTable(), connection, cp_tpe); } catch(IOException e) { e.printStackTrace(); - LOG.error("Error obtaining HTable instance"); + LOG.error("Error obtaining HTable instance " + e); table = null; } startKey = location.getRegionInfo().getStartKey(); @@ -299,7 +298,11 @@ public class TransactionManager { * Return : Always 0, can ignore * Purpose : Call commit for a given 
regionserver */ - public Integer doCommitX(final byte[] regionName, final long transactionId, final long commitId, final boolean ignoreUnknownTransactionException) throws CommitUnsuccessfulException, IOException { + public Integer doCommitX(final byte[] regionName, + final long transactionId, + final long commitId, + final int participantNum, + final boolean ignoreUnknownTransaction) throws CommitUnsuccessfulException, IOException { boolean retry = false; boolean refresh = false; @@ -310,8 +313,9 @@ public class TransactionManager { do { try { - if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- ENTRY txid: " + transactionId + - " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException); + if (LOG.isDebugEnabled()) LOG.debug("doCommitX -- ENTRY txid: " + transactionId + + " participantNum " + participantNum + + " ignoreUnknownTransaction: " + ignoreUnknownTransaction); Batch.Call<TrxRegionService, CommitResponse> callable = new Batch.Call<TrxRegionService, CommitResponse>() { ServerRpcController controller = new ServerRpcController(); @@ -322,8 +326,9 @@ public class TransactionManager { public CommitResponse call(TrxRegionService instance) throws IOException { org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequest.Builder builder = CommitRequest.newBuilder(); builder.setTransactionId(transactionId); + builder.setParticipantNum(participantNum); builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); //ByteString.copyFromUtf8(Bytes.toString(regionName))); - builder.setIgnoreUnknownTransactionException(ignoreUnknownTransactionException); + builder.setIgnoreUnknownTransactionException(ignoreUnknownTransaction); instance.commit(controller, builder.build(), rpcCallback); return rpcCallback.get(); @@ -333,16 +338,18 @@ public class TransactionManager { Map<byte[], CommitResponse> result = null; try { if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- before coprocessorService txid: " + transactionId + 
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException + " table: " + + " ignoreUnknownTransaction: " + ignoreUnknownTransaction + " table: " + table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8")); result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable); } catch (Throwable e) { - String msg = "ERROR occurred while calling doCommitX coprocessor service in doCommitX"; - LOG.error(msg + ":" + e); + String msg = new String ("ERROR occurred while calling doCommitX coprocessor service in doCommitX for transaction: " + + transactionId + " participantNum " + participantNum + " Exception: " + e); + LOG.error(msg); throw new Exception(msg); } if(result.size() == 0) { - LOG.error("doCommitX, received incorrect result size: " + result.size() + " txid: " + transactionId); + LOG.error("doCommitX, received incorrect result size: " + result.size() + " txid: " + + transactionId + " location: " + location.getRegionInfo().getRegionNameAsString()); refresh = true; retry = true; } @@ -351,8 +358,9 @@ public class TransactionManager { for (CommitResponse cresponse : result.values()){ if(cresponse.getHasException()) { String exceptionString = new String (cresponse.getException().toString()); + LOG.error("doCommitX - exceptionString: " + exceptionString); if (exceptionString.contains("UnknownTransactionException")) { - if (ignoreUnknownTransactionException == true) { + if (ignoreUnknownTransaction == true) { if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring UnknownTransactionException in cresponse"); } else { @@ -372,8 +380,9 @@ public class TransactionManager { for (CommitResponse cresponse : result.values()){ if(cresponse.getHasException()) { String exceptionString = new String (cresponse.getException().toString()); + LOG.error("doCommitX - exceptionString: " + exceptionString); if (exceptionString.contains("UnknownTransactionException")) { - if 
(ignoreUnknownTransactionException == true) { + if (ignoreUnknownTransaction == true) { if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring UnknownTransactionException in cresponse"); } else { @@ -395,18 +404,20 @@ public class TransactionManager { } catch (UnknownTransactionException ute) { - LOG.error("Got unknown exception in doCommitX for transaction: " + transactionId + " " + ute); + LOG.error("Got unknown exception in doCommitX by participant " + participantNum + + " for transaction: " + transactionId + " " + ute); transactionState.requestPendingCountDec(true); throw new UnknownTransactionException(); } catch (Exception e) { if(e.toString().contains("Asked to commit a non-pending transaction")) { - if (LOG.isDebugEnabled()) LOG.debug("doCommitX will not retry: " + e); + LOG.error("doCommitX transaction: " + + transactionId + " will not retry: " + e); refresh = false; retry = false; } else { - LOG.error("doCommitX retrying due to Exception: " + e); + LOG.error("doCommitX retrying transaction: " + transactionId + " due to Exception: " + e); refresh = true; retry = true; } @@ -415,8 +426,6 @@ public class TransactionManager { HRegionLocation lv_hrl = table.getRegionLocation(startKey); HRegionInfo lv_hri = lv_hrl.getRegionInfo(); - //String lv_node = lv_hrl.getHostname(); - //int lv_length = lv_node.indexOf('.'); if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + " endKey: " + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId); @@ -429,14 +438,11 @@ public class TransactionManager { throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId); } -// if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) || // Encoded name is different -// (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) { // Node is 
different - if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- " + table.toString() + " location being refreshed"); - if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- lv_hri: " + lv_hri); - if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- location.getRegionInfo(): " + location.getRegionInfo()); - table.getRegionLocation(startKey, true); -// } - if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- setting retry, count: " + retryCount); + if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- " + table.toString() + " location being refreshed"); + if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- lv_hri: " + lv_hri); + if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- location.getRegionInfo(): " + location.getRegionInfo()); + table.getRegionLocation(startKey, true); + if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- setting retry, count: " + retryCount); refresh = false; } @@ -460,7 +466,7 @@ public class TransactionManager { try { if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- ENTRY txid: " + transactionId + - " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException); + " ignoreUnknownTransaction: " + ignoreUnknownTransaction); Batch.Call<SsccRegionService, SsccCommitResponse> callable = new Batch.Call<SsccRegionService, SsccCommitResponse>() { ServerRpcController controller = new ServerRpcController(); @@ -473,7 +479,7 @@ public class TransactionManager { builder.setTransactionId(transactionId); builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); //ByteString.copyFromUtf8(Bytes.toString(regionName))); builder.setCommitId(commitId); - builder.setIgnoreUnknownTransactionException(ignoreUnknownTransactionException); + builder.setIgnoreUnknownTransactionException(ignoreUnknownTransaction); instance.commit(controller, builder.build(), rpcCallback); return rpcCallback.get(); @@ -483,7 +489,7 @@ public class TransactionManager { Map<byte[], SsccCommitResponse> result = null; try { if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- before 
coprocessorService txid: " + transactionId + - " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException + " table: " + + " ignoreUnknownTransaction: " + ignoreUnknownTransaction + " table: " + table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8")); result = table.coprocessorService(SsccRegionService.class, startKey, endKey, callable); } catch (Throwable e) { @@ -502,7 +508,7 @@ public class TransactionManager { if(cresponse.getHasException()) { String exceptionString = new String (cresponse.getException().toString()); if (exceptionString.contains("UnknownTransactionException")) { - if (ignoreUnknownTransactionException == true) { + if (ignoreUnknownTransaction == true) { if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring UnknownTransactionException in cresponse"); } else { @@ -520,12 +526,14 @@ public class TransactionManager { } } catch (UnknownTransactionException ute) { - LOG.error("Got unknown exception in doCommitX for transaction: " + transactionId + " " + ute); + LOG.error("Got unknown exception in doCommitX by participant " + participantNum + + " for transaction: " + transactionId + " " + ute); transactionState.requestPendingCountDec(true); throw new UnknownTransactionException(); } catch (Exception e) { - LOG.error("doCommitX retrying due to Exception: " + e); + LOG.error("doCommitX participant " + participantNum + " retrying transaction " + + transactionId + " due to Exception: " + e); refresh = true; retry = true; } @@ -590,7 +598,7 @@ public class TransactionManager { * Return : Commit vote (yes, no, read only) * Purpose : Call prepare for a given regionserver */ - public Integer doPrepareX(final byte[] regionName, final long transactionId, final TransactionRegionLocation location) + public Integer doPrepareX(final byte[] regionName, final long transactionId, final int participantNum, final TransactionRegionLocation location) throws IOException, CommitUnsuccessfulException { 
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- ENTRY txid: " + transactionId ); int commitStatus = 0; @@ -613,6 +621,7 @@ public class TransactionManager { org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest.Builder builder = CommitRequestRequest.newBuilder(); builder.setTransactionId(transactionId); builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); + builder.setParticipantNum(participantNum); instance.commitRequest(controller, builder.build(), rpcCallback); return rpcCallback.get(); @@ -708,11 +717,13 @@ public class TransactionManager { } } catch(UnknownTransactionException ute) { - LOG.warn("doPrepareX Exception: " + ute); + LOG.warn("doPrepareX participant " + participantNum + " transaction " + + transactionId + " unknown transaction : " + ute); throw new UnknownTransactionException(); } catch(Exception e) { - LOG.error("doPrepareX retrying due to Exception: " + e); + LOG.error("doPrepareX participant " + participantNum + " retrying transaction " + + transactionId + " due to Exception: " + e); refresh = true; retry = true; } @@ -807,11 +818,13 @@ public class TransactionManager { } } catch(UnknownTransactionException ute) { - LOG.warn("doPrepareX Exception: " + ute); + LOG.warn("doPrepareX participant " + participantNum + " transaction " + + transactionId + " unknown transaction: " + ute); throw new UnknownTransactionException(); } catch(Exception e) { - LOG.error("doPrepareX retrying due to Exception: " + e); + LOG.error("doPrepareX participant " + participantNum + " retrying transaction " + + transactionId + " due to Exception: " + e); refresh = true; retry = true; } @@ -907,10 +920,11 @@ public class TransactionManager { * Return : Ignored * Purpose : Call abort for a given regionserver */ - public Integer doAbortX(final byte[] regionName, final long transactionId) throws IOException{ - if(LOG.isTraceEnabled()) LOG.trace("doAbortX -- ENTRY txID: " + transactionId); + public Integer 
doAbortX(final byte[] regionName, final long transactionId, final int participantNum) throws IOException{ + if(LOG.isDebugEnabled()) LOG.debug("doAbortX -- ENTRY txID: " + transactionId + " participantNum " + + participantNum + " region " + regionName.toString()); boolean retry = false; - boolean refresh = false; + boolean refresh = false; int retryCount = 0; int retrySleep = TM_SLEEP; @@ -928,8 +942,8 @@ public class TransactionManager { public AbortTransactionResponse call(TrxRegionService instance) throws IOException { org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionRequest.Builder builder = AbortTransactionRequest.newBuilder(); builder.setTransactionId(transactionId); + builder.setParticipantNum(participantNum); builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); - instance.abortTransaction(controller, builder.build(), rpcCallback); return rpcCallback.get(); } @@ -937,9 +951,10 @@ public class TransactionManager { Map<byte[], AbortTransactionResponse> result = null; try { - if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- before coprocessorService txid: " + transactionId + " table: " + - table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8")); - result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable); + if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- before coprocessorService txid: " + + transactionId + " table: " + table.toString() + " startKey: " + + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8")); + result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable); } catch (Throwable e) { String msg = "ERROR occurred while calling doAbortX coprocessor service"; LOG.error(msg + ":" + e); @@ -947,15 +962,18 @@ public class TransactionManager { } if(result.size() == 0) { - LOG.error("doAbortX, received 0 region results."); - refresh = true; - retry = true; + 
LOG.error("doAbortX, received 0 region results for transaction: " + transactionId + + " participantNum: " + participantNum + " region: " + Bytes.toString(regionName)); + refresh = true; + retry = true; } else { for (AbortTransactionResponse cresponse : result.values()) { if(cresponse.getHasException()) { String exceptionString = cresponse.getException().toString(); - LOG.error("Abort HasException true: " + exceptionString); + LOG.error("Abort of transaction: " + transactionId + + " participantNum: " + participantNum + " region: " + Bytes.toString(regionName) + + " threw Exception: " + exceptionString); if(exceptionString.contains("UnknownTransactionException")) { throw new UnknownTransactionException(); } @@ -966,16 +984,18 @@ public class TransactionManager { } } catch (UnknownTransactionException ute) { - LOG.debug("UnknownTransactionException in doAbortX for transaction: " + transactionId + "(ignoring): " + ute); - } + LOG.error("UnknownTransactionException in doAbortX for transaction: " + transactionId + + " participantNum: " + participantNum + " region: " + + Bytes.toString(regionName) + "(ignoring): " + ute ); } catch (Exception e) { - if(e.toString().contains("Asked to commit a non-pending transaction")) { - LOG.error("doCommitX will not retry: " + e); + if(e.toString().contains("Asked to commit a non-pending transaction ")) { + LOG.error(" doCommitX will not retry transaction: " + transactionId + " : " + e); refresh = false; retry = false; } else { - LOG.error("doAbortX retrying due to Exception: " + e ); + LOG.error("doAbortX retrying transaction: " + transactionId + " participantNum: " + + participantNum + " region: " + Bytes.toString(regionName) + " due to Exception: " + e ); refresh = true; retry = true; } @@ -1058,7 +1078,7 @@ public class TransactionManager { for (SsccAbortTransactionResponse cresponse : result.values()) { if(cresponse.getHasException()) { String exceptionString = cresponse.getException().toString(); - LOG.error("Abort HasException 
true: " + exceptionString); + LOG.error("Abort of transaction: " + transactionId + " threw Exception: " + exceptionString); if(exceptionString.contains("UnknownTransactionException")) { throw new UnknownTransactionException(); } @@ -1069,10 +1089,12 @@ public class TransactionManager { } } catch (UnknownTransactionException ute) { - LOG.debug("UnknownTransactionException in doAbortX for transaction: " + transactionId + "(ignoring): " + ute); + LOG.debug("UnknownTransactionException in doAbortX by participant " + participantNum + + " for transaction: " + transactionId + "(ignoring): " + ute); } catch (Exception e) { - LOG.error("doAbortX retrying due to Exception: " + e ); + LOG.error("doAbortX participant " + participantNum + " retrying transaction " + + transactionId + " due to Exception: " + e); refresh = true; retry = true; } @@ -1124,7 +1146,8 @@ public class TransactionManager { return 0; } - public Integer doCommitX(final List<TransactionRegionLocation> locations, final long transactionId, final long commitId, final boolean ignoreUnknownTransactionException) throws CommitUnsuccessfulException, IOException { + public Integer doCommitX(final List<TransactionRegionLocation> locations, final long transactionId, + final long commitId, final int participantNum, final boolean ignoreUnknownTransaction) throws CommitUnsuccessfulException, IOException { boolean retry = false; boolean refresh = false; @@ -1132,15 +1155,16 @@ public class TransactionManager { do { try { - if (LOG.isTraceEnabled()) LOG.trace("doCommitX - Batch -- ENTRY txid: " + transactionId + - " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException); + if (LOG.isTraceEnabled()) LOG.trace("doCommitX - Batch -- ENTRY txid: " + transactionId + + " participant " + participantNum + " ignoreUnknownTransaction: " + ignoreUnknownTransaction); TrxRegionProtos.CommitMultipleRequest.Builder builder = CommitMultipleRequest.newBuilder(); builder.setTransactionId(transactionId); + 
builder.setParticipantNum(participantNum); for(TransactionRegionLocation location : locations) { builder.addRegionName(ByteString.copyFrom(location.getRegionInfo().getRegionName())); } - builder.setIgnoreUnknownTransactionException(ignoreUnknownTransactionException); + builder.setIgnoreUnknownTransactionException(ignoreUnknownTransaction); CommitMultipleRequest commitMultipleRequest = builder.build(); CommitMultipleResponse commitMultipleResponse = null; @@ -1170,7 +1194,7 @@ public class TransactionManager { throw new UnknownTransactionException(errMsg); } catch (Exception e) { - LOG.error("doCommitX retrying due to Exception: " + e); + LOG.error("doCommitX retrying transaction " + transactionId + " due to Exception: " + e); refresh = true; retry = true; } @@ -1206,9 +1230,10 @@ public class TransactionManager { return 0; } - public Integer doPrepareX(final List<TransactionRegionLocation> locations, final long transactionId) + public Integer doPrepareX(final List<TransactionRegionLocation> locations, final long transactionId, final int participantNum) throws IOException, CommitUnsuccessfulException { - if (LOG.isTraceEnabled()) LOG.trace("doPrepareX - Batch -- ENTRY txid: " + transactionId ); + if (LOG.isTraceEnabled()) LOG.trace("doPrepareX - Batch -- ENTRY txid: " + transactionId + + " participant " + participantNum ); boolean refresh = false; boolean retry = false; @@ -1219,6 +1244,7 @@ public class TransactionManager { TrxRegionProtos.CommitRequestMultipleRequest.Builder builder = CommitRequestMultipleRequest.newBuilder(); builder.setTransactionId(transactionId); + builder.setParticipantNum(participantNum); for(TransactionRegionLocation location : locations) { builder.addRegionName(ByteString.copyFrom(location.getRegionInfo().getRegionName())); } @@ -1245,12 +1271,14 @@ public class TransactionManager { } } catch(UnknownTransactionException ute) { - String warnMsg = "doPrepareX Exception: " + ute; + String warnMsg = new String("UnknownTransaction in doPrepareX 
- Batch - by participant " + + participantNum + " for transaction " + transactionId + " " + ute); LOG.warn(warnMsg); throw new UnknownTransactionException(warnMsg); } catch(Exception e) { - LOG.error("doPrepareX retrying due to Exception: " + e); + LOG.error("doPrepareX - Batch - retrying for participant " + + participantNum + " transaction " + transactionId + " due to Exception: " + e); refresh = true; retry = true; } @@ -1258,7 +1286,7 @@ public class TransactionManager { HRegionLocation lv_hrl = table.getRegionLocation(startKey); HRegionInfo lv_hri = lv_hrl.getRegionInfo(); - if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: " + if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -Batch- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: " + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId); if(retryCount == RETRY_ATTEMPTS){ LOG.error("Exceeded retry attempts in doPrepareX: " + retryCount); @@ -1269,14 +1297,14 @@ public class TransactionManager { throw new CommitUnsuccessfulException("Exceeded retry attempts in doPrepareX: " + retryCount); } if (LOG.isWarnEnabled()) { - LOG.warn("doPrepareX -- " + table.toString() + " location being refreshed"); - LOG.warn("doPrepareX -- lv_hri: " + lv_hri); - LOG.warn("doPrepareX -- location.getRegionInfo(): " + location.getRegionInfo()); + LOG.warn("doPrepareX -Batch- " + table.toString() + " location being refreshed"); + LOG.warn("doPrepareX -Batch- lv_hri: " + lv_hri); + LOG.warn("doPrepareX -Batch- location.getRegionInfo(): " + location.getRegionInfo()); } table.getRegionLocation(startKey, true); - if (LOG.isDebugEnabled()) LOG.debug("doPrepareX retry count: " + retryCount); - if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- setting retry, count: " + retryCount); + if (LOG.isDebugEnabled()) LOG.debug("doPrepareX -Batch- retry count: " + 
retryCount); + if (LOG.isTraceEnabled()) LOG.trace("doPrepareX --Batch-- setting retry, count: " + retryCount); refresh = false; retryCount++; } @@ -1342,7 +1370,7 @@ public class TransactionManager { return TM_COMMIT_TRUE; } - public Integer doAbortX(final List<TransactionRegionLocation> locations, final long transactionId) throws IOException{ + public Integer doAbortX(final List<TransactionRegionLocation> locations, final long transactionId, final int participantNum) throws IOException{ if(LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch -- ENTRY txID: " + transactionId); boolean retry = false; boolean refresh = false; @@ -1351,6 +1379,7 @@ public class TransactionManager { try { TrxRegionProtos.AbortTransactionMultipleRequest.Builder builder = AbortTransactionMultipleRequest.newBuilder(); builder.setTransactionId(transactionId); + builder.setParticipantNum(participantNum); for(TransactionRegionLocation location : locations) { builder.addRegionName(ByteString.copyFrom(location.getRegionInfo().getRegionName())); } @@ -1362,7 +1391,7 @@ public class TransactionManager { abortTransactionMultipleResponse = trxService.abortTransactionMultiple(null, abortTransactionMultipleRequest); retry = false; } catch (Throwable e) { - LOG.error("doAbortX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":" + e); + LOG.error("doAbortX - Batch - coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":" + e); refresh = true; retry = true; } @@ -1375,10 +1404,12 @@ public class TransactionManager { } } catch (UnknownTransactionException ute) { - LOG.debug("UnknownTransactionException in doAbortX for transaction: " + transactionId + "(ignoring): " + ute); + LOG.debug("UnknownTransactionException in doAbortX - Batch - by participant " + participantNum + + " for transaction: " + transactionId + "(ignoring): " + ute); } catch 
(Exception e) { - LOG.error("doAbortX retrying due to Exception: " + e ); + LOG.error("doAbortX - Batch - participant " + participantNum + " retrying transaction " + + transactionId + " due to Exception: " + e); refresh = true; retry = true; } @@ -1386,20 +1417,22 @@ public class TransactionManager { HRegionLocation lv_hrl = table.getRegionLocation(startKey); HRegionInfo lv_hri = lv_hrl.getRegionInfo(); - if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: " - + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId); + if (LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch - participant " + participantNum + + "-- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + + " endKey: " + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + + " for transaction: " + transactionId); if(retryCount == RETRY_ATTEMPTS){ LOG.error("Exceeded retry attempts in doAbortX: " + retryCount + " (ingoring)"); } if (LOG.isWarnEnabled()) { - LOG.warn("doAbortX -- " + table.toString() + " location being refreshed"); - LOG.warn("doAbortX -- lv_hri: " + lv_hri); - LOG.warn("doAbortX -- location.getRegionInfo(): " + location.getRegionInfo()); + LOG.warn("doAbortX - Batch - -- " + table.toString() + " location being refreshed"); + LOG.warn("doAbortX - Batch - -- lv_hri: " + lv_hri); + LOG.warn("doAbortX - Batch - -- location.getRegionInfo(): " + location.getRegionInfo()); } table.getRegionLocation(startKey, true); - if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- setting retry, count: " + retryCount); + if (LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch - -- setting retry, count: " + retryCount); refresh = false; retryCount++; } @@ -1601,9 +1634,10 @@ public class TransactionManager { for(final Map.Entry<ServerName, List<TransactionRegionLocation>> entry : locations.entrySet()) { loopCount++; + final int lv_participant = 
loopCount; compPool.submit(new TransactionManagerCallable(transactionState, entry.getValue().iterator().next(), connection) { public Integer call() throws CommitUnsuccessfulException, IOException { - return doPrepareX(entry.getValue(), transactionState.getTransactionId()); + return doPrepareX(entry.getValue(), transactionState.getTransactionId(), lv_participant); } }); } @@ -1639,11 +1673,12 @@ public class TransactionManager { if(transactionState.getRegionsRetryCount() > 0) { for (TransactionRegionLocation location : transactionState.getRetryRegions()) { loopCount++; + final int lvParticipantNum = loopCount; compPool.submit(new TransactionManagerCallable(transactionState, location, connection) { public Integer call() throws CommitUnsuccessfulException, IOException { return doPrepareX(location.getRegionInfo().getRegionName(), - transactionState.getTransactionId(), + transactionState.getTransactionId(), lvParticipantNum, location); } }); @@ -1716,10 +1751,11 @@ public class TransactionManager { loopCount++; final TransactionRegionLocation myLocation = location; final byte[] regionName = location.getRegionInfo().getRegionName(); + final int lvParticipantNum = loopCount; compPool.submit(new TransactionManagerCallable(transactionState, location, connection) { public Integer call() throws IOException, CommitUnsuccessfulException { - return doPrepareX(regionName, transactionState.getTransactionId(), myLocation); + return doPrepareX(regionName, transactionState.getTransactionId(), lvParticipantNum, myLocation); } }); } @@ -1749,7 +1785,8 @@ public class TransactionManager { } } catch (Exception e) { - LOG.error("exception in prepareCommit (during submit to pool): " + e); + LOG.error("exception in prepareCommit for transaction: " + + transactionState.getTransactionId() + " (during submit to pool): " + e); throw new CommitUnsuccessfulException(e); } @@ -1778,8 +1815,9 @@ public class TransactionManager { } } }catch (Exception e) { - LOG.error("exception in prepareCommit 
(during completion processing): " + e); - throw new CommitUnsuccessfulException(e); + LOG.error("exception in prepareCommit for transaction: " + + transactionState.getTransactionId() + " (during completion processing): " + e); + throw new CommitUnsuccessfulException(e); } if(commitError != 0) return commitError; @@ -1871,19 +1909,26 @@ public class TransactionManager { + ((EnvironmentEdgeManager.currentTimeMillis() - startTime)) + "]ms"); } - public void retryCommit(final TransactionState transactionState, final boolean ignoreUnknownTransactionException) { + public void retryCommit(final TransactionState transactionState, final boolean ignoreUnknownTransaction) { if(LOG.isTraceEnabled()) LOG.trace("retryCommit -- ENTRY -- txid: " + transactionState.getTransactionId()); synchronized(transactionState.getRetryRegions()) { List<TransactionRegionLocation> completedList = new ArrayList<TransactionRegionLocation>(); final long commitIdVal = (TRANSACTION_ALGORITHM == AlgorithmType.SSCC) ? transactionState.getCommitId() : -1; + int loopCount = 0; for (TransactionRegionLocation location : transactionState.getRetryRegions()) { - if(LOG.isTraceEnabled()) LOG.trace("retryAbort retrying abort for: " + location.getRegionInfo().getRegionNameAsString()); + loopCount++; + final int participantNum = loopCount; + if(LOG.isTraceEnabled()) LOG.trace("retryCommit retrying commit for transaction: " + + transactionState.getTransactionId() + ", participant: " + participantNum + ", region " + + location.getRegionInfo().getRegionNameAsString()); threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) { public Integer call() throws CommitUnsuccessfulException, IOException { return doCommitX(location.getRegionInfo().getRegionName(), - transactionState.getTransactionId(), commitIdVal, - ignoreUnknownTransactionException); + transactionState.getTransactionId(), + commitIdVal, + participantNum, + ignoreUnknownTransaction); } }); completedList.add(location); @@ 
-1897,13 +1942,18 @@ public class TransactionManager { if(LOG.isTraceEnabled()) LOG.trace("retryAbort -- ENTRY -- txid: " + transactionState.getTransactionId()); synchronized(transactionState.getRetryRegions()) { List<TransactionRegionLocation> completedList = new ArrayList<TransactionRegionLocation>(); + int loopCount = 0; for (TransactionRegionLocation location : transactionState.getRetryRegions()) { - if(LOG.isTraceEnabled()) LOG.trace("retryAbort retrying abort for: " + location.getRegionInfo().getRegionNameAsString()); + loopCount++; + final int participantNum = loopCount; + if(LOG.isTraceEnabled()) LOG.trace("retryAbort retrying abort for transaction: " + + transactionState.getTransactionId() + ", participant: " + + participantNum + ", region: " + location.getRegionInfo().getRegionNameAsString()); threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) { public Integer call() throws CommitUnsuccessfulException, IOException { return doAbortX(location.getRegionInfo().getRegionName(), - transactionState.getTransactionId()); + transactionState.getTransactionId(), participantNum); } }); completedList.add(location); @@ -1921,7 +1971,7 @@ public class TransactionManager { public void doCommit(final TransactionState transactionState) throws CommitUnsuccessfulException, UnsuccessfulDDLException { if (LOG.isTraceEnabled()) LOG.trace("doCommit [" + transactionState.getTransactionId() + - "] ignoreUnknownTransactionException not supplied"); + "] ignoreUnknownTransaction not supplied"); doCommit(transactionState, false); } @@ -1929,16 +1979,16 @@ public class TransactionManager { * Do the commit. This is the 2nd phase of the 2-phase protocol. 
* * @param transactionState - * @param ignoreUnknownTransactionException + * @param ignoreUnknownTransaction * @throws CommitUnsuccessfulException */ - public void doCommit(final TransactionState transactionState, final boolean ignoreUnknownTransactionException) + public void doCommit(final TransactionState transactionState, final boolean ignoreUnknownTransaction) throws CommitUnsuccessfulException, UnsuccessfulDDLException { int loopCount = 0; if (batchRegionServer && (TRANSACTION_ALGORITHM == AlgorithmType.MVCC)) { try { - if (LOG.isTraceEnabled()) LOG.trace("Committing [" + transactionState.getTransactionId() + - "] ignoreUnknownTransactionException: " + ignoreUnknownTransactionException); + if (LOG.isTraceEnabled()) LOG.trace("Committing [" + transactionState.getTransactionId() + + "] ignoreUnknownTransaction: " + ignoreUnknownTransaction); // Set the commitId transactionState.setCommitId(-1); // Dummy for MVCC @@ -1957,24 +2007,24 @@ public class TransactionManager { } else { regionList = locations.get(servername); - } + } regionList.add(location); - } + } for(final Map.Entry<ServerName, List<TransactionRegionLocation>> entry : locations.entrySet()) { if (LOG.isTraceEnabled()) LOG.trace("sending commits ... 
[" + transactionState.getTransactionId() + "]"); loopCount++; + final int lv_participant = loopCount; threadPool.submit(new TransactionManagerCallable(transactionState, entry.getValue().iterator().next(), connection) { public Integer call() throws CommitUnsuccessfulException, IOException { if (LOG.isTraceEnabled()) LOG.trace("before doCommit() [" + transactionState.getTransactionId() + "]" + - " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException); + " ignoreUnknownTransaction: " + ignoreUnknownTransaction); return doCommitX(entry.getValue(), transactionState.getTransactionId(), - transactionState.getCommitId(), ignoreUnknownTransactionException); + transactionState.getCommitId(), lv_participant, ignoreUnknownTransaction); } }); } - } catch (Exception e) { LOG.error("exception in doCommit for transaction: " + transactionState.getTransactionId() + " " + e); // This happens on a NSRE that is triggered by a split @@ -2000,7 +2050,7 @@ public class TransactionManager { // non batch-rs if (LOG.isTraceEnabled()) LOG.trace("Committing [" + transactionState.getTransactionId() + - "] ignoreUnknownTransactionException: " + ignoreUnknownTransactionException); + "] ignoreUnknownTransactionn: " + ignoreUnknownTransaction); if (LOG.isTraceEnabled()) LOG.trace("sending commits for ts: " + transactionState); try { @@ -2015,28 +2065,26 @@ public class TransactionManager { loopCount++; final byte[] regionName = location.getRegionInfo().getRegionName(); - + final int participantNum = loopCount; //TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection // .getHRegionConnection(location.getServerName()); threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) { public Integer call() throws CommitUnsuccessfulException, IOException { - if (LOG.isTraceEnabled()) LOG.trace("before doCommit() [" + transactionState.getTransactionId() + "]" + - " ignoreUnknownTransactionException: " + 
ignoreUnknownTransactionException); - return doCommitX(regionName, transactionState.getTransactionId(), transactionState.getCommitId(), ignoreUnknownTransactionException); + if (LOG.isDebugEnabled()) LOG.debug("before doCommit() [" + transactionState.getTransactionId() + + "] participantNum " + participantNum + " ignoreUnknownTransaction: " + ignoreUnknownTransaction); + return doCommitX(regionName, + transactionState.getTransactionId(), + transactionState.getCommitId(), + participantNum, + ignoreUnknownTransaction); } }); } } catch (Exception e) { - LOG.error("exception in doCommit for transaction: " + transactionState.getTransactionId() + " " + e); + LOG.error("exception in doCommit for transaction: " + + transactionState.getTransactionId() + " " + e); // This happens on a NSRE that is triggered by a split -/* try { - abort(transactionState); - } catch (Exception abortException) { - - LOG.warn("Exeption during abort", abortException); - } -*/ throw new CommitUnsuccessfulException(e); } @@ -2212,12 +2260,6 @@ public class TransactionManager { public void abort(final TransactionState transactionState) throws IOException, UnsuccessfulDDLException { if(LOG.isTraceEnabled()) LOG.trace("Abort -- ENTRY txID: " + transactionState.getTransactionId()); int loopCount = 0; - /* - if(transactionState.getStatus().equals("ABORTED")) { - if(LOG.isTraceEnabled()) LOG.trace("Abort --EXIT already called, ignoring"); - return; - } - */ transactionState.setStatus(TransState.STATE_ABORTED); // (Asynchronously send aborts @@ -2243,44 +2285,35 @@ public class TransactionManager { } for(final Map.Entry<ServerName, List<TransactionRegionLocation>> entry : locations.entrySet()) { loopCount++; + final int participantNum = loopCount; + threadPool.submit(new TransactionManagerCallable(transactionState, entry.getValue().iterator().next(), connection) { public Integer call() throws IOException { if (LOG.isTraceEnabled()) LOG.trace("before abort() [" + transactionState.getTransactionId() + "]"); 
- return doAbortX(entry.getValue(), transactionState.getTransactionId()); + return doAbortX(entry.getValue(), transactionState.getTransactionId(), participantNum); } }); } transactionState.completeSendInvoke(loopCount); - /* - if(transactionState.getRegionsRetryCount() > 0) { - for (TransactionRegionLocation location : transactionState.getRetryRegions()) { - loopCount++; - threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) { - public Integer call() throws CommitUnsuccessfulException, IOException { - - return doAbortX(location.getRegionInfo().getRegionName(), - transactionState.getTransactionId()); - } - }); - } - } - transactionState.clearRetryRegions(); - */ } else { - + loopCount = 0; for (TransactionRegionLocation location : transactionState.getParticipatingRegions()) { if (transactionState.getRegionsToIgnore().contains(location)) { continue; } try { loopCount++; + final int participantNum = loopCount; final byte[] regionName = location.getRegionInfo().getRegionName(); + if(LOG.isTraceEnabled()) LOG.trace("Submitting abort for transaction: " + + transactionState.getTransactionId() + ", participant: " + + participantNum + ", region: " + location.getRegionInfo().getRegionNameAsString()); threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) { public Integer call() throws IOException { - return doAbortX(regionName, transactionState.getTransactionId()); + return doAbortX(regionName, transactionState.getTransactionId(), participantNum); } }); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/84af9603/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java index 16153c3..9dd3b5a 100755 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java @@ -451,8 +451,12 @@ CoprocessorService, Coprocessor { try { abortTransaction(transactionId); } catch (UnknownTransactionException u) { - if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + ", Caught UnknownTransactionException after internal abortTransaction call - " + u.getMessage() + " " + stackTraceToString(u)); - ute = u; + if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId " + + transactionId + + ", Caught UnknownTransactionException after internal abortTransaction call - " + + u.getMessage() + " " + + stackTraceToString(u)); + ute = u; } catch (IOException e) { if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + ", Caught IOException after internal abortTransaction call - " + e.getMessage() + " " + stackTraceToString(e)); ioe = e; @@ -657,8 +661,10 @@ CoprocessorService, Coprocessor { Throwable t = null; WrongRegionException wre = null; long transactionId = request.getTransactionId(); + final int participantNum = request.getParticipantNum(); - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commit - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString()); + if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commit - txId " + + transactionId + ", participantNum " + participantNum + ", regionName " + regionInfo.getRegionNameAsString()); /* commenting out for the time being java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8(); @@ -676,7 
+682,7 @@ CoprocessorService, Coprocessor { { // Process local memory try { - commit(transactionId, request.getIgnoreUnknownTransactionException()); + commit(transactionId, participantNum, request.getIgnoreUnknownTransactionException()); } catch (Throwable e) { LOG.error("TrxRegionEndpoint coprocessor: commit - txId " + transactionId + ", Caught exception after internal commit call " + e.getMessage() + " " + stackTraceToString(e)); @@ -737,7 +743,7 @@ CoprocessorService, Coprocessor { commitMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString()); } else { - regionEPCP.commit(transactionId, request.getIgnoreUnknownTransactionException()); + regionEPCP.commit(transactionId, i, request.getIgnoreUnknownTransactionException()); } if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitMultiple ends"); //commit(transactionId, request.getIgnoreUnknownTransactionException()); @@ -776,6 +782,7 @@ CoprocessorService, Coprocessor { boolean reply = false; long transactionId = request.getTransactionId(); + final int participantNum = request.getParticipantNum(); Throwable t = null; WrongRegionException wre = null; @@ -795,8 +802,8 @@ CoprocessorService, Coprocessor { { // Process local memory try { - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", regionName, " + regionInfo.getRegionNameAsString() + "calling internal commitIfPossible"); - reply = commitIfPossible(transactionId); + if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", regionName, " + regionInfo.getRegionNameAsString() + "calling internal commitIfPossible"); + reply = commitIfPossible(transactionId, participantNum); } catch (Throwable e) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", Caught exception after internal commitIfPossible call " + e.getMessage() + " " + 
stackTraceToString(e)); @@ -837,8 +844,10 @@ CoprocessorService, Coprocessor { Throwable t = null; WrongRegionException wre = null; long transactionId = request.getTransactionId(); + int participantNum = request.getParticipantNum(); - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString()); + if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + + transactionId + ", participantNum " + participantNum + ", regionName " + regionInfo.getRegionNameAsString()); /* commenting out for the time being java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8(); @@ -856,7 +865,7 @@ CoprocessorService, Coprocessor { { // Process local memory try { - status = commitRequest(transactionId); + status = commitRequest(transactionId, participantNum); } catch (UnknownTransactionException u) { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString()); ute = u; @@ -957,8 +966,8 @@ CoprocessorService, Coprocessor { commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString()); } else { - if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, true);} // only the last region flush - else {status = regionEPCP.commitRequest(transactionId, false);} + if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, i, true);} // only the last region flush + else {status = regionEPCP.commitRequest(transactionId, i, false);} } if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitRequestMultiple ends"); //status = commitRequest(transactionId); @@ -4225,8 +4234,8 @@ CoprocessorService, Coprocessor { * @param long TransactionId * @throws IOException */ - public void commit(final long 
transactionId) throws IOException { - commit(transactionId, false /* IgnoreUnknownTransactionException */); + public void commit(final long transactionId, final int participantNum) throws IOException { + commit(transactionId, participantNum, false /* IgnoreUnknownTransactionException */); } /** @@ -4235,9 +4244,9 @@ CoprocessorService, Coprocessor { * @param boolean ignoreUnknownTransactionException * @throws IOException */ - public void commit(final long transactionId, final boolean ignoreUnknownTransactionException) throws IOException { + public void commit(final long transactionId, final int participantNum, final boolean ignoreUnknownTransactionException) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commit(txId) -- ENTRY txId: " + transactionId + - " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException); + " ignoreUnknownTransaction: " + ignoreUnknownTransactionException); CommitProgress commitStatus = CommitProgress.NONE; TrxTransactionState state; try { @@ -4249,10 +4258,11 @@ CoprocessorService, Coprocessor { + m_Region.getRegionInfo().getRegionNameAsString()); return; } - LOG.fatal("TrxRegionEndpoint coprocessor: Asked to commit unknown transaction: " + transactionId - + " in region " - + m_Region.getRegionInfo().getRegionNameAsString()); - throw new IOException("UnknownTransactionException, transId: " + transactionId); + LOG.fatal("TrxRegionEndpoint coprocessor: Participant " + participantNum + + " Asked to commit unknown transaction: " + transactionId + + " in region " + m_Region.getRegionInfo().getRegionNameAsString()); + throw new IOException("UnknownTransactionException, Participant " + + participantNum + " transId: " + transactionId); } if (!state.getStatus().equals(Status.COMMIT_PENDING)) { @@ -4296,13 +4306,15 @@ CoprocessorService, Coprocessor { * @return TransactionRegionInterface commit code * @throws IOException */ - public int commitRequest(final long transactionId) throws 
IOException { - return commitRequest(transactionId, true); + public int commitRequest(final long transactionId, final int participantNum) throws IOException { + return commitRequest(transactionId, participantNum, true); } - public int commitRequest(final long transactionId, boolean flushHLOG) throws IOException { + public int commitRequest(final long transactionId, final int participantNum, boolean flushHLOG) throws IOException, + UnknownTransactionException { long txid = 0; - if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest -- ENTRY txId: " + transactionId); + if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest -- ENTRY txId: " + + transactionId + " participantNum " + participantNum); TrxTransactionState state; int lv_totalCommits = 0; @@ -4326,10 +4338,10 @@ CoprocessorService, Coprocessor { try { state = getTransactionState(transactionId); } catch (UnknownTransactionException e) { - if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest Unknown transaction [" + transactionId - + "] in region [" + if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest Unknown transaction [" + + transactionId + "] in region [" + m_Region.getRegionInfo().getRegionNameAsString() - + "], ignoring"); + + "], participantNum " + participantNum + " ignoring"); state = null; } // may change to indicate a NOTFOUND case then depends on the TM ts state, if reinstated tx, ignore the exception @@ -4693,18 +4705,18 @@ CoprocessorService, Coprocessor { * @return boolean * @throws IOException */ - public boolean commitIfPossible(final long transactionId) + public boolean commitIfPossible(final long transactionId, final int participantNum) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: " + transactionId); - int status = commitRequest(transactionId); + int status = commitRequest(transactionId, participantNum); 
if (status == COMMIT_OK) { // Process local memory try { - commit(transactionId); + commit(transactionId, participantNum); if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: " + transactionId + " COMMIT_OK"); return true; } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/84af9603/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java index 8b40e88..71a8a49 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java @@ -1516,15 +1516,15 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } return true; @@ -2701,15 +2701,15 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasRegionName()) { - + return false; } if (!hasTransactionId()) { - + return false; } if (!hasCommitId()) { - + return false; } return true; @@ -6428,23 +6428,23 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } if (!hasRow()) { - + return false; } if (!hasFamily()) { - + return false; } if (!hasQualifier()) { @@ -8230,39 +8230,39 @@ public final 
class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } if (!hasRow()) { - + return false; } if (!hasFamily()) { - - return false; + + return false; } if (!hasQualifier()) { - + return false; } if (!hasValue()) { - + return false; } if (!hasPut()) { - + return false; } if (!getPut().isInitialized()) { - + return false; } return true; @@ -11006,20 +11006,20 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } for (int i = 0; i < getDeleteCount(); i++) { if (!getDelete(i).isInitialized()) { - + return false; } } @@ -12745,22 +12745,23 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } if (!hasDelete()) { - + return false; } if (!getDelete().isInitialized()) { + return false; } return true; @@ -14362,23 +14363,23 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } if (!hasGet()) { - + return false; } if (!getGet().isInitialized()) { - + return false; } return true; @@ -15899,23 +15900,23 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } if (!hasScan()) { - + return false; } if (!getScan().isInitialized()) { - + return false; } return true; @@ -17458,31 +17459,31 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if 
(!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } if (!hasScannerId()) { - + return false; } if (!hasNumberOfRows()) { - + return false; } if (!hasCloseScanner()) { - + return false; } if (!hasNextCallSeq()) { - + return false; } return true; @@ -19503,23 +19504,23 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } if (!hasPut()) { - + return false; } if (!getPut().isInitialized()) { - + return false; } return true; @@ -21247,20 +21248,20 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } for (int i = 0; i < getPutCount(); i++) { if (!getPut(i).isInitialized()) { - + return false; } } @@ -22988,19 +22989,19 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasRegionName()) { - + return false; } if (!hasTmId()) { - + return false; } return true; @@ -24568,27 +24569,27 @@ public final class SsccRegionProtos { public final boolean isInitialized() { if (!hasRegionName()) { - + return false; } if (!hasTransactionId()) { - + return false; } if (!hasStartId()) { - + return false; } if (!hasInterpreterClassName()) { - + return false; } if (!hasScan()) { - + return false; } if (!getScan().isInitialized()) { - + return false; } return true;
