Repository: incubator-trafodion Updated Branches: refs/heads/master 1cb7a1a6e -> 5c29c0d31
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7d668e44/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java index 75d4eb5..31b35d4 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java @@ -81,10 +81,19 @@ public class SsccTransactionState extends TransactionState{ private long commitSequenceId; private long startId_; - public SsccTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, final HRegionInfo regionInfo, - HTableDescriptor htd, WAL hLog, boolean logging, long SsccSequenceId) { + public SsccTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, + final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, long SsccSequenceId) { - super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging); + super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging, /* region TX */ false); + setStartId(SsccSequenceId); + if(LOG.isTraceEnabled()) LOG.trace("SsccTransactionState : new state object for transid: " + transactionId + " with sequence: " + SsccSequenceId + " complete"); + } + + public SsccTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, + final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, long SsccSequenceId, + boolean regionTx) { + + super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging, regionTx); setStartId(SsccSequenceId); if(LOG.isTraceEnabled()) LOG.trace("SsccTransactionState : new state object for transid: " + transactionId + " with sequence: " + SsccSequenceId + " complete"); } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7d668e44/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java index 7531a7b..dc076a1 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java @@ -110,6 +110,7 @@ public class TransactionState { protected boolean splitRetry = false; protected boolean earlyLogging = false; protected boolean commit_TS_CC = false; + protected boolean isRegionTx = false; protected WAL tHLog = null; protected Object xaOperation = new Object();; protected CommitProgress commitProgress = CommitProgress.NONE; // 0 is no commit yet, 1 is a commit is under way, 2 is committed @@ -120,13 +121,14 @@ public class TransactionState { public static byte TS_TRAFODION_TXN_TAG_TYPE = 41; public TransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, final HRegionInfo regionInfo, - HTableDescriptor htd, WAL hLog, boolean logging) { + HTableDescriptor htd, WAL hLog, boolean logging, boolean isRegionTx) { Tag transactionalTag = null; if (LOG.isTraceEnabled()) LOG.trace("Create TS object for " + transactionId + " early logging " + logging); this.transactionId = transactionId; this.hLogStartSequenceId = rLogStartSequenceId; this.logSeqId = hlogSeqId; this.regionInfo = regionInfo; + this.isRegionTx = isRegionTx; this.status = Status.PENDING; this.tabledescriptor = htd; this.earlyLogging = logging; @@ -189,7 +191,17 @@ public class TransactionState { } } } - /** + /** + * Returns a boolean indicating whether or not this is a region transaction. + * + * @return Return the isRegionTx boolean. + */ + public boolean getIsRegionTx() { + + return isRegionTx; + } + + /** * Get the originating node of the transaction. * * @return Return the nodeId. http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7d668e44/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl index 769aa38..22af807 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl @@ -155,7 +155,21 @@ public class TrxTransactionState extends TransactionState { public TrxTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging) { - super(transactionId, rLogStartSequenceId, hlogSeqId, regionInfo, htd, hLog, logging); + super(transactionId, + rLogStartSequenceId, + hlogSeqId, + regionInfo, + htd, + hLog, + logging, + false); // not a region transaction + this.e = new WALEdit(); + dropTableRecorded = false; + } + + public TrxTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, + final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, boolean regionTx) { + super(transactionId, rLogStartSequenceId, hlogSeqId, regionInfo, htd, hLog, logging, regionTx); this.e = new WALEdit(); dropTableRecorded = false; } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7d668e44/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/SsccRegion.proto ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/SsccRegion.proto b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/SsccRegion.proto index 3e2dfd7..14eb31c 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/SsccRegion.proto +++ b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/SsccRegion.proto @@ -105,6 +105,24 @@ message SsccCheckAndDeleteResponse { optional bool hasException = 3; } +message SsccCheckAndDeleteRegionTxRequest { + required int64 transactionId = 1; + required int64 startId = 2; + required bytes regionName = 3; + required bytes row = 4; + required bytes family = 5; + required bytes qualifier = 6; + required bytes value = 7; + required MutationProto delete = 8; + required bool autoCommit = 9; +} + +message SsccCheckAndDeleteRegionTxResponse { + required bool result = 1; + optional string exception = 2; + optional bool hasException = 3; +} + message SsccCheckAndPutRequest { required int64 transactionId = 1; required int64 startId = 2; @@ -122,6 +140,24 @@ message SsccCheckAndPutResponse { optional bool hasException = 3; } +message SsccCheckAndPutRegionTxRequest { + required int64 tid = 1; + required int64 startId = 2; + required bytes regionName = 3; + required bytes row = 4; + required bytes family = 5; + required bytes qualifier = 6; + required bytes value = 7; + required MutationProto put = 8; + required bool autoCommit = 9; +} + +message SsccCheckAndPutRegionTxResponse { + required bool result = 1; + optional string exception = 2; + optional bool hasException = 3; +} + message SsccCloseScannerRequest { required int64 transactionId = 1; required bytes regionName = 2; @@ -206,6 +242,23 @@ message SsccPerformScanResponse { optional bool hasException = 6; } +message SsccPutRegionTxRequest { + required int64 tid = 1; + required int64 startId = 2; + required bytes regionName = 3; + required MutationProto put = 4; + optional bool isStateless = 5; + required bool autoCommit = 6; +} + +message SsccPutRegionTxResponse { + optional Result result = 1; + optional string exception = 2; + optional bool hasException = 3; + optional int32 status = 4; +} + + message SsccPutTransactionalRequest { required int64 transactionId = 1; required int64 startId = 2; @@ -284,8 +337,12 @@ service SsccRegionService { returns(SsccBeginTransactionResponse); rpc checkAndDelete(SsccCheckAndDeleteRequest) returns(SsccCheckAndDeleteResponse); +// rpc checkAndDeleteRegionTx(SsccCheckAndDeleteRegionTxRequest) +// returns(SsccCheckAndDeleteRegionTxResponse); rpc checkAndPut(SsccCheckAndPutRequest) returns(SsccCheckAndPutResponse); + rpc checkAndPutRegionTx(SsccCheckAndPutRegionTxRequest) + returns(SsccCheckAndPutRegionTxResponse); rpc closeScanner(SsccCloseScannerRequest) returns(SsccCloseScannerResponse); rpc commit(SsccCommitRequest) @@ -304,6 +361,8 @@ service SsccRegionService { returns(SsccPerformScanResponse); rpc openScanner(SsccOpenScannerRequest) returns(SsccOpenScannerResponse); + rpc putRegionTx(SsccPutRegionTxRequest) + returns(SsccPutRegionTxResponse); rpc put(SsccPutTransactionalRequest) returns(SsccPutTransactionalResponse); rpc putMultiple(SsccPutMultipleTransactionalRequest) http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7d668e44/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto index c3975c7..5327cf1 100755 --- a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto +++ b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto @@ -1,6 +1,5 @@ // @@@ START COPYRIGHT @@@ // -// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -145,6 +144,23 @@ message CheckAndDeleteResponse { optional bool hasException = 3; } +message CheckAndDeleteRegionTxRequest { + required int64 transactionId = 1; + required bytes regionName = 2; + required bytes row = 3; + required bytes family = 4; + required bytes qualifier = 5; + required bytes value = 6; + required MutationProto delete = 7; + required bool autoCommit = 8; +} + +message CheckAndDeleteRegionTxResponse { + required bool result = 1; + optional string exception = 2; + optional bool hasException = 3; +} + message CheckAndPutRequest { required int64 transactionId = 1; required bytes regionName = 2; @@ -161,6 +177,23 @@ message CheckAndPutResponse { optional bool hasException = 3; } +message CheckAndPutRegionTxRequest { + required int64 tid = 1; + required bytes regionName = 2; + required bytes row = 3; + required bytes family = 4; + required bytes qualifier = 5; + required bytes value = 6; + required MutationProto put = 7; + required bool autoCommit = 8; +} + +message CheckAndPutRegionTxResponse { + required bool result = 1; + optional string exception = 2; + optional bool hasException = 3; +} + message CloseScannerRequest { required int64 transactionId = 1; required bytes regionName = 2; @@ -238,6 +271,19 @@ message PerformScanResponse { optional bool hasException = 6; } +message PutRegionTxRequest { + required int64 tid = 1; + required bytes regionName = 2; + required MutationProto put = 3; + required bool autoCommit = 4; +} + +message PutRegionTxResponse { + optional Result result = 1; + optional string exception = 2; + optional bool hasException = 3; +} + message PutTransactionalRequest { required int64 transactionId = 1; required bytes regionName = 2; @@ -396,8 +442,12 @@ service TrxRegionService { returns(BeginTransactionResponse); rpc checkAndDelete(CheckAndDeleteRequest) returns(CheckAndDeleteResponse); +// rpc checkAndDeleteRegionTx(CheckAndDeleteRegionTxRequest) +// returns(CheckAndDeleteRegionTxResponse); rpc checkAndPut(CheckAndPutRequest) returns(CheckAndPutResponse); + rpc checkAndPutRegionTx(CheckAndPutRegionTxRequest) + returns(CheckAndPutRegionTxResponse); rpc closeScanner(CloseScannerRequest) returns(CloseScannerResponse); rpc commit(CommitRequest) @@ -420,6 +470,8 @@ service TrxRegionService { returns(PerformScanResponse); rpc openScanner(OpenScannerRequest) returns(OpenScannerResponse); + rpc putRegionTx(PutRegionTxRequest) + returns(PutRegionTxResponse); rpc put(PutTransactionalRequest) returns(PutTransactionalResponse); rpc putMultiple(PutMultipleTransactionalRequest) http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7d668e44/core/sql/src/main/java/org/trafodion/sql/HTableClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HTableClient.java b/core/sql/src/main/java/org/trafodion/sql/HTableClient.java index 2474663..5fce633 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HTableClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HTableClient.java @@ -103,6 +103,7 @@ public class HTableClient { private static final int SCAN_FETCH = 3; private boolean useTRex; private boolean useTRexScanner; + private boolean useRegionTransaction; private String tableName; private static Connection connection; private ResultScanner scanner = null; @@ -327,6 +328,7 @@ public class HTableClient { if (logger.isDebugEnabled()) logger.debug("Enter HTableClient::init, tableName: " + tblName); this.useTRex = useTRex; tableName = tblName; + this.useRegionTransaction = true; if ( !this.useTRex ) { this.useTRexScanner = false; @@ -1383,20 +1385,33 @@ public class HTableClient { future = executorService.submit(new Callable() { public Object call() throws IOException { boolean res = true; - if (useTRex && (transID != 0)) - table.delete(transID, del); - else - table.delete(del); - return new Boolean(res); + if (useTRex && (transID != 0)) { + table.delete(transID, del); + } +// else if (useRegionTransaction){ +// logger.info("deleteRow using region TX"); +// table.deleteRegionTx(del, /* auto-commit */ true); +// } + else { + logger.info("deleteRow without transID "); + table.delete(del); + } + return true; } }); - return true; } else { - if (useTRex && (transID != 0)) - table.delete(transID, del); - else - table.delete(del); + if (useTRex && (transID != 0)) { + table.delete(transID, del); + } +// else if (useRegionTransaction){ +// logger.info("deleteRow using region TX"); +// table.deleteRegionTx(del, /* auto-commit */ true); +// } + else { + logger.info("deleteRow without transID "); + table.delete(del); + } } if (logger.isTraceEnabled()) logger.trace("Exit deleteRow"); return true; @@ -1487,9 +1502,16 @@ public class HTableClient { boolean res; if (useTRex && (transID != 0)) { res = table.checkAndDelete(transID, rowID, family, qualifier, colValToCheck, del); - } else { - res = table.checkAndDelete(rowID, family, qualifier, colValToCheck, del); - } + } +// else if (useRegionTransaction){ +// logger.info("checkAndDeleteRow using region TX"); +// res = table.checkAndDeleteRegionTx(rowID, family, qualifier, colValToCheck, +// del, /* autoCommit */ true); +// } + else { + logger.info("checkAndDeleteRow without transID "); + res = table.checkAndDelete(rowID, family, qualifier, colValToCheck, del); + } if (res == false) return false; @@ -1541,18 +1563,35 @@ public class HTableClient { boolean res = true; if (checkAndPut) { - if (useTRex && (transID != 0)) - res = table.checkAndPut(transID, rowID, - family1, qualifier1, colValToCheck, put); - else - res = table.checkAndPut(rowID, - family1, qualifier1, colValToCheck, put); + if (useTRex && (transID != 0)){ + logger.info("checkAndPut using put with transID " + transID); + res = table.checkAndPut(transID, rowID, + family1, qualifier1, colValToCheck, put); + } + else if (useRegionTransaction){ + logger.info("checkAndPutRegionTx with regionTX "); + res = table.checkAndPutRegionTx(rowID, + family1, qualifier1, colValToCheck, put, /* auto-commit */ true); + + } + else { + logger.info("checkAndPut without transID "); + res = table.checkAndPut(rowID, + family1, qualifier1, colValToCheck, put); + } } else { - if (useTRex && (transID != 0)) - table.put(transID, put); - else - table.put(put); + if (useTRex && (transID != 0)){ + logger.info("putRow using put with transID " + transID); + table.put(transID, put); + } + else if (useRegionTransaction){ + logger.info("putRow using putRegionTx"); + table.putRegionTx(put, /* auto-commit */ true); + }else{ + logger.info("putRow not using putRegionTx"); + table.put(put); + } } return new Boolean(res); } @@ -1561,18 +1600,34 @@ public class HTableClient { } else { boolean result = true; if (checkAndPut) { - if (useTRex && (transID != 0)) - result = table.checkAndPut(transID, rowID, + if (useTRex && (transID != 0)){ + logger.info("checkAndPut using put with transID " + transID); + result = table.checkAndPut(transID, rowID, family1, qualifier1, colValToCheck, put); - else - result = table.checkAndPut(rowID, + } + else if (useRegionTransaction){ + logger.info("checkAndPutRegionTx using regionTX "); + result = table.checkAndPutRegionTx(rowID, family1, qualifier1, + colValToCheck, put, /* auto-commit */ true); + } + else { + logger.info("checkAndPut without transID "); + result = table.checkAndPut(rowID, family1, qualifier1, colValToCheck, put); + } } else { - if (useTRex && (transID != 0)) - table.put(transID, put); - else - table.put(put); + if (useTRex && (transID != 0)){ + logger.info("putRow using put with transID " + transID); + table.put(transID, put); + } + else if (useRegionTransaction){ + logger.info("putRow using putRegionTx"); + table.putRegionTx(put, true /* also commit */); + }else{ + logger.info("putRow not using putRegionTx"); + table.put(put); + } } return result; }