Repository: hbase
Updated Branches:
  refs/heads/branch-1 bbaa0e851 -> e043d450e


HBASE-16768 Inconsistent results from the Append/Increment (ChiaPing Tsai)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e043d450
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e043d450
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e043d450

Branch: refs/heads/branch-1
Commit: e043d450edc57a32d4b748affc76b93293a768ea
Parents: bbaa0e8
Author: tedyu <[email protected]>
Authored: Sat Oct 8 13:12:21 2016 -0700
Committer: tedyu <[email protected]>
Committed: Sat Oct 8 13:12:21 2016 -0700

----------------------------------------------------------------------
 .../hbase/regionserver/DefaultMemStore.java     |  9 +++
 .../hadoop/hbase/regionserver/HRegion.java      | 61 ++++++++++++-------
 .../hadoop/hbase/regionserver/HStore.java       | 10 ++++
 .../hadoop/hbase/regionserver/MemStore.java     |  7 +++
 .../apache/hadoop/hbase/regionserver/Store.java |  7 +++
 .../hadoop/hbase/client/TestFromClientSide.java | 62 ++++++++++++++++++++
 6 files changed, 135 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e043d450/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
index 94ae510..8412d6e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
@@ -235,6 +235,15 @@ public class DefaultMemStore implements MemStore {
   }
 
   @Override
+  public long add(Iterable<Cell> cells) {
+    long delta = 0L;
+    for (Cell each : cells) {
+      delta += add(each); // accumulate what the single-cell add(Cell) reports
+    }
+    return delta;
+  }
+
+  @Override
   public long timeOfOldestEdit() {
     return timeOfOldestEdit;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e043d450/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ed32536..c399f4a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3324,8 +3324,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
             != OperationStatusCode.NOT_RUN) {
           continue;
         }
+        // We need to update the sequence id for following reasons.
+        // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId 
won't stamp sequence id.
+        // 2) If no WAL, FSWALEntry won't be used
+        boolean updateSeqId = isInReplay || 
batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
+        if (updateSeqId) {
+          updateSequenceId(familyMaps[i].values(), mvccNum);
+        }
         doRollBackMemstore = true; // If we have a failure, we need to clean 
what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, 
isInReplay);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
       }
 
       // -------------------------------
@@ -3722,6 +3729,18 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     manifest.addRegion(this);
   }
 
+  private void updateSequenceId(final Iterable<List<Cell>> cellItr, final long sequenceId)
+      throws IOException {
+    for (List<Cell> cells : cellItr) { // stamp every cell of every family list
+      if (cells == null) {
+        continue; // skip only this list; returning here would leave later lists unstamped
+      }
+      for (Cell cell : cells) {
+        CellUtil.setSequenceId(cell, sequenceId);
+      }
+    }
+  }
+
   @Override
   public void updateCellTimestamps(final Iterable<List<Cell>> cellItr, final 
byte[] now)
       throws IOException {
@@ -3846,8 +3865,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
    * new entries.
    * @throws IOException
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, boolean isInReplay) throws IOException {
+  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap) 
throws IOException {
     long size = 0;
 
     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3855,14 +3873,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       List<Cell> cells = e.getValue();
       assert cells instanceof RandomAccess;
       Store store = getStore(family);
-      int listSize = cells.size();
-      for (int i=0; i < listSize; i++) {
-        Cell cell = cells.get(i);
-        if (cell.getSequenceId() == 0 || isInReplay) {
-          CellUtil.setSequenceId(cell, mvccNum);
-        }
-        size += store.add(cell);
-      }
+      size += store.add(cells);
     }
 
      return size;
@@ -7520,9 +7531,12 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
               recordMutationWithoutWal(mutate.getFamilyCellMap());
             }
           }
+          boolean updateSeqId = false;
           if (walKey == null) {
             // Append a faked WALEdit in order for SKIP_WAL updates to get 
mvcc assigned
             walKey = this.appendEmptyEdit(this.wal);
+            // If no WAL, FSWALEntry won't be used and no update for sequence 
id
+            updateSeqId = true;
           }
           // Do a get on the write entry... this will block until sequenceid 
