http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index b52a258..013f0ef 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -204,7 +204,7 @@ class MockRegionServerServices implements RegionServerServices { } @Override - public HLog getWAL(HRegionInfo regionInfo) throws IOException { + public WAL getWAL(HRegionInfo regionInfo) throws IOException { return null; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 00d263c..7fc8e7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -44,8 +44,8 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.junit.Test; @@ -83,7 +83,6 @@ public class TestIOFencing { //((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem")) // .getLogger().setLevel(Level.ALL); //((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); - //((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL); } public abstract static class CompactionBlockerRegion extends HRegion { @@ -92,7 +91,7 @@ public class TestIOFencing { volatile CountDownLatch compactionsWaiting = new CountDownLatch(0); @SuppressWarnings("deprecation") - public CompactionBlockerRegion(Path tableDir, HLog log, + public CompactionBlockerRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam, HRegionInfo info, HTableDescriptor htd, RegionServerServices rsServices) { super(tableDir, log, fs, confParam, info, htd, rsServices); @@ -139,7 +138,7 @@ public class TestIOFencing { */ public static class BlockCompactionsInPrepRegion extends CompactionBlockerRegion { - public BlockCompactionsInPrepRegion(Path tableDir, HLog log, + public BlockCompactionsInPrepRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam, HRegionInfo info, HTableDescriptor htd, RegionServerServices rsServices) { super(tableDir, log, fs, confParam, info, htd, rsServices); @@ -162,7 +161,7 @@ public class TestIOFencing { * entry to go the WAL before blocking, but blocks afterwards */ public static class BlockCompactionsInCompletionRegion extends CompactionBlockerRegion { - public BlockCompactionsInCompletionRegion(Path tableDir, HLog log, + public BlockCompactionsInCompletionRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam, HRegionInfo info, HTableDescriptor htd, RegionServerServices rsServices) { super(tableDir, log, fs, confParam, info, htd, rsServices); @@ -277,7 +276,7 @@ public class TestIOFencing { CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(oldHri, FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")), new Path("store_dir")); - HLogUtil.writeCompactionMarker(compactingRegion.getLog(), table.getTableDescriptor(), + WALUtil.writeCompactionMarker(compactingRegion.getWAL(), table.getTableDescriptor(), oldHri, compactionDescriptor, new AtomicLong(Long.MAX_VALUE-100)); // Wait till flush has happened, otherwise there won't be multiple store files http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java index a486f4b..929f9aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java @@ -437,7 +437,7 @@ public class TestHFileArchiving { // remove all the non-storefile named files for the region for (int i = 0; i < storeFiles.size(); i++) { String file = storeFiles.get(i); - if (file.contains(HRegionFileSystem.REGION_INFO_FILE) || file.contains("hlog")) { + if (file.contains(HRegionFileSystem.REGION_INFO_FILE) || file.contains("wal")) { storeFiles.remove(i--); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index fbe33a1..1f4d865 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -54,9 +54,9 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtilsForTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -520,7 +520,7 @@ public class TestAdmin2 { } @Test (timeout=300000) - public void testHLogRollWriting() throws Exception { + public void testWALRollWriting() throws Exception { setUpforLogRolling(); String className = this.getClass().getName(); StringBuilder v = new StringBuilder(className); @@ -530,7 +530,7 @@ public class TestAdmin2 { byte[] value = Bytes.toBytes(v.toString()); HRegionServer regionServer = startAndWriteData(TableName.valueOf("TestLogRolling"), value); LOG.info("after writing there are " - + HLogUtilsForTests.getNumRolledLogFiles(regionServer.getWAL()) + " log files"); + + DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files"); // flush all regions @@ -539,8 +539,8 @@ public class TestAdmin2 { for (HRegion r : regions) { r.flushcache(); } - admin.rollHLogWriter(regionServer.getServerName().getServerName()); - int count = HLogUtilsForTests.getNumRolledLogFiles(regionServer.getWAL()); + admin.rollWALWriter(regionServer.getServerName()); + int count = DefaultWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); assertTrue(("actual count: " + count), count <= 2); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java index 3f983ed..d7852f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; @@ -60,6 +61,12 @@ implements WALObserver { private boolean preWALRestoreCalled = false; private boolean postWALRestoreCalled = false; + // Deprecated versions + private boolean preWALWriteDeprecatedCalled = false; + private boolean postWALWriteDeprecatedCalled = false; + private boolean preWALRestoreDeprecatedCalled = false; + private boolean postWALRestoreDeprecatedCalled = false; + /** * Set values: with a table name, a column name which will be ignored, and * a column name which will be added to WAL. @@ -74,18 +81,32 @@ implements WALObserver { this.addedQualifier = addq; this.changedFamily = chf; this.changedQualifier = chq; + preWALWriteCalled = false; + postWALWriteCalled = false; + preWALRestoreCalled = false; + postWALRestoreCalled = false; + preWALWriteDeprecatedCalled = false; + postWALWriteDeprecatedCalled = false; + preWALRestoreDeprecatedCalled = false; + postWALRestoreDeprecatedCalled = false; } + @Override + public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + postWALWriteCalled = true; + } @Override public void postWALWrite(ObserverContext<WALCoprocessorEnvironment> env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { - postWALWriteCalled = true; + postWALWriteDeprecatedCalled = true; + postWALWrite(env, info, (WALKey)logKey, logEdit); } @Override - public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> env, - HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + public boolean preWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> env, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { boolean bypass = false; // check table name matches or not. if (!Bytes.equals(info.getTableName(), this.tableName)) { @@ -122,14 +143,28 @@ implements WALObserver { return bypass; } + @Override + public boolean preWALWrite(ObserverContext<WALCoprocessorEnvironment> env, + HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { + preWALWriteDeprecatedCalled = true; + return preWALWrite(env, info, (WALKey)logKey, logEdit); + } + /** * Triggered before {@link org.apache.hadoop.hbase.regionserver.HRegion} when WAL is * Restoreed. */ @Override + public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + preWALRestoreCalled = true; + } + + @Override public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { - preWALRestoreCalled = true; + preWALRestoreDeprecatedCalled = true; + preWALRestore(env, info, (WALKey)logKey, logEdit); } /** @@ -137,9 +172,16 @@ implements WALObserver { * Restoreed. */ @Override + public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + postWALRestoreCalled = true; + } + + @Override public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { - postWALRestoreCalled = true; + postWALRestoreDeprecatedCalled = true; + postWALRestore(env, info, (WALKey)logKey, logEdit); } public boolean isPreWALWriteCalled() { @@ -161,4 +203,27 @@ implements WALObserver { ".isPostWALRestoreCalled is called."); return postWALRestoreCalled; } + + public boolean isPreWALWriteDeprecatedCalled() { + return preWALWriteDeprecatedCalled; + } + + public boolean isPostWALWriteDeprecatedCalled() { + return postWALWriteDeprecatedCalled; + } + + public boolean isPreWALRestoreDeprecatedCalled() { + return preWALRestoreDeprecatedCalled; + } + + public boolean isPostWALRestoreDeprecatedCalled() { + return postWALRestoreDeprecatedCalled; + } + + /** + * This class should trigger our legacy support since it does not directly implement the + * newer API methods. + */ + static class Legacy extends SampleRegionWALObserver { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index bf53518..7100ae7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -115,8 +116,6 @@ public class SimpleRegionObserver extends BaseRegionObserver { final AtomicInteger ctPreCheckAndDelete = new AtomicInteger(0); final AtomicInteger ctPreCheckAndDeleteAfterRowLock = new AtomicInteger(0); final AtomicInteger ctPostCheckAndDelete = new AtomicInteger(0); - final AtomicInteger ctPreWALRestored = new AtomicInteger(0); - final AtomicInteger ctPostWALRestored = new AtomicInteger(0); final AtomicInteger ctPreScannerNext = new AtomicInteger(0); final AtomicInteger ctPostScannerNext = new AtomicInteger(0); final AtomicInteger ctPreScannerClose = new AtomicInteger(0); @@ -130,6 +129,8 @@ public class SimpleRegionObserver extends BaseRegionObserver { final AtomicInteger ctPostBatchMutate = new AtomicInteger(0); final AtomicInteger ctPreWALRestore = new AtomicInteger(0); final AtomicInteger ctPostWALRestore = new AtomicInteger(0); + final AtomicInteger ctPreWALRestoreDeprecated = new AtomicInteger(0); + final AtomicInteger ctPostWALRestoreDeprecated = new AtomicInteger(0); final AtomicInteger ctPreSplitBeforePONR = new AtomicInteger(0); final AtomicInteger ctPreSplitAfterPONR = new AtomicInteger(0); final AtomicInteger ctPreStoreFileReaderOpen = new AtomicInteger(0); @@ -661,8 +662,8 @@ public class SimpleRegionObserver extends BaseRegionObserver { } @Override - public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info, - HLogKey logKey, WALEdit logEdit) throws IOException { + public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { String tableName = logKey.getTablename().getNameAsString(); if (tableName.equals(TABLE_SKIPPED)) { // skip recovery of TABLE_SKIPPED for testing purpose @@ -673,9 +674,23 @@ public class SimpleRegionObserver extends BaseRegionObserver { } @Override + public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info, + HLogKey logKey, WALEdit logEdit) throws IOException { + preWALRestore(env, info, (WALKey)logKey, logEdit); + ctPreWALRestoreDeprecated.incrementAndGet(); + } + + @Override + public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> env, + HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + ctPostWALRestore.incrementAndGet(); + } + + @Override public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException { - ctPostWALRestore.incrementAndGet(); + postWALRestore(env, info, (WALKey)logKey, logEdit); + ctPostWALRestoreDeprecated.incrementAndGet(); } @Override @@ -794,13 +809,14 @@ public class SimpleRegionObserver extends BaseRegionObserver { return ctPrePrepareDeleteTS.get() > 0; } - public boolean hadPreWALRestored() { - return ctPreWALRestored.get() > 0; + public boolean hadPreWALRestore() { + return ctPreWALRestore.get() > 0; } - public boolean hadPostWALRestored() { - return ctPostWALRestored.get() > 0; + public boolean hadPostWALRestore() { + return ctPostWALRestore.get() > 0; } + public boolean wasScannerNextCalled() { return ctPreScannerNext.get() > 0 && ctPostScannerNext.get() > 0; } @@ -939,7 +955,22 @@ public class SimpleRegionObserver extends BaseRegionObserver { return ctPostWALRestore.get(); } + public int getCtPreWALRestoreDeprecated() { + return ctPreWALRestoreDeprecated.get(); + } + + public int getCtPostWALRestoreDeprecated() { + return ctPostWALRestoreDeprecated.get(); + } + public boolean wasStoreFileReaderOpenCalled() { return ctPreStoreFileReaderOpen.get() > 0 && ctPostStoreFileReaderOpen.get() > 0; } + + /** + * This implementation should trigger our legacy support because it does not directly + * implement the newer API calls. + */ + public static class Legacy extends SimpleRegionObserver { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 310d875..31da5aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -98,8 +98,9 @@ public class TestRegionObserverInterface { // set configure to indicate which cp should be loaded Configuration conf = util.getConfiguration(); conf.setBoolean("hbase.master.distributed.log.replay", true); - conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver"); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver", + "org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver$Legacy"); util.startMiniCluster(); cluster = util.getMiniHBaseCluster(); @@ -618,9 +619,10 @@ public class TestRegionObserverInterface { ); verifyMethodResult(SimpleRegionObserver.class, - new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"}, + new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut", + "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"}, tableName, - new Integer[] {0, 0, 1, 1}); + new Integer[] {0, 0, 1, 1, 0, 0}); cluster.killRegionServer(rs1.getRegionServer().getServerName()); Threads.sleep(1000); // Let the kill soak in. @@ -628,9 +630,60 @@ public class TestRegionObserverInterface { LOG.info("All regions assigned"); verifyMethodResult(SimpleRegionObserver.class, - new String[]{"getCtPrePut", "getCtPostPut"}, + new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut", + "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"}, tableName, - new Integer[]{0, 0}); + new Integer[]{1, 1, 0, 0, 0, 0}); + } finally { + util.deleteTable(tableName); + table.close(); + } + } + + @Test + public void testLegacyRecovery() throws Exception { + LOG.info(TestRegionObserverInterface.class.getName() +".testLegacyRecovery"); + TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testLegacyRecovery"); + HTable table = util.createTable(tableName, new byte[][] {A, B, C}); + try { + JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer(); + ServerName sn2 = rs1.getRegionServer().getServerName(); + String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName(); + + util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes()); + while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){ + Thread.sleep(100); + } + + Put put = new Put(ROW); + put.add(A, A, A); + put.add(B, B, B); + put.add(C, C, C); + table.put(put); + + verifyMethodResult(SimpleRegionObserver.Legacy.class, + new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut", + "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"}, + tableName, + new Boolean[] {false, false, true, true, true, true, false} + ); + + verifyMethodResult(SimpleRegionObserver.Legacy.class, + new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut", + "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"}, + tableName, + new Integer[] {0, 0, 1, 1, 0, 0}); + + cluster.killRegionServer(rs1.getRegionServer().getServerName()); + Threads.sleep(1000); // Let the kill soak in. + util.waitUntilAllRegionsAssigned(tableName); + LOG.info("All regions assigned"); + + verifyMethodResult(SimpleRegionObserver.Legacy.class, + new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut", + "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"}, + tableName, + new Integer[]{1, 1, 0, 0, 1, 1}); } finally { util.deleteTable(tableName); table.close(); @@ -664,7 +717,9 @@ public class TestRegionObserverInterface { util.waitUntilAllRegionsAssigned(tableName); verifyMethodResult(SimpleRegionObserver.class, new String[] { "getCtPreWALRestore", - "getCtPostWALRestore" }, tableName, new Integer[] { 0, 0 }); + "getCtPostWALRestore", "getCtPreWALRestoreDeprecated", "getCtPostWALRestoreDeprecated"}, + tableName, + new Integer[] {0, 0, 0, 0}); util.deleteTable(tableName); table.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 3365a95..1f278e3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -61,7 +61,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -216,7 +216,7 @@ public class TestRegionObserverScannerOpenHook { private static volatile CountDownLatch compactionStateChangeLatch = null; @SuppressWarnings("deprecation") - public CompactionCompletionNotifyingRegion(Path tableDir, HLog log, + public CompactionCompletionNotifyingRegion(Path tableDir, WAL log, FileSystem fs, Configuration confParam, HRegionInfo info, HTableDescriptor htd, RegionServerServices rsServices) { super(tableDir, log, fs, confParam, info, htd, rsServices); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java index 1eddc8a..95a194d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java @@ -49,11 +49,14 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; -import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; @@ -63,7 +66,9 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.junit.experimental.categories.Category; /** @@ -85,6 +90,9 @@ public class TestWALObserver { Bytes.toBytes("v2"), Bytes.toBytes("v3"), }; private static byte[] TEST_ROW = Bytes.toBytes("testRow"); + @Rule + public TestName currentTest = new TestName(); + private Configuration conf; private FileSystem fs; private Path dir; @@ -92,12 +100,13 @@ public class TestWALObserver { private String logName; private Path oldLogDir; private Path logDir; + private WALFactory wals; @BeforeClass public static void setupBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, - SampleRegionWALObserver.class.getName()); + conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + SampleRegionWALObserver.class.getName(), SampleRegionWALObserver.Legacy.class.getName()); conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, SampleRegionWALObserver.class.getName()); conf.setBoolean("dfs.support.append", true); @@ -124,16 +133,25 @@ public class TestWALObserver { this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName()); this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); - this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME); + this.logDir = new Path(this.hbaseRootDir, + DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName())); this.logName = HConstants.HREGION_LOGDIR_NAME; if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) { TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } + this.wals = new WALFactory(conf, null, currentTest.getMethodName()); } @After public void tearDown() throws Exception { + try { + wals.shutdown(); + } catch (IOException exception) { + // one of our tests splits out from under our wals. + LOG.warn("Ignoring failure to close wal factory. " + exception.getMessage()); + LOG.debug("details of failure to close wal factory.", exception); + } TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true); } @@ -144,7 +162,23 @@ public class TestWALObserver { */ @Test public void testWALObserverWriteToWAL() throws Exception { + final WAL log = wals.getWAL(UNSPECIFIED_REGION); + verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.class), false); + } + + /** + * Test WAL write behavior with WALObserver. The coprocessor monitors a + * WALEdit written to WAL, and ignore, modify, and add KeyValue's for the + * WALEdit. + */ + @Test + public void testLegacyWALObserverWriteToWAL() throws Exception { + final WAL log = wals.getWAL(UNSPECIFIED_REGION); + verifyWritesSeen(log, getCoprocessor(log, SampleRegionWALObserver.Legacy.class), true); + } + private void verifyWritesSeen(final WAL log, final SampleRegionWALObserver cp, + final boolean seesLegacy) throws Exception { HRegionInfo hri = createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE)); final HTableDescriptor htd = createBasic3FamilyHTD(Bytes .toString(TEST_TABLE)); @@ -154,10 +188,6 @@ public class TestWALObserver { fs.mkdirs(new Path(basedir, hri.getEncodedName())); final AtomicLong sequenceId = new AtomicLong(0); - HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir, - TestWALObserver.class.getName(), this.conf); - SampleRegionWALObserver cp = getCoprocessor(log); - // TEST_FAMILY[0] shall be removed from WALEdit. // TEST_FAMILY[1] value shall be changed. // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put. @@ -166,6 +196,8 @@ public class TestWALObserver { assertFalse(cp.isPreWALWriteCalled()); assertFalse(cp.isPostWALWriteCalled()); + assertFalse(cp.isPreWALWriteDeprecatedCalled()); + assertFalse(cp.isPostWALWriteDeprecatedCalled()); // TEST_FAMILY[2] is not in the put, however it shall be added by the tested // coprocessor. @@ -201,7 +233,10 @@ public class TestWALObserver { // it's where WAL write cp should occur. long now = EnvironmentEdgeManager.currentTime(); - log.append(hri, hri.getTable(), edit, now, htd, sequenceId); + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + long txid = log.append(htd, hri, new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now), + edit, sequenceId, true, null); + log.sync(txid); // the edit shall have been change now by the coprocessor. foundFamily0 = false; @@ -226,6 +261,83 @@ public class TestWALObserver { assertTrue(cp.isPreWALWriteCalled()); assertTrue(cp.isPostWALWriteCalled()); + assertEquals(seesLegacy, cp.isPreWALWriteDeprecatedCalled()); + assertEquals(seesLegacy, cp.isPostWALWriteDeprecatedCalled()); + } + + @Test + public void testNonLegacyWALKeysDoNotExplode() throws Exception { + TableName tableName = TableName.valueOf(TEST_TABLE); + final HTableDescriptor htd = createBasic3FamilyHTD(Bytes + .toString(TEST_TABLE)); + final HRegionInfo hri = new HRegionInfo(tableName, null, null); + final AtomicLong sequenceId = new AtomicLong(0); + + fs.mkdirs(new Path(FSUtils.getTableDir(hbaseRootDir, tableName), hri.getEncodedName())); + + final Configuration newConf = HBaseConfiguration.create(this.conf); + + final WAL wal = wals.getWAL(UNSPECIFIED_REGION); + final SampleRegionWALObserver newApi = getCoprocessor(wal, SampleRegionWALObserver.class); + newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null); + final SampleRegionWALObserver oldApi = getCoprocessor(wal, + SampleRegionWALObserver.Legacy.class); + oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null); + + LOG.debug("ensuring wal entries haven't happened before we start"); + assertFalse(newApi.isPreWALWriteCalled()); + assertFalse(newApi.isPostWALWriteCalled()); + assertFalse(newApi.isPreWALWriteDeprecatedCalled()); + assertFalse(newApi.isPostWALWriteDeprecatedCalled()); + assertFalse(oldApi.isPreWALWriteCalled()); + assertFalse(oldApi.isPostWALWriteCalled()); + assertFalse(oldApi.isPreWALWriteDeprecatedCalled()); + assertFalse(oldApi.isPostWALWriteDeprecatedCalled()); + + LOG.debug("writing to WAL with non-legacy keys."); + final int countPerFamily = 5; + for (HColumnDescriptor hcd : htd.getFamilies()) { + addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, + EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId); + } + + LOG.debug("Verify that only the non-legacy CP saw edits."); + assertTrue(newApi.isPreWALWriteCalled()); + assertTrue(newApi.isPostWALWriteCalled()); + assertFalse(newApi.isPreWALWriteDeprecatedCalled()); + assertFalse(newApi.isPostWALWriteDeprecatedCalled()); + // wish we could test that the log message happened :/ + assertFalse(oldApi.isPreWALWriteCalled()); + assertFalse(oldApi.isPostWALWriteCalled()); + assertFalse(oldApi.isPreWALWriteDeprecatedCalled()); + assertFalse(oldApi.isPostWALWriteDeprecatedCalled()); + + LOG.debug("reseting cp state."); + newApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null); + oldApi.setTestValues(TEST_TABLE, TEST_ROW, null, null, null, null, null, null); + + LOG.debug("write a log edit that supports legacy cps."); + final long now = EnvironmentEdgeManager.currentTime(); + final WALKey legacyKey = new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), now); + final WALEdit edit = new WALEdit(); + final byte[] nonce = Bytes.toBytes("1772"); + edit.add(new KeyValue(TEST_ROW, TEST_FAMILY[0], nonce, now, nonce)); + final long txid = wal.append(htd, hri, legacyKey, edit, sequenceId, true, null); + wal.sync(txid); + + LOG.debug("Make sure legacy cps can see supported edits after having been skipped."); + assertTrue("non-legacy WALObserver didn't see pre-write.", newApi.isPreWALWriteCalled()); + assertTrue("non-legacy WALObserver didn't see post-write.", newApi.isPostWALWriteCalled()); + assertFalse("non-legacy WALObserver shouldn't have seen legacy pre-write.", + newApi.isPreWALWriteDeprecatedCalled()); + assertFalse("non-legacy WALObserver shouldn't have seen legacy post-write.", + newApi.isPostWALWriteDeprecatedCalled()); + assertTrue("legacy WALObserver didn't see pre-write.", oldApi.isPreWALWriteCalled()); + assertTrue("legacy WALObserver didn't see post-write.", oldApi.isPostWALWriteCalled()); + assertTrue("legacy WALObserver didn't see legacy pre-write.", + oldApi.isPreWALWriteDeprecatedCalled()); + assertTrue("legacy WALObserver didn't see legacy post-write.", + oldApi.isPostWALWriteDeprecatedCalled()); } /** @@ -237,10 +349,9 @@ public class TestWALObserver { final HTableDescriptor htd = createBasic3FamilyHTD(Bytes.toString(TEST_TABLE)); final AtomicLong sequenceId = new AtomicLong(0); - HLog log = HLogFactory.createHLog(this.fs, hbaseRootDir, - TestWALObserver.class.getName(), this.conf); + WAL log = wals.getWAL(UNSPECIFIED_REGION); try { - SampleRegionWALObserver cp = getCoprocessor(log); + SampleRegionWALObserver cp = getCoprocessor(log, SampleRegionWALObserver.class); cp.setTestValues(TEST_TABLE, null, null, null, null, null, null, null); @@ -248,13 +359,14 @@ public class TestWALObserver { assertFalse(cp.isPostWALWriteCalled()); final long now = EnvironmentEdgeManager.currentTime(); - log.append(hri, hri.getTable(), new WALEdit(), now, htd, sequenceId); - log.sync(); + long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), now), + new WALEdit(), sequenceId, true, null); + log.sync(txid); assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPreWALWriteCalled()); assertFalse("Empty WALEdit should skip coprocessor evaluation.", cp.isPostWALWriteCalled()); } finally { - log.closeAndDelete(); + log.close(); } } @@ -281,8 +393,8 @@ public class TestWALObserver { final Configuration newConf = HBaseConfiguration.create(this.conf); - // HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf); - HLog wal = createWAL(this.conf); + // WAL wal = new WAL(this.fs, this.dir, this.oldLogDir, this.conf); + WAL wal = wals.getWAL(UNSPECIFIED_REGION); // Put p = creatPutWith2Families(TEST_ROW); WALEdit edit = new WALEdit(); long now = EnvironmentEdgeManager.currentTime(); @@ -290,12 +402,11 @@ public class TestWALObserver { final int countPerFamily = 1000; // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) { for (HColumnDescriptor hcd : htd.getFamilies()) { - // addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, - // EnvironmentEdgeManager.getDelegate(), wal); addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily, EnvironmentEdgeManager.getDelegate(), wal, htd, sequenceId); } - wal.append(hri, tableName, edit, now, htd, sequenceId); + wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, now), edit, sequenceId, + true, null); // sync to fs. wal.sync(); @@ -307,7 +418,8 @@ public class TestWALObserver { LOG.info("WALSplit path == " + p); FileSystem newFS = FileSystem.get(newConf); // Make a new wal for new region open. - HLog wal2 = createWAL(newConf); + final WALFactory wals2 = new WALFactory(conf, null, currentTest.getMethodName()+"2"); + WAL wal2 = wals2.getWAL(UNSPECIFIED_REGION);; HRegion region = HRegion.openHRegion(newConf, FileSystem.get(newConf), hbaseRootDir, hri, htd, wal2, TEST_UTIL.getHBaseCluster().getRegionServer(0), null); long seqid2 = region.getOpenSeqNum(); @@ -319,8 +431,10 @@ public class TestWALObserver { assertNotNull(cp2); assertTrue(cp2.isPreWALRestoreCalled()); assertTrue(cp2.isPostWALRestoreCalled()); + assertFalse(cp2.isPreWALRestoreDeprecatedCalled()); + assertFalse(cp2.isPostWALRestoreDeprecatedCalled()); region.close(); - wal2.closeAndDelete(); + wals2.close(); return null; } }); @@ -329,19 +443,18 @@ public class TestWALObserver { /** * Test to see CP loaded successfully or not. There is a duplication at * TestHLog, but the purpose of that one is to see whether the loaded CP will - * impact existing HLog tests or not. + * impact existing WAL tests or not. */ @Test public void testWALObserverLoaded() throws Exception { - HLog log = HLogFactory.createHLog(fs, hbaseRootDir, - TestWALObserver.class.getName(), conf); - assertNotNull(getCoprocessor(log)); + WAL log = wals.getWAL(UNSPECIFIED_REGION); + assertNotNull(getCoprocessor(log, SampleRegionWALObserver.class)); } - private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception { + private SampleRegionWALObserver getCoprocessor(WAL wal, + Class<? extends SampleRegionWALObserver> clazz) throws Exception { WALCoprocessorHost host = wal.getCoprocessorHost(); - Coprocessor c = host.findCoprocessor(SampleRegionWALObserver.class - .getName()); + Coprocessor c = host.findCoprocessor(clazz.getName()); return (SampleRegionWALObserver) c; } @@ -399,8 +512,8 @@ public class TestWALObserver { } private Path runWALSplit(final Configuration c) throws IOException { - List<Path> splits = HLogSplitter.split( - hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c); + List<Path> splits = WALSplitter.split( + hbaseRootDir, logDir, oldLogDir, FileSystem.get(c), c, wals); // Split should generate only 1 file since there's only 1 region assertEquals(1, splits.size()); // Make sure the file exists @@ -409,21 +522,25 @@ public class TestWALObserver { return splits.get(0); } - private HLog createWAL(final Configuration c) throws IOException { - return HLogFactory.createHLog(FileSystem.get(c), hbaseRootDir, logName, c); - } + private static final byte[] UNSPECIFIED_REGION = new byte[]{}; - private void addWALEdits(final TableName tableName, final HRegionInfo hri, - final byte[] rowName, final byte[] family, final int count, - EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd, final AtomicLong sequenceId) - throws IOException { + private void addWALEdits(final TableName tableName, final HRegionInfo hri, final byte[] rowName, + final byte[] family, final int count, EnvironmentEdge ee, final WAL wal, + final HTableDescriptor htd, final AtomicLong sequenceId) throws IOException { String familyStr = Bytes.toString(family); + long txid = -1; for (int j = 0; j < count; j++) { byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j)); byte[] columnBytes = Bytes.toBytes(familyStr + ":" + Integer.toString(j)); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, qualifierBytes, ee.currentTime(), columnBytes)); - wal.append(hri, tableName, edit, ee.currentTime(), htd, sequenceId); + // uses WALKey instead of HLogKey on purpose. will only work for tests where we don't care + // about legacy coprocessors + txid = wal.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, + ee.currentTime()), edit, sequenceId, true, null); + } + if (-1 != txid) { + wal.sync(txid); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java index b71473e..a4782b1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestFilter.java @@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.filter.FilterList.Operator; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Assert; @@ -217,9 +217,9 @@ public class TestFilter { @After public void tearDown() throws Exception { - HLog hlog = region.getLog(); + WAL wal = region.getWAL(); region.close(); - hlog.closeAndDelete(); + wal.close(); } @Test @@ -1488,9 +1488,9 @@ public class TestFilter { assertEquals(2, resultCount); scanner.close(); - HLog hlog = testRegion.getLog(); + WAL wal = testRegion.getWAL(); testRegion.close(); - hlog.closeAndDelete(); + wal.close(); } @Test @@ -2096,8 +2096,8 @@ public class TestFilter { results.clear(); } assertFalse(scanner.next(results)); - HLog hlog = testRegion.getLog(); + WAL wal = testRegion.getWAL(); testRegion.close(); - hlog.closeAndDelete(); + wal.close(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java index ba712f3..7223573 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/TestInvocationRecordFilter.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Assert; @@ -149,9 +149,9 @@ public class TestInvocationRecordFilter { @After public void tearDown() throws Exception { - HLog hlog = region.getLog(); + WAL wal = region.getWAL(); region.close(); - hlog.closeAndDelete(); + wal.close(); } /** @@ -179,4 +179,4 @@ public class TestInvocationRecordFilter { return true; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java index ead2c47..9b610e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hbase.fs; - +import java.io.FileNotFoundException; +import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.BindException; import java.net.ServerSocket; +import java.util.List; +import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +38,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.LargeTests; @@ -42,8 +46,10 @@ import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.wal.HLogUtil; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -278,7 +284,32 @@ public class TestBlockReorder { int nbTest = 0; while (nbTest < 10) { - htu.getHBaseAdmin().rollHLogWriter(targetRs.getServerName().toString()); + final List<HRegion> regions = targetRs.getOnlineRegions(h.getName()); + final CountDownLatch latch = new CountDownLatch(regions.size()); + // listen for successful log rolls + final WALActionsListener listener = new WALActionsListener.Base() { + @Override + public void postLogRoll(final Path oldPath, final Path newPath) throws IOException { + latch.countDown(); + } + }; + for (HRegion region : regions) { + region.getWAL().registerWALActionsListener(listener); + } + + htu.getHBaseAdmin().rollWALWriter(targetRs.getServerName()); + + // wait + try { + latch.await(); + } catch (InterruptedException exception) { + LOG.warn("Interrupted while waiting for the wal of '" + targetRs + "' to roll. If later " + + "tests fail, it's probably because we should still be waiting."); + Thread.currentThread().interrupt(); + } + for (HRegion region : regions) { + region.getWAL().unregisterWALActionsListener(listener); + } // We need a sleep as the namenode is informed asynchronously Thread.sleep(100); @@ -294,37 +325,52 @@ public class TestBlockReorder { // As we wrote a put, we should have at least one log file. Assert.assertTrue(hfs.length >= 1); for (HdfsFileStatus hf : hfs) { - LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir); - String logFile = rootDir + "/" + hf.getLocalName(); - FileStatus fsLog = rfs.getFileStatus(new Path(logFile)); - - LOG.info("Checking log file: " + logFile); - // Now checking that the hook is up and running - // We can't call directly getBlockLocations, it's not available in HFileSystem - // We're trying multiple times to be sure, as the order is random - - BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1); - if (bls.length > 0) { - BlockLocation bl = bls[0]; - - LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " "); - for (int i = 0; i < bl.getHosts().length - 1; i++) { - LOG.info(bl.getHosts()[i] + " " + logFile); - Assert.assertNotSame(bl.getHosts()[i], host4); - } - String last = bl.getHosts()[bl.getHosts().length - 1]; - LOG.info(last + " " + logFile); - if (host4.equals(last)) { - nbTest++; - LOG.info(logFile + " is on the new datanode and is ok"); - if (bl.getHosts().length == 3) { - // We can test this case from the file system as well - // Checking the underlying file system. Multiple times as the order is random - testFromDFS(dfs, logFile, repCount, host4); - - // now from the master - testFromDFS(mdfs, logFile, repCount, host4); + // Because this is a live cluster, log files might get archived while we're processing + try { + LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir); + String logFile = rootDir + "/" + hf.getLocalName(); + FileStatus fsLog = rfs.getFileStatus(new Path(logFile)); + + LOG.info("Checking log file: " + logFile); + // Now checking that the hook is up and running + // We can't call directly getBlockLocations, it's not available in HFileSystem + // We're trying multiple times to be sure, as the order is random + + BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1); + if (bls.length > 0) { + BlockLocation bl = bls[0]; + + LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile + " "); + for (int i = 0; i < bl.getHosts().length - 1; i++) { + LOG.info(bl.getHosts()[i] + " " + logFile); + Assert.assertNotSame(bl.getHosts()[i], host4); } + String last = bl.getHosts()[bl.getHosts().length - 1]; + LOG.info(last + " " + logFile); + if (host4.equals(last)) { + nbTest++; + LOG.info(logFile + " is on the new datanode and is ok"); + if (bl.getHosts().length == 3) { + // We can test this case from the file system as well + // Checking the underlying file system. Multiple times as the order is random + testFromDFS(dfs, logFile, repCount, host4); + + // now from the master + testFromDFS(mdfs, logFile, repCount, host4); + } + } + } + } catch (FileNotFoundException exception) { + LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " + + "archived out from under us so we'll ignore and retry. If this test hangs " + + "indefinitely you should treat this failure as a symptom.", exception); + } catch (RemoteException exception) { + if (exception.unwrapRemoteException() instanceof FileNotFoundException) { + LOG.debug("Failed to find log file '" + hf.getLocalName() + "'; it probably was " + + "archived out from under us so we'll ignore and retry. If this test hangs " + + "indefinitely you should treat this failure as a symptom.", exception); + } else { + throw exception; } } } @@ -414,7 +460,7 @@ public class TestBlockReorder { // Check that it will be possible to extract a ServerName from our construction Assert.assertNotNull("log= " + pseudoLogFile, - HLogUtil.getServerNameFromHLogDirectoryName(dfs.getConf(), pseudoLogFile)); + DefaultWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile)); // And check we're doing the right reorder. lrb.reorderBlocks(conf, l, pseudoLogFile); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 2d5060e..478fde3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -65,7 +65,7 @@ public class TestHeapSize { static final Log LOG = LogFactory.getLog(TestHeapSize.class); // List of classes implementing HeapSize // BatchOperation, BatchUpdate, BlockIndex, Entry, Entry<K,V>, HStoreKey - // KeyValue, LruBlockCache, LruHashMap<K,V>, Put, HLogKey + // KeyValue, LruBlockCache, LruHashMap<K,V>, Put, WALKey @BeforeClass public static void beforeClass() throws Exception { http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java index 22acfa9..5f890d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java @@ -17,232 +17,26 @@ */ package org.apache.hadoop.hbase.mapreduce; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MediumTests; -import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.MapReduceTestUtil; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; +import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader; +import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogKeyRecordReader; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.wal.WALKey; import org.junit.experimental.categories.Category; /** - * JUnit tests for the HLogRecordReader + * JUnit tests for the record reader in HLogInputFormat */ @Category(MediumTests.class) -public class TestHLogRecordReader { - private final Log LOG = LogFactory.getLog(getClass()); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static Configuration conf; - private static FileSystem fs; - private static Path hbaseDir; - private static final TableName tableName = - TableName.valueOf(getName()); - private static final byte [] rowName = tableName.getName(); - private static final HRegionInfo info = new HRegionInfo(tableName, - Bytes.toBytes(""), Bytes.toBytes(""), false); - private static final byte [] family = Bytes.toBytes("column"); - private static final byte [] value = Bytes.toBytes("value"); - private static HTableDescriptor htd; - private static Path logDir; - private static String logName; - - private static String getName() { - return "TestHLogRecordReader"; - } - - @Before - public void setUp() throws Exception { - FileStatus[] entries = fs.listStatus(hbaseDir); - for (FileStatus dir : entries) { - fs.delete(dir.getPath(), true); - } - - } - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // Make block sizes small. - conf = TEST_UTIL.getConfiguration(); - conf.setInt("dfs.blocksize", 1024 * 1024); - conf.setInt("dfs.replication", 1); - TEST_UTIL.startMiniDFSCluster(1); - - conf = TEST_UTIL.getConfiguration(); - fs = TEST_UTIL.getDFSCluster().getFileSystem(); +public class TestHLogRecordReader extends TestWALRecordReader { - hbaseDir = TEST_UTIL.createRootDir(); - - logName = HConstants.HREGION_LOGDIR_NAME; - logDir = new Path(hbaseDir, logName); - - htd = new HTableDescriptor(tableName); - htd.addFamily(new HColumnDescriptor(family)); + @Override + protected WALKey getWalKey(final long sequenceid) { + return new HLogKey(info.getEncodedNameAsBytes(), tableName, sequenceid); } - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); + @Override + protected WALRecordReader getReader() { + return new HLogKeyRecordReader(); } - - /** - * Test partial reads from the log based on passed time range - * @throws Exception - */ - @Test - public void testPartialRead() throws Exception { - HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf); - // This test depends on timestamp being millisecond based and the filename of the WAL also - // being millisecond based. - long ts = System.currentTimeMillis(); - WALEdit edit = new WALEdit(); - final AtomicLong sequenceId = new AtomicLong(0); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); - log.append(info, tableName, edit, ts, htd, sequenceId); - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); - log.append(info, tableName, edit, ts+1, htd, sequenceId); - LOG.info("Before 1st WAL roll " + log.getFilenum()); - log.rollWriter(); - LOG.info("Past 1st WAL roll " + log.getFilenum()); - - Thread.sleep(1); - long ts1 = System.currentTimeMillis(); - - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); - log.append(info, tableName, edit, ts1+1, htd, sequenceId); - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); - log.append(info, tableName, edit, ts1+2, htd, sequenceId); - log.close(); - LOG.info("Closed WAL " + log.getFilenum()); - - - HLogInputFormat input = new HLogInputFormat(); - Configuration jobConf = new Configuration(conf); - jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); - jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts); - - // only 1st file is considered, and only its 1st entry is used - List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - - assertEquals(1, splits.size()); - testSplit(splits.get(0), Bytes.toBytes("1")); - - jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts+1); - jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1+1); - splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - // both files need to be considered - assertEquals(2, splits.size()); - // only the 2nd entry from the 1st file is used - testSplit(splits.get(0), Bytes.toBytes("2")); - // only the 1nd entry from the 2nd file is used - testSplit(splits.get(1), Bytes.toBytes("3")); - } - - /** - * Test basic functionality - * @throws Exception - */ - @Test - public void testHLogRecordReader() throws Exception { - HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf); - byte [] value = Bytes.toBytes("value"); - final AtomicLong sequenceId = new AtomicLong(0); - WALEdit edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), - System.currentTimeMillis(), value)); - log.append(info, tableName, edit, - System.currentTimeMillis(), htd, sequenceId); - - Thread.sleep(1); // make sure 2nd log gets a later timestamp - long secondTs = System.currentTimeMillis(); - log.rollWriter(); - - edit = new WALEdit(); - edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), - System.currentTimeMillis(), value)); - log.append(info, tableName, edit, - System.currentTimeMillis(), htd, sequenceId); - log.close(); - long thirdTs = System.currentTimeMillis(); - - // should have 2 log files now - HLogInputFormat input = new HLogInputFormat(); - Configuration jobConf = new Configuration(conf); - jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); - - // make sure both logs are found - List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - assertEquals(2, splits.size()); - - // should return exactly one KV - testSplit(splits.get(0), Bytes.toBytes("1")); - // same for the 2nd split - testSplit(splits.get(1), Bytes.toBytes("2")); - - // now test basic time ranges: - - // set an endtime, the 2nd log file can be ignored completely. - jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs-1); - splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - assertEquals(1, splits.size()); - testSplit(splits.get(0), Bytes.toBytes("1")); - - // now set a start time - jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE); - jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs); - splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); - // both logs need to be considered - assertEquals(2, splits.size()); - // but both readers skip all edits - testSplit(splits.get(0)); - testSplit(splits.get(1)); - } - - /** - * Create a new reader from the split, and match the edits against the passed columns. - */ - private void testSplit(InputSplit split, byte[]... columns) throws Exception { - HLogRecordReader reader = new HLogRecordReader(); - reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); - - for (byte[] column : columns) { - assertTrue(reader.nextKeyValue()); - Cell cell = reader.getCurrentValue().getCells().get(0); - if (!Bytes.equals(column, cell.getQualifier())) { - assertTrue("expected [" + Bytes.toString(column) + "], actual [" - + Bytes.toString(cell.getQualifier()) + "]", false); - } - } - assertFalse(reader.nextKeyValue()); - reader.close(); - } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java index 3ce657e..7dfc7d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportExport.java @@ -59,10 +59,10 @@ import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.Import.KeyValueImporter; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LauncherSecurityManager; import org.apache.hadoop.mapreduce.Job; @@ -643,10 +643,10 @@ public class TestImportExport { String importTableName = "importTestDurability1"; Table importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); - // Register the hlog listener for the import table + // Register the wal listener for the import table TableWALActionListener walListener = new TableWALActionListener(importTableName); - HLog hLog = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(); - hLog.registerWALActionsListener(walListener); + WAL wal = UTIL.getMiniHBaseCluster().getRegionServer(0).getWAL(null); + wal.registerWALActionsListener(walListener); // Run the import with SKIP_WAL args = @@ -661,9 +661,9 @@ public class TestImportExport { // Run the import with the default durability option importTableName = "importTestDurability2"; importTable = UTIL.createTable(TableName.valueOf(importTableName), FAMILYA, 3); - hLog.unregisterWALActionsListener(walListener); + wal.unregisterWALActionsListener(walListener); walListener = new TableWALActionListener(importTableName); - hLog.registerWALActionsListener(walListener); + wal.registerWALActionsListener(walListener); args = new String[] { importTableName, FQ_OUTPUT_DIR }; assertTrue(runImport(args)); //Assert that the wal is visisted @@ -673,10 +673,10 @@ public class TestImportExport { } /** - * This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, HLogKey, WALEdit)} to + * This listens to the {@link #visitLogEntryBeforeWrite(HTableDescriptor, WALKey, WALEdit)} to * identify that an entry is written to the Write Ahead Log for the given table. */ - private static class TableWALActionListener implements WALActionsListener { + private static class TableWALActionListener extends WALActionsListener.Base { private String tableName; private boolean isVisited = false; @@ -686,42 +686,7 @@ public class TestImportExport { } @Override - public void preLogRoll(Path oldPath, Path newPath) throws IOException { - // Not interested in this method. - } - - @Override - public void postLogRoll(Path oldPath, Path newPath) throws IOException { - // Not interested in this method. - } - - @Override - public void preLogArchive(Path oldPath, Path newPath) throws IOException { - // Not interested in this method. - } - - @Override - public void postLogArchive(Path oldPath, Path newPath) throws IOException { - // Not interested in this method. - } - - @Override - public void logRollRequested() { - // Not interested in this method. - } - - @Override - public void logCloseRequested() { - // Not interested in this method. - } - - @Override - public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) { - // Not interested in this method. - } - - @Override - public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { + public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) { if (tableName.equalsIgnoreCase(htd.getNameAsString())) { isVisited = true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java index a2ec2ec..5bb672d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java @@ -45,8 +45,8 @@ public class TestTableMapReduceUtil { Job job = new Job(configuration, "tableName"); // test TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class, - Text.class, job, false, HLogInputFormat.class); - assertEquals(HLogInputFormat.class, job.getInputFormatClass()); + Text.class, job, false, WALInputFormat.class); + assertEquals(WALInputFormat.class, job.getInputFormatClass()); assertEquals(Import.Importer.class, job.getMapperClass()); assertEquals(LongWritable.class, job.getOutputKeyClass()); assertEquals(Text.class, job.getOutputValueClass()); @@ -59,8 +59,8 @@ public class TestTableMapReduceUtil { Configuration configuration = new Configuration(); Job job = new Job(configuration, "tableName"); TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(), - Import.Importer.class, Text.class, Text.class, job, false, HLogInputFormat.class); - assertEquals(HLogInputFormat.class, job.getInputFormatClass()); + Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class); + assertEquals(WALInputFormat.class, job.getInputFormatClass()); assertEquals(Import.Importer.class, job.getMapperClass()); assertEquals(LongWritable.class, job.getOutputKeyClass()); assertEquals(Text.class, job.getOutputValueClass()); http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java index 03fa9f2..9669df9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java @@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.WALPlayer.HLogKeyValueMapper; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.mapreduce.WALPlayer.WALKeyValueMapper; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.LauncherSecurityManager; @@ -106,7 +106,7 @@ public class TestWALPlayer { t1.delete(d); // replay the WAL, map table 1 to table 2 - HLog log = cluster.getRegionServer(0).getWAL(); + WAL log = cluster.getRegionServer(0).getWAL(null); log.rollWriter(); String walInputDir = new Path(cluster.getMaster().getMasterFileSystem() .getRootDir(), HConstants.HREGION_LOGDIR_NAME).toString(); @@ -129,17 +129,26 @@ public class TestWALPlayer { } /** - * Test HLogKeyValueMapper setup and map + * Test WALKeyValueMapper setup and map */ @Test - public void testHLogKeyValueMapper() throws Exception { + public void testWALKeyValueMapper() throws Exception { + testWALKeyValueMapper(WALPlayer.TABLES_KEY); + } + + @Test + public void testWALKeyValueMapperWithDeprecatedConfig() throws Exception { + testWALKeyValueMapper("hlog.input.tables"); + } + + private void testWALKeyValueMapper(final String tableConfigKey) throws Exception { Configuration configuration = new Configuration(); - configuration.set(WALPlayer.TABLES_KEY, "table"); - HLogKeyValueMapper mapper = new HLogKeyValueMapper(); - HLogKey key = mock(HLogKey.class); + configuration.set(tableConfigKey, "table"); + WALKeyValueMapper mapper = new WALKeyValueMapper(); + WALKey key = mock(WALKey.class); when(key.getTablename()).thenReturn(TableName.valueOf("table")); @SuppressWarnings("unchecked") - Mapper<HLogKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = + Mapper<WALKey, WALEdit, ImmutableBytesWritable, KeyValue>.Context context = mock(Context.class); when(context.getConfiguration()).thenReturn(configuration); @@ -191,7 +200,7 @@ public class TestWALPlayer { assertTrue(data.toString().contains("ERROR: Wrong number of arguments:")); assertTrue(data.toString().contains("Usage: WALPlayer [options] <wal inputdir>" + " <tables> [<tableMappings>]")); - assertTrue(data.toString().contains("-Dhlog.bulk.output=/path/for/output")); + assertTrue(data.toString().contains("-Dwal.bulk.output=/path/for/output")); } } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java new file mode 100644 index 0000000..62b0f1d --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALKeyRecordReader; +import org.apache.hadoop.hbase.mapreduce.WALInputFormat.WALRecordReader; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.MapReduceTestUtil; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * JUnit tests for the WALRecordReader + */ +@Category(MediumTests.class) +public class TestWALRecordReader { + private final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static Configuration conf; + private static FileSystem fs; + private static Path hbaseDir; + // visible for TestHLogRecordReader + static final TableName tableName = TableName.valueOf(getName()); + private static final byte [] rowName = tableName.getName(); + // visible for TestHLogRecordReader + static final HRegionInfo info = new HRegionInfo(tableName, + Bytes.toBytes(""), Bytes.toBytes(""), false); + private static final byte [] family = Bytes.toBytes("column"); + private static final byte [] value = Bytes.toBytes("value"); + private static HTableDescriptor htd; + private static Path logDir; + + private static String getName() { + return "TestWALRecordReader"; + } + + @Before + public void setUp() throws Exception { + FileStatus[] entries = fs.listStatus(hbaseDir); + for (FileStatus dir : entries) { + fs.delete(dir.getPath(), true); + } + + } + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Make block sizes small. + conf = TEST_UTIL.getConfiguration(); + conf.setInt("dfs.blocksize", 1024 * 1024); + conf.setInt("dfs.replication", 1); + TEST_UTIL.startMiniDFSCluster(1); + + conf = TEST_UTIL.getConfiguration(); + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + + hbaseDir = TEST_UTIL.createRootDir(); + + logDir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); + + htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(family)); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Test partial reads from the log based on passed time range + * @throws Exception + */ + @Test + public void testPartialRead() throws Exception { + final WALFactory walfactory = new WALFactory(conf, null, getName()); + WAL log = walfactory.getWAL(info.getEncodedNameAsBytes()); + // This test depends on timestamp being millisecond based and the filename of the WAL also + // being millisecond based. + long ts = System.currentTimeMillis(); + WALEdit edit = new WALEdit(); + final AtomicLong sequenceId = new AtomicLong(0); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value)); + log.append(htd, info, getWalKey(ts), edit, sequenceId, true, null); + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value)); + log.append(htd, info, getWalKey(ts+1), edit, sequenceId, + true, null); + log.sync(); + LOG.info("Before 1st WAL roll " + log.toString()); + log.rollWriter(); + LOG.info("Past 1st WAL roll " + log.toString()); + + Thread.sleep(1); + long ts1 = System.currentTimeMillis(); + + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value)); + log.append(htd, info, getWalKey(ts1+1), edit, sequenceId, + true, null); + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value)); + log.append(htd, info, getWalKey(ts1+2), edit, sequenceId, + true, null); + log.sync(); + log.shutdown(); + walfactory.shutdown(); + LOG.info("Closed WAL " + log.toString()); + + + WALInputFormat input = new WALInputFormat(); + Configuration jobConf = new Configuration(conf); + jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); + jobConf.setLong(WALInputFormat.END_TIME_KEY, ts); + + // only 1st file is considered, and only its 1st entry is used + List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); + + assertEquals(1, splits.size()); + testSplit(splits.get(0), Bytes.toBytes("1")); + + jobConf.setLong(WALInputFormat.START_TIME_KEY, ts+1); + jobConf.setLong(WALInputFormat.END_TIME_KEY, ts1+1); + splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); + // both files need to be considered + assertEquals(2, splits.size()); + // only the 2nd entry from the 1st file is used + testSplit(splits.get(0), Bytes.toBytes("2")); + // only the 1nd entry from the 2nd file is used + testSplit(splits.get(1), Bytes.toBytes("3")); + } + + /** + * Test basic functionality + * @throws Exception + */ + @Test + public void testWALRecordReader() throws Exception { + final WALFactory walfactory = new WALFactory(conf, null, getName()); + WAL log = walfactory.getWAL(info.getEncodedNameAsBytes()); + byte [] value = Bytes.toBytes("value"); + final AtomicLong sequenceId = new AtomicLong(0); + WALEdit edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), + System.currentTimeMillis(), value)); + long txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true, + null); + log.sync(txid); + + Thread.sleep(1); // make sure 2nd log gets a later timestamp + long secondTs = System.currentTimeMillis(); + log.rollWriter(); + + edit = new WALEdit(); + edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), + System.currentTimeMillis(), value)); + txid = log.append(htd, info, getWalKey(System.currentTimeMillis()), edit, sequenceId, true, + null); + log.sync(txid); + log.shutdown(); + walfactory.shutdown(); + long thirdTs = System.currentTimeMillis(); + + // should have 2 log files now + WALInputFormat input = new WALInputFormat(); + Configuration jobConf = new Configuration(conf); + jobConf.set("mapreduce.input.fileinputformat.inputdir", logDir.toString()); + + // make sure both logs are found + List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); + assertEquals(2, splits.size()); + + // should return exactly one KV + testSplit(splits.get(0), Bytes.toBytes("1")); + // same for the 2nd split + testSplit(splits.get(1), Bytes.toBytes("2")); + + // now test basic time ranges: + + // set an endtime, the 2nd log file can be ignored completely. + jobConf.setLong(WALInputFormat.END_TIME_KEY, secondTs-1); + splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); + assertEquals(1, splits.size()); + testSplit(splits.get(0), Bytes.toBytes("1")); + + // now set a start time + jobConf.setLong(WALInputFormat.END_TIME_KEY, Long.MAX_VALUE); + jobConf.setLong(WALInputFormat.START_TIME_KEY, thirdTs); + splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf)); + // both logs need to be considered + assertEquals(2, splits.size()); + // but both readers skip all edits + testSplit(splits.get(0)); + testSplit(splits.get(1)); + } + + protected WALKey getWalKey(final long sequenceid) { + return new WALKey(info.getEncodedNameAsBytes(), tableName, sequenceid); + } + + protected WALRecordReader getReader() { + return new WALKeyRecordReader(); + } + + /** + * Create a new reader from the split, and match the edits against the passed columns. + */ + private void testSplit(InputSplit split, byte[]... columns) throws Exception { + final WALRecordReader reader = getReader(); + reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf)); + + for (byte[] column : columns) { + assertTrue(reader.nextKeyValue()); + Cell cell = reader.getCurrentValue().getCells().get(0); + if (!Bytes.equals(column, cell.getQualifier())) { + assertTrue("expected [" + Bytes.toString(column) + "], actual [" + + Bytes.toString(cell.getQualifier()) + "]", false); + } + } + assertFalse(reader.nextKeyValue()); + reader.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/8959828f/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 8aac384..d613852 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -97,7 +97,7 @@ import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; -import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -527,7 +527,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override - public HLog getWAL(HRegionInfo regionInfo) throws IOException { + public WAL getWAL(HRegionInfo regionInfo) throws IOException { // TODO Auto-generated method stub return null; }