Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/581#discussion_r70303183
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java ---
    @@ -137,6 +163,130 @@ public RMInterface() throws IOException {
     
         }
     
    +    public void pushRegionEpoch (HTableDescriptor desc, final 
TransactionState ts) throws IOException {
    +       LOG.info("pushRegionEpoch start; transId: " + 
ts.getTransactionId());
    +
    +       TransactionalTable ttable1 = new 
TransactionalTable(Bytes.toBytes(desc.getNameAsString()));
    +       HConnection connection = ttable1.getConnection();
    +       long lvTransid = ts.getTransactionId();
    +       RegionLocator rl = connection.getRegionLocator(desc.getTableName());
    +       List<HRegionLocation> regionList = rl.getAllRegionLocations();
    +
    +       boolean complete = false;
    +       int loopCount = 0;
    +       int result = 0;
    +
    +       for (HRegionLocation location : regionList) {
    +          final byte[] regionName = 
location.getRegionInfo().getRegionName();
    +          if (compPool == null){
    +              LOG.info("pushRegionEpoch compPool is null");
    +              threadPool = Executors.newFixedThreadPool(intThreads);
    +              compPool = new 
ExecutorCompletionService<Integer>(threadPool);
    +          }
    +
    +          final HRegionLocation lv_location = location;
    +          final HConnection lv_connection = connection;
    +          compPool.submit(new RMCallable2(ts, lv_location, lv_connection ) 
{
    +             public Integer call() throws IOException {
    +                return pushRegionEpochX(ts, lv_location, lv_connection);
    +             }
    +          });
    +          try {
    +            result = compPool.take().get();
    +          } catch(Exception ex) {
    +            throw new IOException(ex);
    +          }
    +          if ( result != 0 ){
    +             LOG.error("pushRegionEpoch result " + result + " returned 
from region "
    +                          + location.getRegionInfo().getRegionName());
    +             throw new IOException("pushRegionEpoch result " + result + " 
returned from region "
    +                      + location.getRegionInfo().getRegionName());
    +          }
    +       }
    +       if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: " 
+ ts.getTransactionId());
    +       return;
    +    }
    +
    +    private abstract class RMCallable2 implements Callable<Integer>{
    +       TransactionState transactionState;
    +       HRegionLocation  location;
    +       HConnection connection;
    +       HTable table;
    +       byte[] startKey;
    +       byte[] endKey_orig;
    +       byte[] endKey;
    +
    +       RMCallable2(TransactionState txState, HRegionLocation location, 
HConnection connection) {
    +          this.transactionState = txState;
    +          this.location = location;
    +          this.connection = connection;
    +          try {
    +             table = new HTable(location.getRegionInfo().getTable(), 
connection);
    +          } catch(IOException e) {
    +             LOG.error("Error obtaining HTable instance " + e);
    +             table = null;
    +          }
    +          startKey = location.getRegionInfo().getStartKey();
    +          endKey_orig = location.getRegionInfo().getEndKey();
    +          endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
    +
    +       }
    +
    +       public Integer pushRegionEpochX(final TransactionState txState,
    +                              final HRegionLocation location, HConnection 
connection) throws IOException {
    +          if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry 
txState: " + txState
    +                   + " location: " + location);
    +           
    +          Batch.Call<TrxRegionService, PushEpochResponse> callable =
    +              new Batch.Call<TrxRegionService, PushEpochResponse>() {
    +                 ServerRpcController controller = new 
ServerRpcController();
    +                 BlockingRpcCallback<PushEpochResponse> rpcCallback =
    +                    new BlockingRpcCallback<PushEpochResponse>();
    +
    +                 @Override
    +                 public PushEpochResponse call(TrxRegionService instance) 
throws IOException {
    +                    
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder
    +                    builder = PushEpochRequest.newBuilder();
    +                    builder.setTransactionId(txState.getTransactionId());
    +                    builder.setEpoch(txState.getStartEpoch());
    +                    
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName())));
    +                    instance.pushOnlineEpoch(controller, builder.build(), 
rpcCallback);
    +                    return rpcCallback.get();
    +                 }
    +              };
    +
    +              Map<byte[], PushEpochResponse> result = null;
    +              try {
    +                 if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- 
before coprocessorService: startKey: "
    +                     + new String(startKey, "UTF-8") + " endKey: " + new 
String(endKey, "UTF-8"));
    +                 result = table.coprocessorService(TrxRegionService.class, 
startKey, endKey, callable);
    +              } catch (Throwable e) {
    --- End diff --
    
    ditto


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to