Cherry-pick changes for JIRA 2095
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/b452921f Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/b452921f Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/b452921f Branch: refs/heads/master Commit: b452921fe23a7f6c1cebc8dd670fbcc98a7e88d9 Parents: c74e3d6 cc51150 Author: Sean Broeder <[email protected]> Authored: Thu Jul 7 03:50:39 2016 +0000 Committer: Sean Broeder <[email protected]> Committed: Sun Jul 17 03:47:31 2016 +0000 ---------------------------------------------------------------------- .../hbase/client/transactional/RMInterface.java | 162 +------------------ .../hbase/client/transactional/TmDDL.java | 4 +- .../transactional/TransactionManager.java | 142 +++++++++++++++- .../java/org/trafodion/dtm/HBaseTxClient.java | 3 + core/sql/regress/executor/EXPECTED022.SB | 16 +- docs/shared/license.txt | 5 + docs/shared/revisions.txt | 4 +- docs/src/site/markdown/download.md | 8 +- docs/src/site/markdown/index.md | 8 +- docs/src/site/markdown/release-notes-2-0-1.md | 10 +- docs/src/site/resources/css/site.css | 1 + docs/src/site/site.xml | 12 +- 12 files changed, 181 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java ---------------------------------------------------------------------- diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java index fe98284,df74a45..dc3e140 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java @@@ -73,18 -68,6 +68,13 @@@ import org.apache.hadoop.hbase.regionse import org.apache.hadoop.hbase.regionserver.transactional.IdTmException; import org.apache.hadoop.hbase.regionserver.transactional.IdTmId; - import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService; - import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest; - import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse; - +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; + +import org.apache.zookeeper.KeeperException; + +import com.google.protobuf.ByteString; + - import java.util.ArrayList; import java.util.Map; import java.util.HashMap; import java.util.Iterator; @@@ -372,20 -222,12 +229,12 @@@ public class RMInterface } public void createTable(HTableDescriptor desc, byte[][] keys, int numSplits, int keyLength, long transID) throws IOException { - - if (LOG.isTraceEnabled()) LOG.trace("createTable ENTER: "); - byte[] lv_byte_desc = desc.toByteArray(); - byte[] lv_byte_tblname = desc.getNameAsString().getBytes(); - if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc)); - createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname); + if (LOG.isTraceEnabled()) LOG.trace("Enter createTable, txid: " + transID + " Table: " + desc.getNameAsString()); + byte[] lv_byte_desc = desc.toByteArray(); + byte[] lv_byte_tblname = desc.getNameAsString().getBytes(); + if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc)); + createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname); - TransactionState ts = mapTransactionStates.get(transID); - if (LOG.isTraceEnabled()) LOG.trace("createTable: pushing epoch into regions for : " + desc.getNameAsString()); - if (ts == null){ - if (LOG.isTraceEnabled()) LOG.trace("pushing epoch into regions but unable to get ts object for transID : " + transID); - throw new IOException("createTable push epoch exception for table " + desc.getNameAsString()); - } - pushRegionEpoch(desc, ts); - if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into regions for : " + desc.getNameAsString()); + if (LOG.isTraceEnabled()) LOG.trace("Exit createTable, txid: " + transID + " Table: " + desc.getNameAsString()); } public void truncateTableOnAbort(String tblName, long transID) throws IOException { @@@ -419,21 -261,15 +268,18 @@@ static public synchronized void unregisterTransaction(final long transactionID) { TransactionState ts = null; if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction txid: " + transactionID); - try { -- ts = mapTransactionStates.remove(transactionID); - } catch (Exception e) { - LOG.warn("Ignoring exception. mapTransactionStates.remove for transid " + transactionID + - " failed with exception " + e); - return; - } ++ ts = mapTransactionStates.remove(transactionID); if (ts == null) { LOG.warn("mapTransactionStates.remove did not find transid " + transactionID); } ++ if (LOG.isTraceEnabled()) LOG.trace("Exit unregisterTransaction txid: " + transactionID); } // Not used? static public synchronized void unregisterTransaction(TransactionState ts) { ++ if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction ts: " + ts.getTransactionId()); mapTransactionStates.remove(ts.getTransactionId()); ++ if (LOG.isTraceEnabled()) LOG.trace("Exit unregisterTransaction ts: " + ts.getTransactionId()); } public synchronized Result get(final long transactionID, final Get get) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java ---------------------------------------------------------------------- diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java index 783b70c,783b70c..9985861 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java @@@ -106,8 -106,8 +106,8 @@@ public class TmDDL public void putRow(final long transid, final String Operation, final String tableName) throws IOException { long threadId = Thread.currentThread().getId(); -- if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow Operation, TxID: " + transid + "Thread ID:" + threadId -- + "TableName:" + tableName + "Operation :" + Operation); ++ if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow Operation, TxID: " + transid + " Thread ID:" + threadId ++ + " TableName:" + tableName + " Operation :" + Operation); byte [] value = null; StringBuilder tableString = null; Result r = null; http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java ---------------------------------------------------------------------- diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java index 9c950d7,2c7e6af..cc4b2fc --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java @@@ -70,22 -70,22 +70,25 @@@ import org.apache.hadoop.hbase.client.H import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.Durability; ++import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos; ++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleRequest; ++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionRequest; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionResponse; ++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleRequest; ++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse; ++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleRequest; ++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleResponse; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequest; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestResponse; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitResponse; ++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest; ++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService; --import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleRequest; --import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse; --import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleRequest; --import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse; --import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleRequest; --import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleResponse; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; @@@ -1453,6 -1452,6 +1456,72 @@@ public class TransactionManager if(LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch -- EXIT txID: " + transactionId); return 0; } ++ ++ public Integer pushRegionEpochX(final TransactionState txState, ++ final HRegionLocation location, HConnection connection) throws IOException { ++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry txState: " + txState ++ + " location: " + location); ++ ++ Batch.Call<TrxRegionService, PushEpochResponse> callable = ++ new Batch.Call<TrxRegionService, PushEpochResponse>() { ++ ServerRpcController controller = new ServerRpcController(); ++ BlockingRpcCallback<PushEpochResponse> rpcCallback = ++ new BlockingRpcCallback<PushEpochResponse>(); ++ ++ public PushEpochResponse call(TrxRegionService instance) throws IOException { ++ org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder ++ builder = PushEpochRequest.newBuilder(); ++ builder.setTransactionId(txState.getTransactionId()); ++ builder.setEpoch(txState.getStartEpoch()); ++ builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName()))); ++ instance.pushOnlineEpoch(controller, builder.build(), rpcCallback); ++ return rpcCallback.get(); ++ } ++ }; ++ ++ Map<byte[], PushEpochResponse> result = null; ++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- before coprocessorService: startKey: " ++ + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8")); ++ ++ boolean loopExit = false; ++ do ++ { ++ try { ++ result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable); ++ loopExit = true; ++ } ++ catch (ServiceException se) { ++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- ServiceException ", se); ++ throw new IOException(se); ++ } ++ catch (Throwable t) { ++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Throwable ", t); ++ throw new IOException(t); ++ } ++ ++ } while (loopExit == false); ++ ++ ++ if(result.size() == 1){ ++ // size is 1 ++ for (PushEpochResponse eresponse : result.values()){ ++ if(eresponse.getHasException()) { ++ String exceptionString = new String (eresponse.getException().toString()); ++ LOG.error("pushRegionEpochX - coprocessor exceptionString: " + exceptionString); ++ throw new IOException(eresponse.getException()); ++ } ++ } ++ } ++ else { ++ LOG.error("pushRegionEpochX, received incorrect result size: " + result.size() + " txid: " ++ + txState.getTransactionId() + " location: " + location.getRegionInfo().getRegionNameAsString()); ++ return 1; ++ } ++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Exit txState: " + txState ++ + " location: " + location); ++ return 0; ++ } ++ } // TransactionManagerCallable private void checkException(TransactionState ts, List<TransactionRegionLocation> locations, List<String> exceptions) throws IOException { @@@ -1602,7 -1600,7 +1671,7 @@@ if (LOG.isTraceEnabled()) LOG.trace("beginTransaction NOT retrieving new startId"); } if (LOG.isTraceEnabled()) LOG.trace("beginTransaction setting transaction: [" + ts.getTransactionId() + -- "] with startId: " + startIdVal); ++ "], startEpoch: " + ts.getStartEpoch() + " and startId: " + startIdVal); ts.setStartId(startIdVal); return ts; } @@@ -1971,6 -1969,6 +2040,56 @@@ if(LOG.isTraceEnabled()) LOG.trace("retryCommit -- EXIT -- txid: " + transactionState.getTransactionId()); } ++ public void pushRegionEpoch (HTableDescriptor desc, final TransactionState ts) throws IOException { ++ LOG.info("pushRegionEpoch start; transId: " + ts.getTransactionId()); ++ ++ TransactionalTable ttable1 = new TransactionalTable(Bytes.toBytes(desc.getNameAsString())); ++ HConnection connection = ttable1.getConnection(); ++ long lvTransid = ts.getTransactionId(); ++ RegionLocator rl = connection.getRegionLocator(desc.getTableName()); ++ List<HRegionLocation> regionList = rl.getAllRegionLocations(); ++ // (need one CompletionService per request for thread safety, can share pool of threads ++ CompletionService<Integer> compPool = new ExecutorCompletionService<Integer>(threadPool); ++ ++ boolean complete = false; ++ int loopCount = 0; ++ int result = 0; ++ for (HRegionLocation location : regionList) { ++ final byte[] regionName = location.getRegionInfo().getRegionName(); ++ final HConnection lv_connection = connection; ++ final TransactionRegionLocation lv_location = ++ new TransactionRegionLocation(location.getRegionInfo(), location.getServerName()); ++ compPool.submit(new TransactionManagerCallable(ts, lv_location, lv_connection) { ++ public Integer call() throws IOException { ++ return pushRegionEpochX(ts, lv_location, lv_connection); ++ } ++ }); ++ boolean loopExit = false; ++ do ++ { ++ try { ++ result = compPool.take().get(); ++ loopExit = true; ++ } ++ catch (InterruptedException ie) {} ++ catch (ExecutionException e) { ++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch -- ExecutionException ", e); ++ throw new IOException(e); ++ } ++ ++ } while (loopExit == false); ++ ++ if ( result != 0 ){ ++ LOG.error("pushRegionEpoch result " + result + " returned from region " ++ + location.getRegionInfo().getRegionName()); ++ throw new IOException("pushRegionEpoch result " + result + " returned from region " ++ + location.getRegionInfo().getRegionName()); ++ } ++ } ++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: " + ts.getTransactionId()); ++ return; ++ } ++ public void retryAbort(final TransactionState transactionState) throws IOException { if(LOG.isTraceEnabled()) LOG.trace("retryAbort -- ENTRY -- txid: " + transactionState.getTransactionId()); synchronized(transactionState.getRetryRegions()) { @@@ -2639,6 -2637,6 +2758,12 @@@ //record this create in TmDDL. tmDDL.putRow( transactionState.getTransactionId(), "CREATE", desc.getNameAsString()); ++ ++ if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into regions for : " + desc.getNameAsString()); ++ pushRegionEpoch(desc, transactionState); ++ ++ if (LOG.isTraceEnabled()) LOG.trace("createTable EXIT, transactionState: " + transactionState.getTransactionId()); ++ } private class ChangeFlags { @@@ -2883,7 -2881,7 +3008,7 @@@ public void alterTable(final TransactionState transactionState, String tblName, Object[] tableOptions) throws IOException { -- if (LOG.isTraceEnabled()) LOG.trace("createTable ENTRY, transactionState: " + transactionState.getTransactionId()); ++ if (LOG.isTraceEnabled()) LOG.trace("alterTable ENTRY, transactionState: " + transactionState.getTransactionId()); HTableDescriptor htblDesc = hbadmin.getTableDescriptor(tblName.getBytes()); HColumnDescriptor[] families = htblDesc.getColumnFamilies(); @@@ -2908,6 -2906,6 +3033,7 @@@ //record this create in TmDDL. tmDDL.putRow( transactionState.getTransactionId(), "ALTER", tblName); ++ if (LOG.isTraceEnabled()) LOG.trace("alterTable EXIT, transactionState: " + transactionState.getTransactionId()); } public void registerTruncateOnAbort(final TransactionState transactionState, String tblName) http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java ---------------------------------------------------------------------- diff --cc core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java index 4b1c4d8,4b1c4d8..ae058de --- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java +++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java @@@ -706,6 -706,6 +706,9 @@@ public class HBaseTxClient throw new Exception("createTable call error"); } ++ ++ ++ if (LOG.isTraceEnabled()) LOG.trace("Exit callCreateTable, txid: [" + transactionId + "] returning RET_OK"); return TransReturnCode.RET_OK.getShort(); }