is assigned... w/o it,
           // TestAtomicOperation fails.
@@ -7533,21 +7547,21 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
               writeEntry.getWriteNumber());
           }
 
+          if (updateSeqId) {
+            updateSequenceId(tempMemstore.values(), 
writeEntry.getWriteNumber());
+          }
+
           // Actually write to Memstore now
           if (!tempMemstore.isEmpty()) {
             for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) 
{
               Store store = entry.getKey();
               if (store.getFamily().getMaxVersions() == 1) {
                 // upsert if VERSIONS for this CF == 1
-                // Is this right? It immediately becomes visible? St.Ack 
20150907
                 size += store.upsert(entry.getValue(), getSmallestReadPoint());
               } else {
                 // otherwise keep older versions around
-                for (Cell cell: entry.getValue()) {
-                  // This stamping of sequenceid seems redundant; it is 
happening down in
-                  // FSHLog when we consume edits off the ring buffer.
-                  CellUtil.setSequenceId(cell, 
walKey.getWriteEntry().getWriteNumber());
-                  size += store.add(cell);
+                size += store.add(entry.getValue());
+                if (!entry.getValue().isEmpty()) {
                   doRollBackMemstore = true;
                 }
               }
@@ -7746,6 +7760,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
               }
             }
           }
+          boolean updateSeqId = false;
           // Actually write to WAL now. If walEdits is non-empty, we write the 
WAL.
           if (walEdits != null && !walEdits.isEmpty()) {
             // Using default cluster id, as this can only happen in the 
originating cluster.
@@ -7759,6 +7774,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           } else {
             // Append a faked WALEdit in order for SKIP_WAL updates to get 
mvccNum assigned
             walKey = this.appendEmptyEdit(this.wal);
+            // If no WAL, FSWALEntry won't be used and no update for sequence 
id
+            updateSeqId = true;
           }
           // Get WriteEntry. Will wait on assign of the sequence id.
           WriteEntry writeEntry = walKey.getWriteEntry();
@@ -7768,6 +7785,10 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
               writeEntry.getWriteNumber());
           }
 
