Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 3175d8073 -> 62906b49c


HBASE-9899 for idempotent operation dups, return the result instead of throwing 
conflict exception (Guanghao Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/62906b49
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/62906b49
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/62906b49

Branch: refs/heads/branch-1.3
Commit: 62906b49cf37dc2e9b58b4e51a76935bc0a5e842
Parents: 3175d80
Author: stack <[email protected]>
Authored: Mon Aug 8 21:44:37 2016 -0700
Committer: stack <[email protected]>
Committed: Mon Aug 8 21:46:27 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/protobuf/ProtobufUtil.java     | 56 ++++++++++++++++
 .../hadoop/hbase/regionserver/HRegion.java      | 70 ++++++++++++++++----
 .../hbase/regionserver/RSRpcServices.java       | 64 +++++++++++-------
 .../hadoop/hbase/regionserver/Region.java       | 11 +++
 .../hbase/regionserver/ServerNonceManager.java  | 43 ++++++++++++
 .../hbase/client/HConnectionTestingUtility.java | 45 +++++++++++++
 .../hadoop/hbase/client/TestFromClientSide.java | 45 +++++++++++++
 .../client/TestIncrementsFromClientSide.java    | 48 ++++++++++++++
 .../hadoop/hbase/client/TestMultiParallel.java  | 13 ++--
 .../TestScannerHeartbeatMessages.java           |  2 +-
 10 files changed, 352 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index e9c8102..ac48ecd 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -852,6 +852,62 @@ public final class ProtobufUtil {
   }
 
   /**
+   * Convert a protocol buffer Mutate to a Get.
+   * @param proto the protocol buffer Mutate to convert.
+   * @param cellScanner
+   * @return the converted client get.
+   * @throws IOException
+   */
+  public static Get toGet(final MutationProto proto, final CellScanner 
cellScanner)
+      throws IOException {
+    MutationType type = proto.getMutateType();
+    assert type == MutationType.INCREMENT || type == MutationType.APPEND : 
type.name();
+    byte[] row = proto.hasRow() ? proto.getRow().toByteArray() : null;
+    Get get = null;
+    int cellCount = proto.hasAssociatedCellCount() ? 
proto.getAssociatedCellCount() : 0;
+    if (cellCount > 0) {
+      // The proto has metadata only and the data is separate to be found in 
the cellScanner.
+      if (cellScanner == null) {
+        throw new DoNotRetryIOException("Cell count of " + cellCount + " but 
no cellScanner: "
+            + TextFormat.shortDebugString(proto));
+      }
+      for (int i = 0; i < cellCount; i++) {
+        if (!cellScanner.advance()) {
+          throw new DoNotRetryIOException("Cell count of " + cellCount + " but 
at index " + i
+              + " no cell returned: " + TextFormat.shortDebugString(proto));
+        }
+        Cell cell = cellScanner.current();
+        if (get == null) {
+          get = new Get(Bytes.copy(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength()));
+        }
+        get.addColumn(Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
+          cell.getFamilyLength()), Bytes.copy(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+          cell.getQualifierLength()));
+      }
+    } else {
+      get = new Get(row);
+      for (ColumnValue column : proto.getColumnValueList()) {
+        byte[] family = column.getFamily().toByteArray();
+        for (QualifierValue qv : column.getQualifierValueList()) {
+          byte[] qualifier = qv.getQualifier().toByteArray();
+          if (!qv.hasValue()) {
+            throw new DoNotRetryIOException("Missing required field: qualifier 
value");
+          }
+          get.addColumn(family, qualifier);
+        }
+      }
+    }
+    if (proto.hasTimeRange()) {
+      TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+      get.setTimeRange(timeRange.getMin(), timeRange.getMax());
+    }
+    for (NameBytesPair attribute : proto.getAttributeList()) {
+      get.setAttribute(attribute.getName(), 
attribute.getValue().toByteArray());
+    }
+    return get;
+  }
+
+  /**
    * Convert a client Scan to a protocol buffer Scan
    *
    * @param scan the client Scan to convert

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 91917de..b56c887 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2623,8 +2623,13 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
   }
 
   @Override
-  public RegionScanner getScanner(Scan scan,
-      List<KeyValueScanner> additionalScanners) throws IOException {
+  public RegionScanner getScanner(Scan scan, List<KeyValueScanner> 
additionalScanners)
+      throws IOException {
+    return getScanner(scan, additionalScanners, HConstants.NO_NONCE, 
HConstants.NO_NONCE);
+  }
+
+  private RegionScanner getScanner(Scan scan, List<KeyValueScanner> 
additionalScanners,
+      long nonceGroup, long nonce) throws IOException {
     startRegionOperation(Operation.SCAN);
     try {
       // Verify families are all valid
@@ -2638,7 +2643,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           checkFamily(family);
         }
       }
-      return instantiateRegionScanner(scan, additionalScanners);
+      return instantiateRegionScanner(scan, additionalScanners, nonceGroup, 
nonce);
     } finally {
       closeRegionOperation(Operation.SCAN);
     }
@@ -2646,6 +2651,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
   protected RegionScanner instantiateRegionScanner(Scan scan,
       List<KeyValueScanner> additionalScanners) throws IOException {
+    return instantiateRegionScanner(scan, additionalScanners, 
HConstants.NO_NONCE,
+      HConstants.NO_NONCE);
+  }
+
+  protected RegionScanner instantiateRegionScanner(Scan scan,
+      List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) 
throws IOException {
     if (scan.isReversed()) {
       if (scan.getFilter() != null) {
         scan.getFilter().setReversed(true);
@@ -5647,8 +5658,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       return region.getRegionInfo();
     }
 
-    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, 
HRegion region)
-        throws IOException {
+    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, 
HRegion region,
+        long nonceGroup, long nonce) throws IOException {
       this.region = region;
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
@@ -5677,10 +5688,25 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       // getSmallestReadPoint, before scannerReadPoints is updated.
       IsolationLevel isolationLevel = scan.getIsolationLevel();
       synchronized(scannerReadPoints) {
-        this.readPt = getReadpoint(isolationLevel);
+        if (nonce == HConstants.NO_NONCE || rsServices == null
+            || rsServices.getNonceManager() == null) {
+          this.readPt = getReadpoint(isolationLevel);
+        } else {
+          this.readPt = 
rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
+        }
         scannerReadPoints.put(this, this.readPt);
       }
 
+      initializeScanners(scan, additionalScanners);
+    }
+
+    RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, 
HRegion region)
+        throws IOException {
+      this(scan, additionalScanners, region, HConstants.NO_NONCE, 
HConstants.NO_NONCE);
+    }
+
+    protected void initializeScanners(Scan scan, List<KeyValueScanner> 
additionalScanners)
+        throws IOException {
       // Here we separate all scanners into two lists - scanner that provide 
data required
       // by the filter to operate (scanners list) and all others 
(joinedScanners list).
       List<KeyValueScanner> scanners = new 
ArrayList<KeyValueScanner>(scan.getFamilyMap().size());
@@ -6963,25 +6989,31 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
   @Override
   public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
+    return get(get, withCoprocessor, HConstants.NO_NONCE, HConstants.NO_NONCE);
+  }
 
+  @Override
+  public List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, 
long nonce)
+      throws IOException {
     List<Cell> results = new ArrayList<Cell>();
 
     // pre-get CP hook
     if (withCoprocessor && (coprocessorHost != null)) {
-       if (coprocessorHost.preGet(get, results)) {
-         return results;
-       }
+      if (coprocessorHost.preGet(get, results)) {
+        return results;
+      }
     }
-    long before =  EnvironmentEdgeManager.currentTime();
+    long before = EnvironmentEdgeManager.currentTime();
     Scan scan = new Scan(get);
 
     RegionScanner scanner = null;
     try {
-      scanner = getScanner(scan);
+      scanner = getScanner(scan, null, nonceGroup, nonce);
       scanner.next(results);
     } finally {
-      if (scanner != null)
+      if (scanner != null) {
         scanner.close();
+      }
     }
 
     // post-get CP hook
@@ -7455,7 +7487,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           }
           // Do a get on the write entry... this will block until sequenceid 
is assigned... w/o it,
           // TestAtomicOperation fails.
-          walKey.getWriteEntry();
+          WriteEntry writeEntry = walKey.getWriteEntry();
+          // save mvcc to this nonce's OperationContext
+          if (rsServices != null && rsServices.getNonceManager() != null) {
+            rsServices.getNonceManager().addMvccToOperationContext(nonceGroup, 
nonce,
+              writeEntry.getWriteNumber());
+          }
 
           // Actually write to Memstore now
           if (!tempMemstore.isEmpty()) {
@@ -7681,7 +7718,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
             walKey = this.appendEmptyEdit(this.wal);
           }
           // Get WriteEntry. Will wait on assign of the sequence id.
-          walKey.getWriteEntry();
+          WriteEntry writeEntry = walKey.getWriteEntry();
+          // save mvcc to this nonce's OperationContext
+          if (rsServices != null && rsServices.getNonceManager() != null) {
+            rsServices.getNonceManager().addMvccToOperationContext(nonceGroup, 
nonce,
+              writeEntry.getWriteNumber());
+          }
 
           // Now write to memstore, a family at a time.
           for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 329abb6..135bf6a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -349,11 +349,11 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
    * Starts the nonce operation for a mutation, if needed.
    * @param mutation Mutation.
    * @param nonceGroup Nonce group from the request.
-   * @returns Nonce used (can be NO_NONCE).
+   * @return whether to proceed with this mutation.
    */
-  private long startNonceOperation(final MutationProto mutation, long 
nonceGroup)
+  private boolean startNonceOperation(final MutationProto mutation, long 
nonceGroup)
       throws IOException, OperationConflictException {
-    if (regionServer.nonceManager == null || !mutation.hasNonce()) return 
HConstants.NO_NONCE;
+    if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
     boolean canProceed = false;
     try {
       canProceed = regionServer.nonceManager.startOperation(
@@ -361,14 +361,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     } catch (InterruptedException ex) {
       throw new InterruptedIOException("Nonce start operation interrupted");
     }
-    if (!canProceed) {
-      // TODO: instead, we could convert append/increment to get w/mvcc
-      String message = "The operation with nonce {" + nonceGroup + ", " + 
mutation.getNonce()
-        + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
-        + "] may have already completed";
-      throw new OperationConflictException(message);
-    }
-    return mutation.getNonce();
+    return canProceed;
   }
 
   /**
@@ -530,29 +523,41 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
    * Execute an append mutation.
    *
    * @param region
-   * @param m
+   * @param mutation
    * @param cellScanner
+   * @param nonceGroup Nonce group from the request.
    * @return result to return to client if default operation should be
    * bypassed as indicated by RegionObserver, null otherwise
    * @throws IOException
    */
-  private Result append(final Region region, final OperationQuota quota, final 
MutationProto m,
-      final CellScanner cellScanner, long nonceGroup) throws IOException {
+  private Result append(final Region region, final OperationQuota quota,
+      final MutationProto mutation, final CellScanner cellScanner, long 
nonceGroup)
+      throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
-    Append append = ProtobufUtil.toAppend(m, cellScanner);
+    Append append = ProtobufUtil.toAppend(mutation, cellScanner);
     quota.addMutation(append);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preAppend(append);
     }
     if (r == null) {
-      long nonce = startNonceOperation(m, nonceGroup);
+      boolean canProceed = startNonceOperation(mutation, nonceGroup);
       boolean success = false;
       try {
-        r = region.append(append, nonceGroup, nonce);
+        long nonce = mutation.hasNonce() ? mutation.getNonce() : 
HConstants.NO_NONCE;
+        if (canProceed) {
+          r = region.append(append, nonceGroup, nonce);
+        } else {
+          // convert duplicate append to get
+          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, 
cellScanner), false,
+            nonceGroup, nonce);
+          r = Result.create(results);
+        }
         success = true;
       } finally {
-        endNonceOperation(m, nonceGroup, success);
+        if (canProceed) {
+          endNonceOperation(mutation, nonceGroup, success);
+        }
       }
       if (region.getCoprocessorHost() != null) {
         region.getCoprocessorHost().postAppend(append, r);
@@ -570,26 +575,39 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
    *
    * @param region
    * @param mutation
+   * @param cellScanner
+   * @param nonceGroup Nonce group from the request.
    * @return the Result
    * @throws IOException
    */
   private Result increment(final Region region, final OperationQuota quota,
-      final MutationProto mutation, final CellScanner cells, long nonceGroup) 
throws IOException {
+      final MutationProto mutation, final CellScanner cellScanner, long 
nonceGroup)
+      throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
-    Increment increment = ProtobufUtil.toIncrement(mutation, cells);
+    Increment increment = ProtobufUtil.toIncrement(mutation, cellScanner);
     quota.addMutation(increment);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preIncrement(increment);
     }
     if (r == null) {
-      long nonce = startNonceOperation(mutation, nonceGroup);
+      boolean canProceed = startNonceOperation(mutation, nonceGroup);
       boolean success = false;
       try {
-        r = region.increment(increment, nonceGroup, nonce);
+        long nonce = mutation.hasNonce() ? mutation.getNonce() : 
HConstants.NO_NONCE;
+        if (canProceed) {
+          r = region.increment(increment, nonceGroup, nonce);
+        } else {
+          // convert duplicate increment to get
+          List<Cell> results = region.get(ProtobufUtil.toGet(mutation, 
cellScanner), false,
+            nonceGroup, nonce);
+          r = Result.create(results);
+        }
         success = true;
       } finally {
-        endNonceOperation(mutation, nonceGroup, success);
+        if (canProceed) {
+          endNonceOperation(mutation, nonceGroup, success);
+        }
       }
       if (region.getCoprocessorHost() != null) {
         r = region.getCoprocessorHost().postIncrement(increment, r);

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 81fb0b9..8827967 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -380,6 +380,17 @@ public interface Region extends ConfigurationObserver {
   List<Cell> get(Get get, boolean withCoprocessor) throws IOException;
 
   /**
+   * Do a get for duplicate non-idempotent operation.
+   * @param get query parameters.
+   * @param withCoprocessor
+   * @param nonceGroup Nonce group.
+   * @param nonce Nonce.
+   * @return list of cells resulting from the operation
+   * @throws IOException
+   */
+  List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long 
nonce) throws IOException;
+
+  /**
    * Return all the data for the row that matches <i>row</i> exactly,
    * or the one that immediately preceeds it, at or immediately before
    * <i>ts</i>.

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
index b2b656b..bd9dad9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ServerNonceManager.java
@@ -62,6 +62,8 @@ public class ServerNonceManager {
     private static final long WAITING_BIT = 4;
     private static final long ALL_FLAG_BITS = WAITING_BIT | STATE_BITS;
 
+    private long mvcc;
+
     @Override
     public String toString() {
       return "[state " + getState() + ", hasWait " + hasWait() + ", activity "
@@ -101,6 +103,14 @@ public class ServerNonceManager {
     private long getActivityTime() {
       return this.data >>> 3;
     }
+
+    public void setMvcc(long mvcc) {
+      this.mvcc = mvcc;
+    }
+
+    public long getMvcc() {
+      return this.mvcc;
+    }
   }
 
   /**
@@ -192,6 +202,39 @@ public class ServerNonceManager {
   }
 
   /**
+   * Store the write point in OperationContext when the operation succeeds.
+   * @param group Nonce group.
+   * @param nonce Nonce.
+   * @param mvcc Write point of the succeed operation.
+   */
+  public void addMvccToOperationContext(long group, long nonce, long mvcc) {
+    if (nonce == HConstants.NO_NONCE) {
+      return;
+    }
+    NonceKey nk = new NonceKey(group, nonce);
+    OperationContext result = nonces.get(nk);
+    assert result != null;
+    synchronized (result) {
+      result.setMvcc(mvcc);
+    }
+  }
+
+  /**
+   * Return the write point of the previously succeeded operation.
+   * @param group Nonce group.
+   * @param nonce Nonce.
+   * @return write point of the previously succeeded operation.
+   */
+  public long getMvccFromOperationContext(long group, long nonce) {
+    if (nonce == HConstants.NO_NONCE) {
+      return Long.MAX_VALUE;
+    }
+    NonceKey nk = new NonceKey(group, nonce);
+    OperationContext result = nonces.get(nk);
+    return result == null ? Long.MAX_VALUE : result.getMvcc();
+  }
+
+  /**
    * Reports the operation from WAL during replay.
    * @param group Nonce group.
    * @param nonce Nonce.

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 1a7c2ef..7b22ba4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.RegionLocations;
@@ -28,7 +29,11 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Threads;
 import 
org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -216,4 +221,44 @@ public class HConnectionTestingUtility {
       return ConnectionManager.CONNECTION_INSTANCES.size();
     }
   }
+
+  /**
+   * This coprocessor sleeps 2s at the first increment/append rpc call.
+   */
+  public static class SleepAtFirstRpcCall extends BaseRegionObserver {
+    static final AtomicLong ct = new AtomicLong(0);
+    static final String SLEEP_TIME_CONF_KEY =
+        "hbase.coprocessor.SleepAtFirstRpcCall.sleepTime";
+    static final long DEFAULT_SLEEP_TIME = 2000;
+    static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
+
+    public SleepAtFirstRpcCall() {
+    }
+
+    @Override
+    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
+      RegionCoprocessorEnvironment env = e.getEnvironment();
+      Configuration conf = env.getConfiguration();
+      sleepTime.set(conf.getLong(SLEEP_TIME_CONF_KEY, DEFAULT_SLEEP_TIME));
+    }
+
+    @Override
+    public Result postIncrement(final 
ObserverContext<RegionCoprocessorEnvironment> e,
+        final Increment increment, final Result result) throws IOException {
+      if (ct.incrementAndGet() == 1) {
+        Threads.sleep(sleepTime.get());
+      }
+      return result;
+    }
+
+    @Override
+    public Result postAppend(final 
ObserverContext<RegionCoprocessorEnvironment> e,
+        final Append append, final Result result) throws IOException {
+      if (ct.incrementAndGet() == 1) {
+        Threads.sleep(sleepTime.get());
+      }
+      return result;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 7dfa633..fc47ae2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -32,6 +32,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -171,6 +172,50 @@ public class TestFromClientSide {
   }
 
   /**
+   * Test append result when there are duplicate rpc requests.
+   */
+  @Test
+  public void testDuplicateAppend() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testDuplicateAppend");
+    Map<String, String> kvs = new HashMap<String, String>();
+    kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, 
"2000");
+    
hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(),
 null, 1, kvs);
+    TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+    // Client will retry because rpc timeout is smaller than the sleep time of 
first rpc call
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+    Connection connection = ConnectionFactory.createConnection(c);
+    Table t = 
connection.getTable(TableName.valueOf("HCM-testDuplicateAppend"));
+    if (t instanceof HTable) {
+      HTable table = (HTable) t;
+      table.setOperationTimeout(3 * 1000);
+
+      try {
+        Append append = new Append(ROW);
+        append.add(TEST_UTIL.fam1, QUALIFIER, VALUE);
+        Result result = table.append(append);
+
+        // Verify expected result
+        Cell[] cells = result.rawCells();
+        assertEquals(1, cells.length);
+        assertKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, VALUE);
+
+        // Verify expected result again
+        Result readResult = table.get(new Get(ROW));
+        cells = readResult.rawCells();
+        assertEquals(1, cells.length);
+        assertKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, VALUE);
+      } finally {
+        table.close();
+        connection.close();
+      }
+    }
+  }
+
+  /**
    * Basic client side validation of HBASE-4536
    */
    @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
