Github user selvaganesang commented on a diff in the pull request:
https://github.com/apache/incubator-trafodion/pull/581#discussion_r70303183
--- Diff:
core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
---
@@ -137,6 +163,130 @@ public RMInterface() throws IOException {
}
+ public void pushRegionEpoch (HTableDescriptor desc, final
TransactionState ts) throws IOException {
+ LOG.info("pushRegionEpoch start; transId: " +
ts.getTransactionId());
+
+ TransactionalTable ttable1 = new
TransactionalTable(Bytes.toBytes(desc.getNameAsString()));
+ HConnection connection = ttable1.getConnection();
+ long lvTransid = ts.getTransactionId();
+ RegionLocator rl = connection.getRegionLocator(desc.getTableName());
+ List<HRegionLocation> regionList = rl.getAllRegionLocations();
+
+ boolean complete = false;
+ int loopCount = 0;
+ int result = 0;
+
+ for (HRegionLocation location : regionList) {
+ final byte[] regionName =
location.getRegionInfo().getRegionName();
+ if (compPool == null){
+ LOG.info("pushRegionEpoch compPool is null");
+ threadPool = Executors.newFixedThreadPool(intThreads);
+ compPool = new
ExecutorCompletionService<Integer>(threadPool);
+ }
+
+ final HRegionLocation lv_location = location;
+ final HConnection lv_connection = connection;
+ compPool.submit(new RMCallable2(ts, lv_location, lv_connection )
{
+ public Integer call() throws IOException {
+ return pushRegionEpochX(ts, lv_location, lv_connection);
+ }
+ });
+ try {
+ result = compPool.take().get();
+ } catch(Exception ex) {
+ throw new IOException(ex);
+ }
+ if ( result != 0 ){
+ LOG.error("pushRegionEpoch result " + result + " returned
from region "
+ + location.getRegionInfo().getRegionName());
+ throw new IOException("pushRegionEpoch result " + result + "
returned from region "
+ + location.getRegionInfo().getRegionName());
+ }
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: "
+ ts.getTransactionId());
+ return;
+ }
+
+ private abstract class RMCallable2 implements Callable<Integer>{
+ TransactionState transactionState;
+ HRegionLocation location;
+ HConnection connection;
+ HTable table;
+ byte[] startKey;
+ byte[] endKey_orig;
+ byte[] endKey;
+
+ RMCallable2(TransactionState txState, HRegionLocation location,
HConnection connection) {
+ this.transactionState = txState;
+ this.location = location;
+ this.connection = connection;
+ try {
+ table = new HTable(location.getRegionInfo().getTable(),
connection);
+ } catch(IOException e) {
+ LOG.error("Error obtaining HTable instance " + e);
+ table = null;
+ }
+ startKey = location.getRegionInfo().getStartKey();
+ endKey_orig = location.getRegionInfo().getEndKey();
+ endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
+
+ }
+
+ public Integer pushRegionEpochX(final TransactionState txState,
+ final HRegionLocation location, HConnection
connection) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry
txState: " + txState
+ + " location: " + location);
+
+ Batch.Call<TrxRegionService, PushEpochResponse> callable =
+ new Batch.Call<TrxRegionService, PushEpochResponse>() {
+ ServerRpcController controller = new
ServerRpcController();
+ BlockingRpcCallback<PushEpochResponse> rpcCallback =
+ new BlockingRpcCallback<PushEpochResponse>();
+
+ @Override
+ public PushEpochResponse call(TrxRegionService instance)
throws IOException {
+
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder
+ builder = PushEpochRequest.newBuilder();
+ builder.setTransactionId(txState.getTransactionId());
+ builder.setEpoch(txState.getStartEpoch());
+
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName())));
+ instance.pushOnlineEpoch(controller, builder.build(),
rpcCallback);
+ return rpcCallback.get();
+ }
+ };
+
+ Map<byte[], PushEpochResponse> result = null;
+ try {
+ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX --
before coprocessorService: startKey: "
+ + new String(startKey, "UTF-8") + " endKey: " + new
String(endKey, "UTF-8"));
+ result = table.coprocessorService(TrxRegionService.class,
startKey, endKey, callable);
+ } catch (Throwable e) {
--- End diff --
ditto
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---