Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1094662&view=auto ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java (added) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java Mon Apr 18 17:16:15 2011 @@ -0,0 +1,280 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + + +public class TestSplitLogWorker { + private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class); + static { + Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG); + } + private final static HBaseTestingUtility TEST_UTIL = + new HBaseTestingUtility(); + private ZooKeeperWatcher zkw; + private SplitLogWorker slw; + + private void waitForCounter(AtomicLong ctr, long oldval, long newval, + long timems) { + long curt = System.currentTimeMillis(); + long endt = curt + timems; + while (curt < endt) { + if (ctr.get() == oldval) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + curt = System.currentTimeMillis(); + } else { + assertEquals(newval, ctr.get()); + return; + } + } + assertTrue(false); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + } + + @Before + public void setup() throws Exception { + slw = null; + TEST_UTIL.startMiniZKCluster(); + zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), + "split-log-worker-tests", null); + ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode); + ZKUtil.createAndFailSilent(zkw, zkw.baseZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1); + LOG.debug(zkw.baseZNode + " created"); + ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode); + assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1); + LOG.debug(zkw.splitLogZNode + " created"); + resetCounters(); + + } + + @After + public void teardown() throws Exception { + if (slw != null) { + slw.stop(); + slw.worker.join(3000); + if (slw.worker.isAlive()) { + assertTrue("could not stop the worker thread" == null); + } + } + TEST_UTIL.shutdownMiniZKCluster(); + } + + SplitLogWorker.TaskExecutor neverEndingTask = + new SplitLogWorker.TaskExecutor() { + + @Override + public Status exec(String name, CancelableProgressable p) { + while (true) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + return Status.PREEMPTED; + } + if (!p.progress()) { + return Status.PREEMPTED; + } + } + } + + }; + + @Test + public void testAcquireTaskAtStartup() throws Exception { + LOG.info("testAcquireTaskAtStartup"); + + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tatas"), + TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs", + neverEndingTask); + slw.start(); + waitForCounter(tot_wkr_task_acquired, 0, 1, 100); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "tatas")), "rs")); + } + + @Test + public void testRaceForTask() throws Exception { + LOG.info("testRaceForTask"); + + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "trft"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "svr1", neverEndingTask); + SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "svr2", neverEndingTask); + slw1.start(); + slw2.start(); + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + waitForCounter(tot_wkr_failed_to_grab_task_lost_race, 0, 1, 1000); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "trft")), "svr1") || + TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "trft")), "svr2")); + slw1.stop(); + slw2.stop(); + slw1.worker.join(); + slw2.worker.join(); + } + + @Test + public void testPreemptTask() throws Exception { + LOG.info("testPreemptTask"); + + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "tpt_svr", neverEndingTask); + slw.start(); + Thread.yield(); // let the worker start + Thread.sleep(100); + + // this time create a task node after starting the splitLogWorker + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tpt_task"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + assertEquals(1, slw.taskReadySeq); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "tpt_task")), "tpt_svr")); + + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "tpt_task"), + TaskState.TASK_UNASSIGNED.get("manager")); + waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + } + + @Test + public void testMultipleTasks() throws Exception { + LOG.info("testMultipleTasks"); + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "tmt_svr", neverEndingTask); + slw.start(); + Thread.yield(); // let the worker start + Thread.sleep(100); + + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tmt_task"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + // now the worker is busy doing the above task + + // create another task + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tmt_task_2"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // preempt the first task, have it owned by another worker + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "tmt_task"), + TaskState.TASK_OWNED.get("another-worker")); + waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + + waitForCounter(tot_wkr_task_acquired, 1, 2, 1000); + assertEquals(2, slw.taskReadySeq); + assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, "tmt_task_2")), "tmt_svr")); + } + + @Test + public void testRescan() throws Exception { + LOG.info("testRescan"); + slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), + "svr", neverEndingTask); + slw.start(); + Thread.yield(); // let the worker start + Thread.sleep(100); + + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "task"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + waitForCounter(tot_wkr_task_acquired, 0, 1, 1000); + // now the worker is busy doing the above task + + // preempt the task, have it owned by another worker + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "task"), + TaskState.TASK_UNASSIGNED.get("manager")); + waitForCounter(tot_wkr_preempt_task, 0, 1, 1000); + + // create a RESCAN node + zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "RESCAN"), + TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL); + + waitForCounter(tot_wkr_task_acquired, 1, 2, 1000); + // RESCAN node might not have been processed if the worker became busy + // with the above task. preempt the task again so that now the RESCAN + // node is processed + ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "task"), + TaskState.TASK_UNASSIGNED.get("manager")); + waitForCounter(tot_wkr_preempt_task, 1, 2, 1000); + waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000); + + List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode); + LOG.debug(nodes); + int num = 0; + for (String node : nodes) { + num++; + if (node.startsWith("RESCAN")) { + assertTrue(TaskState.TASK_DONE.equals(ZKUtil.getData(zkw, + ZKSplitLog.getNodeName(zkw, node)), "svr")); + } + } + assertEquals(2, num); + } +}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1094662&r1=1094661&r2=1094662&view=diff ============================================================================== --- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java (original) +++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java Mon Apr 18 17:16:15 2011 @@ -1,5 +1,5 @@ /** - * Copyright 2010 The Apache Software Foundation + * Copyright 2011 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HTableDes import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.ipc.RemoteException; @@ -150,7 +151,7 @@ public class TestHLogSplit { } /** - * @throws IOException + * @throws IOException * @see https://issues.apache.org/jira/browse/HBASE-3020 */ @Test public void testRecoveredEditsPathForMeta() throws IOException { @@ -164,7 +165,7 @@ public class TestHLogSplit { HLog.Entry entry = new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now), new WALEdit()); - Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir); + Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir, true); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); } @@ -173,13 +174,13 @@ public class TestHLogSplit { public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted() throws IOException { AtomicBoolean stop = new AtomicBoolean(false); - + FileStatus[] stats = fs.listStatus(new Path("/hbase/t1")); assertTrue("Previous test should clean up table dir", stats == null || stats.length == 0); generateHLogs(-1); - + try { (new ZombieNewLogWriterRegionServer(stop)).start(); HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, @@ -456,7 +457,7 @@ public class TestHLogSplit { FileStatus[] archivedLogs = fs.listStatus(corruptDir); assertEquals(archivedLogs.length, 0); } - + @Test public void testLogsGetArchivedAfterSplit() throws IOException { conf.setBoolean(HBASE_SKIP_ERRORS, false); @@ -507,7 +508,7 @@ public class TestHLogSplit { // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null } } -/* DISABLED for now. TODO: HBASE-2645 +/* DISABLED for now. TODO: HBASE-2645 @Test public void testLogCannotBeWrittenOnceParsed() throws IOException { AtomicLong counter = new AtomicLong(0); @@ -545,7 +546,7 @@ public class TestHLogSplit { generateHLogs(-1); fs.initialize(fs.getUri(), conf); Thread zombie = new ZombieNewLogWriterRegionServer(stop); - + try { zombie.start(); try { @@ -644,10 +645,10 @@ public class TestHLogSplit { HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf, hbaseDir, hlogDir, oldLogDir, fs); logSplitter.splitLog(); - + assertFalse(fs.exists(regiondir)); } - + @Test public void testIOEOnOutputThread() throws Exception { conf.setBoolean(HBASE_SKIP_ERRORS, false); @@ -663,7 +664,7 @@ public class TestHLogSplit { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); Mockito.doThrow(new IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any()); return mockWriter; - + } }; try { @@ -700,7 +701,7 @@ public class TestHLogSplit { fail("There shouldn't be any exception but: " + e.toString()); } } - + /** * Test log split process with fake data and lots of edits to trigger threading * issues. @@ -709,7 +710,7 @@ public class TestHLogSplit { public void testThreading() throws Exception { doTestThreading(20000, 128*1024*1024, 0); } - + /** * Test blocking behavior of the log split process if writers are writing slower * than the reader is reading. @@ -718,7 +719,7 @@ public class TestHLogSplit { public void testThreadingSlowWriterSmallBuffer() throws Exception { doTestThreading(200, 1024, 50); } - + /** * Sets up a log splitter with a mock reader and writer. The mock reader generates * a specified number of edits spread across 5 regions. The mock writer optionally @@ -726,7 +727,7 @@ public class TestHLogSplit { * * * After the split is complete, verifies that the statistics show the correct number * of edits output into each region. - * + * * @param numFakeEdits number of fake edits to push through pipeline * @param bufferSize size of in-memory buffer * @param writerSlowness writer threads will sleep this many ms per edit @@ -743,20 +744,20 @@ public class TestHLogSplit { out.close(); // Make region dirs for our destination regions so the output doesn't get skipped - final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); + final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", "r4"); makeRegionDirs(fs, regions); // Create a splitter that reads and writes the data without touching disk HLogSplitter logSplitter = new HLogSplitter( localConf, hbaseDir, hlogDir, oldLogDir, fs) { - + /* Produce a mock writer that doesn't write anywhere */ protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf) throws IOException { HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class); Mockito.doAnswer(new Answer<Void>() { int expectedIndex = 0; - + @Override public Void answer(InvocationOnMock invocation) { if (writerSlowness > 0) { @@ -771,17 +772,17 @@ public class TestHLogSplit { List<KeyValue> keyValues = edit.getKeyValues(); assertEquals(1, keyValues.size()); KeyValue kv = keyValues.get(0); - + // Check that the edits come in the right order. assertEquals(expectedIndex, Bytes.toInt(kv.getRow())); expectedIndex++; return null; } }).when(mockWriter).append(Mockito.<HLog.Entry>any()); - return mockWriter; + return mockWriter; } - - + + /* Produce a mock reader that generates fake entries */ protected Reader getReader(FileSystem fs, Path curLogFile, Configuration conf) throws IOException { @@ -792,11 +793,11 @@ public class TestHLogSplit { @Override public HLog.Entry answer(InvocationOnMock invocation) throws Throwable { if (index >= numFakeEdits) return null; - + // Generate r0 through r4 in round robin fashion int regionIdx = index % regions.size(); byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)}; - + HLog.Entry ret = createTestEntry(TABLE_NAME, region, Bytes.toBytes((int)(index / regions.size())), FAMILY, QUALIFIER, VALUE, index); @@ -807,22 +808,22 @@ public class TestHLogSplit { return mockReader; } }; - + logSplitter.splitLog(); - + // Verify number of written edits per region Map<byte[], Long> outputCounts = logSplitter.getOutputCounts(); for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) { - LOG.info("Got " + entry.getValue() + " output edits for region " + + LOG.info("Got " + entry.getValue() + " output edits for region " + Bytes.toString(entry.getKey())); - + assertEquals((long)entry.getValue(), numFakeEdits / regions.size()); } assertEquals(regions.size(), outputCounts.size()); } - - + + /** * This thread will keep writing to the file after the split process has started @@ -849,7 +850,7 @@ public class TestHLogSplit { while (true) { try { String region = "juliet"; - + fs.mkdirs(new Path(new Path(hbaseDir, region), region)); appendEntry(lastLogWriter, TABLE_NAME, region.getBytes(), ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0); @@ -896,7 +897,7 @@ public class TestHLogSplit { return; } Path tableDir = new Path(hbaseDir, new String(TABLE_NAME)); - Path regionDir = new Path(tableDir, regions.get(0)); + Path regionDir = new Path(tableDir, regions.get(0)); Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS); String region = "juliet"; Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet"); @@ -906,12 +907,12 @@ public class TestHLogSplit { flushToConsole("Juliet: split not started, sleeping a bit..."); Threads.sleep(10); } - + fs.mkdirs(new Path(tableDir, region)); HLog.Writer writer = HLog.createWriter(fs, - julietLog, conf); + julietLog, conf); appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(), - ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); + ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0); writer.close(); flushToConsole("Juliet file creator: created file " + julietLog); } catch (IOException e1) { @@ -920,6 +921,99 @@ public class TestHLogSplit { } } + private CancelableProgressable reporter = new CancelableProgressable() { + int count = 0; + + @Override + public boolean progress() { + count++; + LOG.debug("progress = " + count); + return true; + } + }; + + @Test + public void testSplitLogFileWithOneRegion() throws IOException { + LOG.info("testSplitLogFileWithOneRegion"); + final String REGION = "region__1"; + regions.removeAll(regions); + regions.add(REGION); + + + generateHLogs(1, 10, -1); + FileStatus logfile = fs.listStatus(hlogDir)[0]; + fs.initialize(fs.getUri(), conf); + HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, + conf, reporter); + HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, + logfile.getPath().toString(), conf); + + + Path originalLog = (fs.listStatus(oldLogDir))[0].getPath(); + Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION); + + + assertEquals(true, logsAreEqual(originalLog, splitLog)); + } + + @Test + public void testSplitLogFileEmpty() throws IOException { + LOG.info("testSplitLogFileEmpty"); + injectEmptyFile(".empty", true); + FileStatus logfile = fs.listStatus(hlogDir)[0]; + + fs.initialize(fs.getUri(), conf); + + HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, + conf, reporter); + HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, + logfile.getPath().toString(), conf); + Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME); + FileStatus [] files = this.fs.listStatus(tdir); + assertTrue(files == null || files.length == 0); + + assertEquals(0, countHLog(fs.listStatus(oldLogDir)[0].getPath(), fs, conf)); + } + + @Test + public void testSplitLogFileMultipleRegions() throws IOException { + LOG.info("testSplitLogFileMultipleRegions"); + generateHLogs(1, 10, -1); + FileStatus logfile = fs.listStatus(hlogDir)[0]; + fs.initialize(fs.getUri(), conf); + + HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, + conf, reporter); + HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, + logfile.getPath().toString(), conf); + for (String region : regions) { + Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region); + assertEquals(10, countHLog(recovered, fs, conf)); + } + } + + @Test + public void testSplitLogFileFirstLineCorruptionLog() + throws IOException { + conf.setBoolean(HBASE_SKIP_ERRORS, true); + generateHLogs(1, 10, -1); + FileStatus logfile = fs.listStatus(hlogDir)[0]; + + corruptHLog(logfile.getPath(), + Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs); + + fs.initialize(fs.getUri(), conf); + HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs, + conf, reporter); + HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir, + logfile.getPath().toString(), conf); + + final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get( + "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt")); + assertEquals(1, fs.listStatus(corruptDir).length); + } + + private void flushToConsole(String s) { System.out.println(s); System.out.flush(); @@ -936,7 +1030,7 @@ public class TestHLogSplit { fs.mkdirs(new Path(tabledir, region)); } } - + private void generateHLogs(int writers, int entries, int leaveOpen) throws IOException { makeRegionDirs(fs, regions); for (int i = 0; i < writers; i++) { @@ -1001,13 +1095,13 @@ public class TestHLogSplit { out.write(corrupted_bytes, middle, corrupted_bytes.length - middle); closeOrFlush(close, out); break; - + case TRUNCATE: fs.delete(path, false); out = fs.create(path); out.write(corrupted_bytes, 0, fileSize-32); closeOrFlush(close, out); - + break; } @@ -1052,7 +1146,7 @@ public class TestHLogSplit { writer.sync(); return seq; } - + private HLog.Entry createTestEntry( byte[] table, byte[] region, byte[] row, byte[] family, byte[] qualifier, @@ -1085,7 +1179,7 @@ public class TestHLogSplit { FileStatus[] f2 = fs.listStatus(p2); assertNotNull("Path " + p1 + " doesn't exist", f1); assertNotNull("Path " + p2 + " doesn't exist", f2); - + System.out.println("Files in " + p1 + ": " + Joiner.on(",").join(FileUtil.stat2Paths(f1))); System.out.println("Files in " + p2 + ": " +