+          if (updateSeqId) {
+            updateSequenceId(forMemStore.values(), 
writeEntry.getWriteNumber());
+          }
+
           // Now write to memstore, a family at a time.
           for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) {
             Store store = entry.getKey();
@@ -7778,10 +7799,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
               // TODO: St.Ack 20151222 Why no rollback in this case?
             } else {
               // Otherwise keep older versions around
-              for (Cell cell: results) {
-                // Why we need this?
-                CellUtil.setSequenceId(cell, 
walKey.getWriteEntry().getWriteNumber());
-                accumulatedResultSize += store.add(cell);
+              accumulatedResultSize += store.add(entry.getValue());
+              if (!entry.getValue().isEmpty()) {
                 doRollBackMemstore = true;
               }
             }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e043d450/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index b598f09..6ee6bb5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -666,6 +666,16 @@ public class HStore implements Store {
   }
 
   @Override
+  public long add(Iterable<Cell> cells) {
+    lock.readLock().lock(); // released in the finally below, even if the memstore add throws
+    try {
+      return this.memstore.add(cells);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
   public long timeOfOldestEdit() {
     return memstore.timeOfOldestEdit();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e043d450/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index 4901b2a..a885d79 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -73,6 +73,13 @@ public interface MemStore extends HeapSize {
   long add(final Cell cell);
 
   /**
+   * Adds the given cells to this memstore.
+   * @param cells the cells to add
+   * @return approximate size of the passed cells
+   */
+  long add(Iterable<Cell> cells);
+
+  /**
    * @return Oldest timestamp of all the Cells in the MemStore
    */
   long timeOfOldestEdit();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e043d450/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 1b41eb0..c78e0e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -151,6 +151,13 @@ public interface Store extends HeapSize, 
StoreConfigInformation, PropagatingConf
   long add(Cell cell);
 
   /**
+   * Adds the specified cells to the memstore
+   * @param cells the cells to add
+   * @return memstore size delta
+   */
+  long add(Iterable<Cell> cells);
+
+  /**
    * When was the last edit done in the memstore
    */
   long timeOfOldestEdit();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e043d450/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 039c781..69fdf6a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -4120,6 +4121,67 @@ public class TestFromClientSide {
     assertEquals(true, ok);
   }
 
+  private List<Result> doAppend(final boolean walUsed) throws IOException {
+    LOG.info("Starting testAppend, walUsed is " + walUsed);
+    final TableName TABLENAME =
+        TableName.valueOf(walUsed ? "testAppendWithWAL" : "testAppendWithoutWAL");
+    Table t = TEST_UTIL.createTable(TABLENAME, FAMILY);
+    final byte[] row1 = Bytes.toBytes("c");
+    final byte[] row2 = Bytes.toBytes("b");
+    final byte[] row3 = Bytes.toBytes("a");
+    final byte[] qual = Bytes.toBytes("qual");
+    Put put_0 = new Put(row2).addColumn(FAMILY, qual, Bytes.toBytes("put"));
+    Put put_1 = new Put(row3).addColumn(FAMILY, qual, Bytes.toBytes("put"));
+    Append append_0 = new Append(row1).add(FAMILY, qual, Bytes.toBytes("i"));
+    Append append_1 = new Append(row1).add(FAMILY, qual, Bytes.toBytes("k"));
+    Append append_2 = new Append(row1).add(FAMILY, qual, Bytes.toBytes("e"));
+    if (!walUsed) {
+      append_2.setDurability(Durability.SKIP_WAL);
+    }
+    Append append_3 = new Append(row1).add(FAMILY, qual, Bytes.toBytes("a"));
+    Scan s = new Scan();
+    s.setCaching(1);
+    List<Result> results = new LinkedList<>();
+    // append_1..3 are issued while the caching=1 scanner is open; cleanup runs in finally.
+    try {
+      t.append(append_0);
+      t.put(put_0);
+      t.put(put_1);
+      try (ResultScanner scanner = t.getScanner(s)) {
+        t.append(append_1);
+        t.append(append_2);
+        t.append(append_3);
+        for (Result r : scanner) {
+          results.add(r);
+        }
+      }
+    } finally {
+      t.close();
+      TEST_UTIL.deleteTable(TABLENAME);
+    }
+    return results;
+  }
+
+  @Test
+  public void testAppendWithoutWAL() throws Exception {
+    List<Result> resultsWithWal = doAppend(true);
+    List<Result> resultsWithoutWal = doAppend(false);
+    assertEquals(resultsWithWal.size(), resultsWithoutWal.size());
+    for (int i = 0; i != resultsWithWal.size(); ++i) {
+      Result resultWithWal = resultsWithWal.get(i);
+      Result resultWithoutWal = resultsWithoutWal.get(i);
+      assertEquals(resultWithWal.rawCells().length, 
resultWithoutWal.rawCells().length);
+      for (int j = 0; j != resultWithWal.rawCells().length; ++j) {
+        Cell cellWithWal = resultWithWal.rawCells()[j];
+        Cell cellWithoutWal = resultWithoutWal.rawCells()[j];
+        assertTrue(Bytes.equals(CellUtil.cloneRow(cellWithWal), 
CellUtil.cloneRow(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneFamily(cellWithWal), 
CellUtil.cloneFamily(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneQualifier(cellWithWal), 
CellUtil.cloneQualifier(cellWithoutWal)));
+        assertTrue(Bytes.equals(CellUtil.cloneValue(cellWithWal), 
CellUtil.cloneValue(cellWithoutWal)));
+      }
+    }
+  }
+
   /**
    * test for HBASE-737
    * @throws IOException

Reply via email to