Repository: hbase Updated Branches: refs/heads/branch-1 4afae59cf -> 5ff108d0e
http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java index 98f0985..0dfe355 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LastSequenceId.java @@ -19,15 +19,19 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; /** - * Last flushed sequence Ids for the regions on region server + * Last flushed sequence Ids for the regions and their stores on region server */ @InterfaceAudience.Private public interface LastSequenceId { + /** * @param encodedRegionName Encoded region name - * @return Last flushed sequence Id for region or -1 if it can't be determined + * @return Last flushed sequence Id for region and its stores. Id will be -1 if it can't be + * determined */ - long getLastSequenceId(byte[] encodedRegionName); + RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName); + } http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 147a13d..1ea9d4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -56,6 +56,7 @@ class FSWALEntry extends Entry { private final transient HTableDescriptor htd; private final transient HRegionInfo hri; private final transient List<Cell> memstoreCells; + private final Set<byte[]> familyNames; FSWALEntry(final long sequence, final WALKey key, final WALEdit edit, final AtomicLong referenceToRegionSequenceId, final boolean inMemstore, @@ -67,6 +68,23 @@ class FSWALEntry extends Entry { this.hri = hri; this.sequence = sequence; this.memstoreCells = memstoreCells; + if (inMemstore) { + // construct familyNames here to reduce the work of log sinker. + ArrayList<Cell> cells = this.getEdit().getCells(); + if (CollectionUtils.isEmpty(cells)) { + this.familyNames = Collections.<byte[]> emptySet(); + } else { + Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR); + for (Cell cell : cells) { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + familySet.add(CellUtil.cloneFamily(cell)); + } + } + this.familyNames = Collections.unmodifiableSet(familySet); + } + } else { + this.familyNames = Collections.<byte[]> emptySet(); + } } public String toString() { @@ -118,16 +136,6 @@ class FSWALEntry extends Entry { * @return the family names which are effected by this edit. */ Set<byte[]> getFamilyNames() { - ArrayList<Cell> cells = this.getEdit().getCells(); - if (CollectionUtils.isEmpty(cells)) { - return Collections.<byte[]>emptySet(); - } - Set<byte[]> familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR); - for (Cell cell : cells) { - if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - familySet.add(CellUtil.cloneFamily(cell)); - } - } - return familySet; + return familyNames; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index ff205c6..75e1dea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -71,9 +71,6 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableStateManager; -import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.TagRewriteCell; -import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Delete; @@ -96,12 +93,12 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.LastSequenceId; // imports for things that haven't moved from regionserver.wal yet. @@ -123,10 +120,6 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.io.MultipleIOException; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.protobuf.ServiceException; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -342,7 +335,14 @@ public class WALSplitter { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); } } else if (sequenceIdChecker != null) { - lastFlushedSequenceId = sequenceIdChecker.getLastSequenceId(region); + RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); + Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR); + for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) { + maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), + storeSeqId.getSequenceId()); + } + regionMaxSeqIdInStores.put(key, maxSeqIdInStores); + lastFlushedSequenceId = ids.getLastFlushedSequenceId(); } if (lastFlushedSequenceId == null) { lastFlushedSequenceId = -1L; @@ -1494,6 +1494,29 @@ public class WALSplitter { return (new WriterAndPath(regionedits, w)); } + private void filterCellByStore(Entry logEntry) { + Map<byte[], Long> maxSeqIdInStores = + regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); + if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) { + return; + } + List<Cell> skippedCells = new ArrayList<Cell>(); + for (Cell cell : logEntry.getEdit().getCells()) { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + byte[] family = CellUtil.cloneFamily(cell); + Long maxSeqId = maxSeqIdInStores.get(family); + // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, + // or the master was crashed before and we can not get the information. + if (maxSeqId != null && maxSeqId.longValue() >= logEntry.getKey().getLogSeqNum()) { + skippedCells.add(cell); + } + } + } + if (!skippedCells.isEmpty()) { + logEntry.getEdit().getCells().removeAll(skippedCells); + } + } + @Override public void append(RegionEntryBuffer buffer) throws IOException { List<Entry> entries = buffer.entryBuffer; @@ -1518,7 +1541,10 @@ public class WALSplitter { return; } } - wap.w.append(logEntry); + filterCellByStore(logEntry); + if (!logEntry.getEdit().isEmpty()) { + wap.w.append(logEntry); + } this.updateRegionMaximumEditLogSeqNum(logEntry); editsCount++; } @@ -1708,8 +1734,8 @@ public class WALSplitter { HConnection hconn = this.getConnectionByTableName(table); for (Cell cell : cells) { - byte[] row = cell.getRow(); - byte[] family = cell.getFamily(); + byte[] row = CellUtil.cloneRow(cell); + byte[] family = CellUtil.cloneFamily(cell); boolean isCompactionEntry = false; if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { CompactionDescriptor compaction = WALEdit.getCompaction(cell); http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java index ac942f2..376a945 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKSplitLog.java @@ -26,12 +26,12 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.zookeeper.KeeperException; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java index 0f7c281..e4be517 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestGetLastFlushedSequenceId.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -86,14 +87,20 @@ public class TestGetLastFlushedSequenceId { } assertNotNull(region); Thread.sleep(2000); - assertEquals( - HConstants.NO_SEQNUM, - testUtil.getHBaseCluster().getMaster() - .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes())); + RegionStoreSequenceIds ids = + testUtil.getHBaseCluster().getMaster() + .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()); + assertEquals(HConstants.NO_SEQNUM, ids.getLastFlushedSequenceId()); + long storeSequenceId = ids.getStoreSequenceId(0).getSequenceId(); + assertTrue(storeSequenceId > 0); testUtil.getHBaseAdmin().flush(tableName); Thread.sleep(2000); - assertTrue(testUtil.getHBaseCluster().getMaster() - .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()) > 0); + ids = + testUtil.getHBaseCluster().getMaster() + .getLastSequenceId(region.getRegionInfo().getEncodedNameAsBytes()); + assertTrue(ids.getLastFlushedSequenceId() + " > " + storeSequenceId, + ids.getLastFlushedSequenceId() > storeSequenceId); + assertEquals(ids.getLastFlushedSequenceId(), ids.getStoreSequenceId(0).getSequenceId()); table.close(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5ff108d0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java index 498ec57..1e9ffdd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -73,18 +73,18 @@ public class TestPerColumnFamilyFlush { public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1"); - public static final byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), + public static final byte[][] FAMILIES = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; - public static final byte[] FAMILY1 = families[0]; + public static final byte[] FAMILY1 = FAMILIES[0]; - public static final byte[] FAMILY2 = families[1]; + public static final byte[] FAMILY2 = FAMILIES[1]; - public static final byte[] FAMILY3 = families[2]; + public static final byte[] FAMILY3 = FAMILIES[2]; private void initHRegion(String callingMethod, Configuration conf) throws IOException { HTableDescriptor htd = new HTableDescriptor(TABLENAME); - for (byte[] family : families) { + for (byte[] family : FAMILIES) { htd.addFamily(new HColumnDescriptor(family)); } HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); @@ -98,7 +98,7 @@ public class TestPerColumnFamilyFlush { byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); Put p = new Put(row); - p.add(families[familyNum - 1], qf, val); + p.addColumn(FAMILIES[familyNum - 1], qf, val); return p; } @@ -111,7 +111,7 @@ public class TestPerColumnFamilyFlush { // A helper function to verify edits. void verifyEdit(int familyNum, int putNum, HTable table) throws IOException { Result r = table.get(createGet(familyNum, putNum)); - byte[] family = families[familyNum - 1]; + byte[] family = FAMILIES[familyNum - 1]; byte[] qf = Bytes.toBytes("q" + familyNum); byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); @@ -327,7 +327,7 @@ public class TestPerColumnFamilyFlush { return null; } - public void doTestLogReplay() throws Exception { + private void doTestLogReplay() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000); // Carefully chosen limits so that the memstore just flushes when we're done @@ -338,10 +338,10 @@ public class TestPerColumnFamilyFlush { TEST_UTIL.startMiniCluster(numRegionServers); TEST_UTIL.getHBaseAdmin().createNamespace( NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); - HTable table = TEST_UTIL.createTable(TABLENAME, families); + HTable table = TEST_UTIL.createTable(TABLENAME, FAMILIES); HTableDescriptor htd = table.getTableDescriptor(); - for (byte[] family : families) { + for (byte[] family : FAMILIES) { if (!htd.hasFamily(family)) { htd.addFamily(new HColumnDescriptor(family)); } @@ -455,7 +455,7 @@ public class TestPerColumnFamilyFlush { try { TEST_UTIL.startMiniCluster(numRegionServers); HTable table = null; - table = TEST_UTIL.createTable(tableName, families); + table = TEST_UTIL.createTable(tableName, FAMILIES); // Force flush the namespace table so edits to it are not hanging around as oldest // edits. Otherwise, below, when we make maximum number of WAL files, then it will be // the namespace region that is flushed and not the below 'desiredRegion'. @@ -521,9 +521,9 @@ public class TestPerColumnFamilyFlush { rand.nextBytes(value1); rand.nextBytes(value2); rand.nextBytes(value3); - put.add(FAMILY1, qf, value1); - put.add(FAMILY2, qf, value2); - put.add(FAMILY3, qf, value3); + put.addColumn(FAMILY1, qf, value1); + put.addColumn(FAMILY2, qf, value2); + put.addColumn(FAMILY3, qf, value3); table.put(put); // slow down to let regionserver flush region. while (region.getMemstoreSize().get() > memstoreFlushSize) { @@ -650,9 +650,9 @@ public class TestPerColumnFamilyFlush { rand.nextBytes(value1); rand.nextBytes(value2); rand.nextBytes(value3); - put.add(FAMILY1, qf, value1); - put.add(FAMILY2, qf, value2); - put.add(FAMILY3, qf, value3); + put.addColumn(FAMILY1, qf, value1); + put.addColumn(FAMILY2, qf, value2); + put.addColumn(FAMILY3, qf, value3); table.put(put); if (i % 10000 == 0) { LOG.info(i + " rows put");