index 1568403..780fb19 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestIncrementsFromClientSide.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +34,8 @@ import org.apache.hadoop.hbase.CategoryBasedTimeout;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@@ -61,6 +65,7 @@ public class TestIncrementsFromClientSide {
   protected final static HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
   private static byte [] ROW = Bytes.toBytes("testRow");
   private static byte [] FAMILY = Bytes.toBytes("testFamily");
+  private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
   // This test depends on there being only one slave running at at a time. See 
the @Before
   // method where we do rolling restart.
   protected static int SLAVES = 1;
@@ -89,6 +94,49 @@ public class TestIncrementsFromClientSide {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  /**
+   * Test increment result when there are duplicate rpc requests.
+   */
+  @Test
+  public void testDuplicateIncrement() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testDuplicateIncrement");
+    Map<String, String> kvs = new HashMap<String, String>();
+    kvs.put(HConnectionTestingUtility.SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, 
"2000");
+    
hdt.addCoprocessor(HConnectionTestingUtility.SleepAtFirstRpcCall.class.getName(),
 null, 1, kvs);
+    TEST_UTIL.createTable(hdt, new byte[][] { ROW }).close();
+
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50);
+    // Client will retry because rpc timeout is smaller than the sleep time of 
first rpc call
+    c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500);
+
+    Connection connection = ConnectionFactory.createConnection(c);
+    Table t = 
connection.getTable(TableName.valueOf("HCM-testDuplicateIncrement"));
+    if (t instanceof HTable) {
+      HTable table = (HTable) t;
+      table.setOperationTimeout(3 * 1000);
+
+      try {
+        Increment inc = new Increment(ROW);
+        inc.addColumn(TEST_UTIL.fam1, QUALIFIER, 1);
+        Result result = table.increment(inc);
+
+        Cell [] cells = result.rawCells();
+        assertEquals(1, cells.length);
+        assertIncrementKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, 1);
+
+        // Verify expected result
+        Result readResult = table.get(new Get(ROW));
+        cells = readResult.rawCells();
+        assertEquals(1, cells.length);
+        assertIncrementKey(cells[0], ROW, TEST_UTIL.fam1, QUALIFIER, 1);
+      } finally {
+        table.close();
+        connection.close();
+      }
+    }
+  }
+
   @Test
   public void testIncrementWithDeletes() throws Exception {
     LOG.info("Starting " + this.name.getMethodName());

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
index 935f6e8..484bc0e 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java
@@ -516,14 +516,14 @@ public class TestMultiParallel {
       table.increment(inc);
       inc = new Increment(ONE_ROW);
       inc.addColumn(BYTES_FAMILY, QUALIFIER, 1L);
-      try {
-        table.increment(inc);
-        fail("Should have thrown an exception");
-      } catch (OperationConflictException ex) {
-      }
+
+      // duplicate increment
+      Result result = table.increment(inc);
+      validateResult(result, QUALIFIER, Bytes.toBytes(1L));
+
       Get get = new Get(ONE_ROW);
       get.addColumn(BYTES_FAMILY, QUALIFIER);
-      Result result = table.get(get);
+      result = table.get(get);
       validateResult(result, QUALIFIER, Bytes.toBytes(1L));
 
       // Now run a bunch of requests in parallel, exactly half should succeed.
@@ -551,7 +551,6 @@ public class TestMultiParallel {
             }
             try {
               table.increment(inc);
-            } catch (OperationConflictException ex) { // Some threads are 
expected to fail.
             } catch (IOException ioEx) {
               fail("Not expected");
             }

http://git-wip-us.apache.org/repos/asf/hbase/blob/62906b49/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index 9cb76dd..c85e41c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -492,7 +492,7 @@ public class TestScannerHeartbeatMessages {
     // Instantiate the custom heartbeat region scanners
     @Override
     protected RegionScanner instantiateRegionScanner(Scan scan,
-        List<KeyValueScanner> additionalScanners) throws IOException {
+        List<KeyValueScanner> additionalScanners, long nonceGroup, long nonce) 
throws IOException {
       if (scan.isReversed()) {
         if (scan.getFilter() != null) {
           scan.getFilter().setReversed(true);

Reply via email to