Repository: hbase Updated Branches: refs/heads/branch-1.3 f91a91247 -> 171f8f066
HBASE-21067 Backport HBASE-17519 (Rollback the removed cells) Signed-off-by: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/171f8f06 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/171f8f06 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/171f8f06 Branch: refs/heads/branch-1.3 Commit: 171f8f066ec072475ae4454e9b3f5d545cee73a3 Parents: f91a912 Author: Nihal Jain <nihaljain...@gmail.com> Authored: Sun Aug 26 21:40:37 2018 +0530 Committer: Andrew Purtell <apurt...@apache.org> Committed: Fri Sep 28 17:08:03 2018 -0700 ---------------------------------------------------------------------- .../hbase/regionserver/DefaultMemStore.java | 13 +- .../hadoop/hbase/regionserver/HRegion.java | 97 +++-- .../hadoop/hbase/regionserver/HStore.java | 5 +- .../hadoop/hbase/regionserver/MemStore.java | 3 +- .../apache/hadoop/hbase/regionserver/Store.java | 3 +- .../hbase/client/TestRollbackFromClient.java | 357 +++++++++++++++++++ .../hbase/regionserver/TestDefaultMemStore.java | 6 +- 7 files changed, 438 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 31922ae..5f17d31 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -494,7 +494,7 @@ public class DefaultMemStore implements MemStore { // 'now' and a 0 memstoreTS == immediately visible List<Cell> cells = new 
ArrayList<Cell>(1); cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); - return upsert(cells, 1L); + return upsert(cells, 1L, null); } /** @@ -513,13 +513,14 @@ public class DefaultMemStore implements MemStore { * * @param cells * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param removedCells collect the removed cells. It can be null. * @return change in memstore size */ @Override - public long upsert(Iterable<Cell> cells, long readpoint) { + public long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells) { long size = 0; for (Cell cell : cells) { - size += upsert(cell, readpoint); + size += upsert(cell, readpoint, removedCells); } return size; } @@ -537,8 +538,9 @@ public class DefaultMemStore implements MemStore { * * @param cell * @return change in size of MemStore + * @param removedCells collect the removed cells */ - private long upsert(Cell cell, long readpoint) { + private long upsert(Cell cell, long readpoint, List<Cell> removedCells) { // Add the Cell to the MemStore // Use the internalAdd method here since we (a) already have a lock // and (b) cannot safely use the MSLAB here without potentially @@ -577,6 +579,9 @@ public class DefaultMemStore implements MemStore { long delta = heapSizeChange(cur, true); addedSize -= delta; activeSection.getHeapSize().addAndGet(-delta); + if (removedCells != null) { + removedCells.add(cur); + } it.remove(); setOldestEditTimeToNow(); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 26a5958..193d874 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3927,11 +3927,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * the wal. This method is then invoked to rollback the memstore. */ private void rollbackMemstore(List<Cell> memstoreCells) { - int kvsRolledback = 0; + rollbackMemstore(null, memstoreCells); + } + private void rollbackMemstore(final Store defaultStore, List<Cell> memstoreCells) { + int kvsRolledback = 0; for (Cell cell : memstoreCells) { - byte[] family = CellUtil.cloneFamily(cell); - Store store = getStore(family); + Store store = defaultStore; + if (store == null) { + byte[] family = CellUtil.cloneFamily(cell); + store = getStore(family); + } store.rollback(cell); kvsRolledback++; } @@ -7535,7 +7541,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] row = mutate.getRow(); checkRow(row, op.toString()); checkFamilies(mutate.getFamilyCellMap().keySet()); - boolean flush = false; + Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>(); Durability durability = getEffectiveDurability(mutate.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; @@ -7660,30 +7666,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - // Is this right? It immediately becomes visible? St.Ack 20150907 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell: entry.getValue()) { - // This stamping of sequenceid seems redundant; it is happening down in - // FSHLog when we consume edits off the ring buffer. 
- CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); - size += store.add(cell); - doRollBackMemstore = true; - } + doRollBackMemstore = !tempMemstore.isEmpty(); + for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { + Store store = entry.getKey(); + if (store.getFamily().getMaxVersions() == 1) { + List<Cell> removedCells = removedCellsForMemStore.get(store); + if (removedCells == null) { + removedCells = new ArrayList<>(); + removedCellsForMemStore.put(store, removedCells); + } + // upsert if VERSIONS for this CF == 1 + // Is this right? It immediately becomes visible? St.Ack 20150907 + size += store.upsert(entry.getValue(), getSmallestReadPoint(), removedCells); + } else { + // otherwise keep older versions around + for (Cell cell: entry.getValue()) { + // This stamping of sequenceid seems redundant; it is happening down in + // FSHLog when we consume edits off the ring buffer. + CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); + size += store.add(cell); } - // We add to all KVs here whereas when doing increment, we do it - // earlier... why? - allKVs.addAll(entry.getValue()); } - - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + // We add to all KVs here whereas when doing increment, we do it + // earlier... why? + allKVs.addAll(entry.getValue()); } } finally { this.updatesLock.readLock().unlock(); @@ -7709,7 +7715,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // if the wal sync was unsuccessful, remove keys from memstore WriteEntry we = walKey != null? 
walKey.getWriteEntry(): null; if (doRollBackMemstore) { - rollbackMemstore(allKVs); + for (Map.Entry<Store, List<Cell>> entry : tempMemstore.entrySet()) { + rollbackMemstore(entry.getKey(), entry.getValue()); + } + for (Map.Entry<Store, List<Cell>> entry : removedCellsForMemStore.entrySet()) { + Store currStore = entry.getKey(); + for (Cell cell: entry.getValue()) { + CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); + currStore.add(cell); + } + } if (we != null) mvcc.complete(we); } else if (we != null) { mvcc.completeAndWait(we); @@ -7722,11 +7737,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion.updateAppend(); } - if (flush) { // Request a cache flush. Do it outside update lock. + if (isFlushSize(this.addAndGetGlobalMemstoreSize(size))) { requestFlush(); } - return mutate.isReturnResults() ? Result.create(allKVs) : null; } @@ -7833,7 +7847,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean doRollBackMemstore = false; long accumulatedResultSize = 0; List<Cell> allKVs = new ArrayList<Cell>(increment.size()); - List<Cell> memstoreCells = new ArrayList<Cell>(); + Map<Store, List<Cell>> removedCellsForMemStore = new HashMap<>(); + Map<Store, List<Cell>> forMemStore = new HashMap<>(); Durability effectiveDurability = getEffectiveDurability(increment.getDurability()); try { rowLock = getRowLockInternal(increment.getRow()); @@ -7853,7 +7868,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi WALEdit walEdits = null; // Process increments a Store/family at a time. // Accumulate edits for memstore to add later after we've added to WAL. 
- Map<Store, List<Cell>> forMemStore = new HashMap<Store, List<Cell>>(); for (Map.Entry<byte [], List<Cell>> entry: increment.getFamilyCellMap().entrySet()) { byte [] columnFamilyName = entry.getKey(); List<Cell> increments = entry.getValue(); @@ -7895,20 +7909,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // Now write to memstore, a family at a time. + doRollBackMemstore = !forMemStore.isEmpty(); for (Map.Entry<Store, List<Cell>> entry: forMemStore.entrySet()) { Store store = entry.getKey(); List<Cell> results = entry.getValue(); if (store.getFamily().getMaxVersions() == 1) { + List<Cell> removedCells = removedCellsForMemStore.get(store); + if (removedCells == null) { + removedCells = new ArrayList<>(); + removedCellsForMemStore.put(store, removedCells); + } // Upsert if VERSIONS for this CF == 1 - accumulatedResultSize += store.upsert(results, getSmallestReadPoint()); - // TODO: St.Ack 20151222 Why no rollback in this case? + accumulatedResultSize += store.upsert(results, getSmallestReadPoint(), removedCells); } else { // Otherwise keep older versions around for (Cell cell: results) { // Why we need this? 
CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); accumulatedResultSize += store.add(cell); - doRollBackMemstore = true; } } } @@ -7934,7 +7952,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { - rollbackMemstore(memstoreCells); + for (Map.Entry<Store, List<Cell>> entry : forMemStore.entrySet()) { + rollbackMemstore(entry.getKey(), entry.getValue()); + } + for (Map.Entry<Store, List<Cell>> entry : removedCellsForMemStore.entrySet()) { + Store currStore = entry.getKey(); + for (Cell cell : entry.getValue()) { + CellUtil.setSequenceId(cell, walKey.getWriteEntry().getWriteNumber()); + currStore.add(cell); + } + } if (walKey != null) mvcc.complete(walKey.getWriteEntry()); } else { if (walKey != null) mvcc.completeAndWait(walKey.getWriteEntry()); http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 03dad03..5e07b6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2406,10 +2406,11 @@ public class HStore implements Store { } @Override - public long upsert(Iterable<Cell> cells, long readpoint) throws IOException { + public long upsert(Iterable<Cell> cells, long readpoint, + List<Cell> removedCells) throws IOException { this.lock.readLock().lock(); try { - return this.memstore.upsert(cells, readpoint); + return this.memstore.upsert(cells, readpoint, removedCells); } finally { this.lock.readLock().unlock(); } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 658ba48..9f4c3ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -127,9 +127,10 @@ public interface MemStore extends HeapSize { * only see each KeyValue update as atomic. * @param cells * @param readpoint readpoint below which we can safely remove duplicate Cells. + * @param removedCells collect the removed cells. It can be null. * @return change in memstore size */ - long upsert(Iterable<Cell> cells, long readpoint); + long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells); /** * @return scanner over the memstore. This might include scanner over the snapshot when one is http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index e7a4de5..8238a97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -137,10 +137,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * across all of them. * @param cells * @param readpoint readpoint below which we can safely remove duplicate KVs + * @param removedCells collect the removed cells. It can be null. 
* @return memstore size delta * @throws IOException */ - long upsert(Iterable<Cell> cells, long readpoint) throws IOException; + long upsert(Iterable<Cell> cells, long readpoint, List<Cell> removedCells) throws IOException; /** * Adds a value to the memstore http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java new file mode 100644 index 0000000..5e45eea --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRollbackFromClient.java @@ -0,0 +1,357 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(SmallTests.class) +public class TestRollbackFromClient { + @Rule + public TestName name = new TestName(); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final int SLAVES = 3; + private static final byte[] ROW = Bytes.toBytes("testRow"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] QUALIFIER_V2 = Bytes.toBytes("testQualifierV2"); + private static final 
byte[] VALUE = Bytes.toBytes("testValue"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, + FailedDefaultWALProvider.class.getName()); + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testAppendRollback() throws IOException { + Updater updateForEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) { + try { + Append append = new Append(ROW); + append.add(FAMILY, QUALIFIER, VALUE); + append.add(FAMILY, QUALIFIER_V2, VALUE); + FailedHLog.SHOULD_FAIL.set(true); + table.append(append); + } catch (IOException e) { + // It should fail because the WAL fail also + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 0; + } + }; + testRollback(updateForEmptyTable, 1, null); + testRollback(updateForEmptyTable, 2, null); + + final Append preAppend = new Append(ROW); + preAppend.add(FAMILY, QUALIFIER, VALUE); + Cell initCell = preAppend.getCellList(FAMILY).get(0); + Updater updateForNonEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) throws IOException { + table.append(preAppend); + try { + Append append = new Append(ROW); + append.add(FAMILY, QUALIFIER, VALUE); + append.add(FAMILY, QUALIFIER_V2, VALUE); + FailedHLog.SHOULD_FAIL.set(true); + table.append(append); + Assert.fail("It should fail because the WAL sync is failed"); + } catch (IOException e) { + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 1; + } + }; + testRollback(updateForNonEmptyTable, 1, initCell); + testRollback(updateForNonEmptyTable, 2, initCell); + } + + @Test + public void testIncrementRollback() throws IOException { + Updater updateForEmptyTable = new Updater() { + @Override + public int 
updateData(Table table, byte[] family) { + try { + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, QUALIFIER, 1); + inc.addColumn(FAMILY, QUALIFIER_V2, 2); + FailedHLog.SHOULD_FAIL.set(true); + table.increment(inc); + } catch (IOException e) { + // It should fail because the WAL fail also + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 0; + } + }; + testRollback(updateForEmptyTable, 1, null); + testRollback(updateForEmptyTable, 2, null); + + final Increment preIncrement = new Increment(ROW); + preIncrement.addColumn(FAMILY, QUALIFIER, 1); + Cell initCell = preIncrement.getCellList(FAMILY).get(0); + Updater updateForNonEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) throws IOException { + table.increment(preIncrement); + try { + Increment inc = new Increment(ROW); + inc.addColumn(FAMILY, QUALIFIER, 1); + inc.addColumn(FAMILY, QUALIFIER_V2, 2); + FailedHLog.SHOULD_FAIL.set(true); + table.increment(inc); + Assert.fail("It should fail because the WAL sync is failed"); + } catch (IOException e) { + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 1; + } + }; + testRollback(updateForNonEmptyTable, 1, initCell); + testRollback(updateForNonEmptyTable, 2, initCell); + } + + @Test + public void testPutRollback() throws IOException { + Updater updateForEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) { + try { + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + FailedHLog.SHOULD_FAIL.set(true); + table.put(put); + Assert.fail("It should fail because the WAL sync is failed"); + } catch (IOException e) { + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 0; + } + }; + testRollback(updateForEmptyTable, 1, null); + testRollback(updateForEmptyTable, 2, null); + + final Put prePut = new Put(ROW); + prePut.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("aaaaaaaaaaaaaaaaaaaaaa")); + Cell preCell = 
prePut.getCellList(FAMILY).get(0); + Updater updateForNonEmptyTable = new Updater() { + @Override + public int updateData(Table table, byte[] family) throws IOException { + table.put(prePut); + try { + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, VALUE); + FailedHLog.SHOULD_FAIL.set(true); + table.put(put); + Assert.fail("It should fail because the WAL sync is failed"); + } catch (IOException e) { + } finally { + FailedHLog.SHOULD_FAIL.set(false); + } + return 1; + } + }; + testRollback(updateForNonEmptyTable, 1, preCell); + testRollback(updateForNonEmptyTable, 2, preCell); + } + + private void testRollback(Updater updater, int versions, Cell initCell) throws IOException { + TableName tableName = TableName.valueOf(this.name.getMethodName()); + HTableDescriptor desc = new HTableDescriptor(tableName); + HColumnDescriptor col = new HColumnDescriptor(FAMILY); + col.setMaxVersions(versions); + desc.addFamily(col); + TEST_UTIL.getHBaseAdmin().createTable(desc); + int expected; + List<Cell> cells; + try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); + Table table = conn.getTable(tableName)) { + expected = updater.updateData(table, FAMILY); + cells = getAllCells(table); + } + TEST_UTIL.getHBaseAdmin().disableTable(tableName); + TEST_UTIL.getHBaseAdmin().deleteTable(tableName); + assertEquals(expected, cells.size()); + if (initCell != null && !cells.isEmpty()) { // pre-existing cell must survive rollback + Cell cell = cells.get(0); + assertTrue("row isn't matched", CellUtil.matchingRow(initCell, cell)); + assertTrue("column isn't matched", CellUtil.matchingColumn(initCell, cell)); + assertTrue("qualifier isn't matched", CellUtil.matchingQualifier(initCell, cell)); + assertTrue("value isn't matched", CellUtil.matchingValue(initCell, cell)); + } + } + + interface Updater { + int updateData(Table table, byte[] family) throws IOException; + } + + private static List<Cell> getAllCells(Table table) throws IOException { + List<Cell> cells = new ArrayList<>(); + try 
(ResultScanner scanner = table.getScanner(new Scan())) { + for (Result r : scanner) { + cells.addAll(r.listCells()); + } + return cells; + } + } + + public static class FailedDefaultWALProvider extends DefaultWALProvider { + @Override + public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException { + WAL wal = super.getWAL(identifier, namespace); + return new FailedHLog(wal); + } + } + + public static class FailedHLog implements WAL { + private static final AtomicBoolean SHOULD_FAIL = new AtomicBoolean(false); + private final WAL delegation; + + FailedHLog(final WAL delegation) { + this.delegation = delegation; + } + + @Override + public void registerWALActionsListener(WALActionsListener listener) { + delegation.registerWALActionsListener(listener); + } + + @Override + public boolean unregisterWALActionsListener(WALActionsListener listener) { + return delegation.unregisterWALActionsListener(listener); + } + + @Override + public byte[][] rollWriter() throws FailedLogCloseException, IOException { + return delegation.rollWriter(); + } + + @Override + public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + return delegation.rollWriter(force); + } + + @Override + public void shutdown() throws IOException { + delegation.shutdown(); + } + + @Override + public void close() throws IOException { + delegation.close(); + } + + @Override + public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits, + boolean inMemstore) throws IOException { + return delegation.append(htd, info, key, edits, inMemstore); + } + + @Override + public void sync() throws IOException { + delegation.sync(); + } + + @Override + public void sync(long txid) throws IOException { + if (SHOULD_FAIL.get()) { + throw new IOException("[TESTING] we need the failure!!!"); + } + delegation.sync(txid); + } + + @Override + public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) { + return 
delegation.startCacheFlush(encodedRegionName, families); + } + + @Override + public void completeCacheFlush(byte[] encodedRegionName) { + delegation.completeCacheFlush(encodedRegionName); + } + + @Override + public void abortCacheFlush(byte[] encodedRegionName) { + delegation.abortCacheFlush(encodedRegionName); + } + + @Override + public WALCoprocessorHost getCoprocessorHost() { + return delegation.getCoprocessorHost(); + } + + @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { + return delegation.getEarliestMemstoreSeqNum(encodedRegionName); + } + + @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + return delegation.getEarliestMemstoreSeqNum(encodedRegionName, familyName); + } + + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/171f8f06/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index ba192b6..5c6f5f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -864,7 +864,7 @@ public class TestDefaultMemStore extends TestCase { kv1.setSequenceId(1); kv2.setSequenceId(1);kv3.setSequenceId(1); l.add(kv1); l.add(kv2); l.add(kv3); - this.memstore.upsert(l, 2);// readpoint is 2 + this.memstore.upsert(l, 2, null);// readpoint is 2 long newSize = this.memstore.activeSection.getHeapSize().get(); assert(newSize > oldSize); //The kv1 should be removed. 
@@ -873,7 +873,7 @@ public class TestDefaultMemStore extends TestCase { KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); kv4.setSequenceId(1); l.clear(); l.add(kv4); - this.memstore.upsert(l, 3); + this.memstore.upsert(l, 3, null); assertEquals(newSize, this.memstore.activeSection.getHeapSize().get()); //The kv2 should be removed. assert(memstore.activeSection.getCellSkipListSet().size() == 2); @@ -917,7 +917,7 @@ public class TestDefaultMemStore extends TestCase { KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); kv1.setSequenceId(100); l.add(kv1); - memstore.upsert(l, 1000); + memstore.upsert(l, 1000, null); t = memstore.timeOfOldestEdit(); assertTrue(t == 1234); } finally {