Repository: hbase
Updated Branches:
  refs/heads/branch-1 4afae59cf -> 5ff108d0e


http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
index 98f0985..0dfe355 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java
@@ -19,15 +19,19 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 
 /**
- * Last flushed sequence Ids for the regions on region server
+ * Last flushed sequence Ids for the regions and their stores on region server
  */
 @InterfaceAudience.Private
 public interface LastSequenceId {
+
   /**
    * @param encodedRegionName Encoded region name
-   * @return Last flushed sequence Id for region or -1 if it can't be determined
+   * @return Last flushed sequence Id for region and its stores. Id will be -1 if it can't be
+   *         determined
    */
-  long getLastSequenceId(byte[] encodedRegionName);
+  RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName);
+
 }
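
For readers following the API change: below is a minimal caller-side sketch of
consuming the new return type, mirroring what WALSplitter does further down in
this commit. The class and method names are illustrative only, not part of the
patch.

    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
    import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class LastSequenceIdExample {
      // Flatten the proto into a family -> last flushed sequence id map.
      static Map<byte[], Long> toMap(RegionStoreSequenceIds ids) {
        Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
        for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
          maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), storeSeqId.getSequenceId());
        }
        return maxSeqIdInStores;
      }
    }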

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 147a13d..1ea9d4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -56,6 +56,7 @@ class FSWALEntry extends Entry {
   private final transient HTableDescriptor htd;
   private final transient HRegionInfo hri;
   private final transient List<Cell> memstoreCells;
+  private final Set<byte[]> familyNames;
 
   FSWALEntry(final long sequence, final WALKey key, final WALEdit edit,
       final AtomicLong referenceToRegionSequenceId, final boolean inMemstore,
@@ -67,6 +68,23 @@ class FSWALEntry extends Entry {
     this.hri = hri;
     this.sequence = sequence;
     this.memstoreCells = memstoreCells;
+    if (inMemstore) {
+      // construct familyNames here to reduce the work of log sinker.
+      ArrayList<Cell> cells = this.getEdit().getCells();
+      if (CollectionUtils.isEmpty(cells)) {
+        this.familyNames = Collections.<byte[]> emptySet();
+      } else {
+        Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
+        for (Cell cell : cells) {
+          if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+            familySet.add(CellUtil.cloneFamily(cell));
+          }
+        }
+        this.familyNames = Collections.unmodifiableSet(familySet);
+      }
+    } else {
+      this.familyNames = Collections.<byte[]> emptySet();
+    }
   }
 
   public String toString() {
@@ -118,16 +136,6 @@ class FSWALEntry extends Entry {
   * @return the family names which are affected by this edit.
    */
   Set<byte[]> getFamilyNames() {
-    ArrayList<Cell> cells = this.getEdit().getCells();
-    if (CollectionUtils.isEmpty(cells)) {
-      return Collections.<byte[]>emptySet();
-    }
-    Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
-    for (Cell cell : cells) {
-      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
-        familySet.add(CellUtil.cloneFamily(cell));
-      }
-    }
-    return familySet;
+    return familyNames;
   }
 }
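
The constructor now computes the affected family names once, up front, so the
WAL sink no longer redoes the scan on every getFamilyNames() call. A
standalone sketch of that extraction logic follows (illustrative class name,
same calls as the patch):

    import java.util.Collections;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;

    import com.google.common.collect.Sets;

    final class FamilyNamesSketch {
      static Set<byte[]> familiesOf(List<Cell> cells) {
        if (cells == null || cells.isEmpty()) {
          return Collections.<byte[]> emptySet();
        }
        // byte[] has no value equality, so use a TreeSet with a byte[] comparator.
        Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR);
        for (Cell cell : cells) {
          // Skip WAL metadata edits such as compaction markers.
          if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            familySet.add(CellUtil.cloneFamily(cell));
          }
        }
        return Collections.unmodifiableSet(familySet);
      }
    }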

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index ff205c6..75e1dea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -71,9 +71,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.TableStateManager;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagRewriteCell;
-import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
@@ -96,12 +93,12 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 // imports for things that haven't moved from regionserver.wal yet.
@@ -123,10 +120,6 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -342,7 +335,14 @@ public class WALSplitter {
               lastFlushedSequenceId = ids.getLastFlushedSequenceId();
             }
           } else if (sequenceIdChecker != null) {
-            lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region);
+            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
+            Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
+              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
+                storeSeqId.getSequenceId());
+            }
+            regionMaxSeqIdInStores.put(key, maxSeqIdInStores);
+            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
           }
           if (lastFlushedSequenceId == null) {
             lastFlushedSequenceId = -1L;
@@ -1494,6 +1494,29 @@ public class WALSplitter {
       return (new WriterAndPath(regionedits, w));
     }
 
+    private void filterCellByStore(Entry logEntry) {
+      Map<byte[], Long> maxSeqIdInStores =
+          regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+      if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
+        return;
+      }
+      List<Cell> skippedCells = new ArrayList<Cell>();
+      for (Cell cell : logEntry.getEdit().getCells()) {
+        if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+          byte[] family = CellUtil.cloneFamily(cell);
+          Long maxSeqId = maxSeqIdInStores.get(family);
+          // Do not skip the cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
+          // or the master crashed before and we cannot get the information.
+          if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) {
+            skippedCells.add(cell);
+          }
+        }
+      }
+      if (!skippedCells.isEmpty()) {
+        logEntry.getEdit().getCells().removeAll(skippedCells);
+      }
+    }
+
     @Override
     public void append(RegionEntryBuffer buffer) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
@@ -1518,7 +1541,10 @@ public class WALSplitter {
               return;
             }
           }
-          wap.w.append(logEntry);
+          filterCellByStore(logEntry);
+          if (!logEntry.getEdit().isEmpty()) {
+            wap.w.append(logEntry);
+          }
           this.updateRegionMaximumEditLogSeqNum(logEntry);
           editsCount++;
         }
@@ -1708,8 +1734,8 @@ public class WALSplitter {
         HConnection hconn = this.getConnectionByTableName(table);
 
         for (Cell cell : cells) {
-          byte[] row = cell.getRow();
-          byte[] family = cell.getFamily();
+          byte[] row = CellUtil.cloneRow(cell);
+          byte[] family = CellUtil.cloneFamily(cell);
           boolean isCompactionEntry = false;
           if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
             CompactionDescriptor compaction = WALEdit.getCompaction(cell);
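
The heart of the WALSplitter change is the per-store skip rule in
filterCellByStore(): during log splitting, a cell may be dropped iff its
column family has already been flushed up to (or past) the entry's sequence
number. Restated as a tiny predicate (illustrative names, same logic as the
patch):

    final class StoreFilterRule {
      // maxFlushedSeqIdForFamily is null when the per-store id is unknown
      // (e.g. rolling upgrade or a master crash); then the cell must be kept.
      static boolean canSkip(Long maxFlushedSeqIdForFamily, long entryLogSeqNum) {
        return maxFlushedSeqIdForFamily != null
            && maxFlushedSeqIdForFamily.longValue() >= entryLogSeqNum;
      }
    }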

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
index ac942f2..376a945 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java
@@ -26,12 +26,12 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.zookeeper.KeeperException;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
index 0f7c281..e4be517 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -86,14 +87,20 @@ public class TestGetLastFlushedSequenceId {
     }
     assertNotNull(region);
     Thread.sleep(2000);
-    assertEquals(
-      HConstants.NO_SEQNUM,
-      testUtil.getHBaseCluster().getMaster()
-          .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()));
+    RegionStoreSequenceIds ids =
+        testUtil.getHBaseCluster().getMaster()
+            .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
+    assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId());
+    long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId();
+    assertTrue(storeSequenceId > 0);
     testUtil.getHBaseAdmin().flush(tableName);
     Thread.sleep(2000);
-    assertTrue(testUtil.getHBaseCluster().getMaster()
-        .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()) > 0);
+    ids =
+        testUtil.getHBaseCluster().getMaster()
+            .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes());
+    assertTrue(ids.getLastFlushedSequenceId() + " > " + storeSequenceId,
+      ids.getLastFlushedSequenceId() > storeSequenceId);
+    assertEquals(ids.getLastFlushedSequenceId(), ids.getStoreSequenceId(0).getSequenceId());
     table.close();
   }
 }
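
For reference, the shape of the proto the updated test inspects is shown in
the small construction sketch below, assuming only the standard
protobuf-generated builder API for these messages; the family name and
sequence id values are illustrative.

    import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
    import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
    import org.apache.hadoop.hbase.util.Bytes;

    import com.google.protobuf.ByteString;

    final class SequenceIdsProtoSketch {
      static RegionStoreSequenceIds example() {
        // One entry per store (column family), plus a region-wide id.
        StoreSequenceId store = StoreSequenceId.newBuilder()
            .setFamilyName(ByteString.copyFrom(Bytes.toBytes("f1")))
            .setSequenceId(42L)
            .build();
        return RegionStoreSequenceIds.newBuilder()
            .setLastFlushedSequenceId(42L)
            .addStoreSequenceId(store)
            .build();
      }
    }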

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
index 498ec57..1e9ffdd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java
@@ -73,18 +73,18 @@ public class TestPerColumnFamilyFlush {
 
  public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
 
-  public static final byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
+  public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"),
       Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") };
 
-  public static final byte[] FAMILY1 = families[0];
+  public static final byte[] FAMILY1 = FAMILIES[0];
 
-  public static final byte[] FAMILY2 = families[1];
+  public static final byte[] FAMILY2 = FAMILIES[1];
 
-  public static final byte[] FAMILY3 = families[2];
+  public static final byte[] FAMILY3 = FAMILIES[2];
 
  private void initHRegion(String callingMethod, Configuration conf) throws IOException {
     HTableDescriptor htd = new HTableDescriptor(TABLENAME);
-    for (byte[] family : families) {
+    for (byte[] family : FAMILIES) {
       htd.addFamily(new HColumnDescriptor(family));
     }
     HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
@@ -98,7 +98,7 @@ public class TestPerColumnFamilyFlush {
     byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
     Put p = new Put(row);
-    p.add(families[familyNum - 1], qf, val);
+    p.addColumn(FAMILIES[familyNum - 1], qf, val);
     return p;
   }
 
@@ -111,7 +111,7 @@ public class TestPerColumnFamilyFlush {
   // A helper function to verify edits.
   void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
     Result r = table.get(createGet(familyNum, putNum));
-    byte[] family = families[familyNum - 1];
+    byte[] family = FAMILIES[familyNum - 1];
     byte[] qf = Bytes.toBytes("q" + familyNum);
     byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
     assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), 
r.getFamilyMap(family));
@@ -327,7 +327,7 @@ public class TestPerColumnFamilyFlush {
     return null;
   }
 
-  public void doTestLogReplay() throws Exception {
+  private void doTestLogReplay() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
    // Carefully chosen limits so that the memstore just flushes when we're done
@@ -338,10 +338,10 @@ public class TestPerColumnFamilyFlush {
       TEST_UTIL.startMiniCluster(numRegionServers);
       TEST_UTIL.getHBaseAdmin().createNamespace(
           NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
-      HTable table = TEST_UTIL.createTable(TABLENAME, families);
+      HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES);
       HTableDescriptor htd = table.getTableDescriptor();
 
-      for (byte[] family : families) {
+      for (byte[] family : FAMILIES) {
         if (!htd.hasFamily(family)) {
           htd.addFamily(new HColumnDescriptor(family));
         }
@@ -455,7 +455,7 @@ public class TestPerColumnFamilyFlush {
     try {
       TEST_UTIL.startMiniCluster(numRegionServers);
       HTable table = null;
-      table = TEST_UTIL.createTable(tableName, families);
+      table = TEST_UTIL.createTable(tableName, FAMILIES);
       // Force flush the namespace table so edits to it are not hanging around as oldest
       // edits. Otherwise, below, when we make maximum number of WAL files, then it will be
       // the namespace region that is flushed and not the below 'desiredRegion'.
@@ -521,9 +521,9 @@ public class TestPerColumnFamilyFlush {
       rand.nextBytes(value1);
       rand.nextBytes(value2);
       rand.nextBytes(value3);
-      put.add(FAMILY1, qf, value1);
-      put.add(FAMILY2, qf, value2);
-      put.add(FAMILY3, qf, value3);
+      put.addColumn(FAMILY1, qf, value1);
+      put.addColumn(FAMILY2, qf, value2);
+      put.addColumn(FAMILY3, qf, value3);
       table.put(put);
       // slow down to let regionserver flush region.
       while (region.getMemstoreSize().get() > memstoreFlushSize) {
@@ -650,9 +650,9 @@ public class TestPerColumnFamilyFlush {
       rand.nextBytes(value1);
       rand.nextBytes(value2);
       rand.nextBytes(value3);
-      put.add(FAMILY1, qf, value1);
-      put.add(FAMILY2, qf, value2);
-      put.add(FAMILY3, qf, value3);
+      put.addColumn(FAMILY1, qf, value1);
+      put.addColumn(FAMILY2, qf, value2);
+      put.addColumn(FAMILY3, qf, value3);
       table.put(put);
       if (i % 10000 == 0) {
         LOG.info(i + " rows put");
