Added: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java?rev=1094662&view=auto
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
 (added)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
 Mon Apr 18 17:16:15 2011
@@ -0,0 +1,280 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.zookeeper.ZKSplitLog.Counters.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
+import org.apache.hadoop.hbase.zookeeper.ZKSplitLog.TaskState;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+public class TestSplitLogWorker {
+  private static final Log LOG = LogFactory.getLog(TestSplitLogWorker.class);
+  static {
+    Logger.getLogger("org.apache.hadoop.hbase").setLevel(Level.DEBUG);
+  }
+  private final static HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+  private ZooKeeperWatcher zkw;
+  private SplitLogWorker slw;
+
+  private void waitForCounter(AtomicLong ctr, long oldval, long newval,
+      long timems) {
+    long curt = System.currentTimeMillis();
+    long endt = curt + timems;
+    while (curt < endt) {
+      if (ctr.get() == oldval) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+        }
+        curt = System.currentTimeMillis();
+      } else {
+        assertEquals(newval, ctr.get());
+        return;
+      }
+    }
+    assertTrue(false);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+  }
+
+  @Before
+  public void setup() throws Exception {
+    slw = null;
+    TEST_UTIL.startMiniZKCluster();
+    zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+        "split-log-worker-tests", null);
+    ZKUtil.deleteChildrenRecursively(zkw, zkw.baseZNode);
+    ZKUtil.createAndFailSilent(zkw, zkw.baseZNode);
+    assertTrue(ZKUtil.checkExists(zkw, zkw.baseZNode) != -1);
+    LOG.debug(zkw.baseZNode + " created");
+    ZKUtil.createAndFailSilent(zkw, zkw.splitLogZNode);
+    assertTrue(ZKUtil.checkExists(zkw, zkw.splitLogZNode) != -1);
+    LOG.debug(zkw.splitLogZNode + " created");
+    resetCounters();
+
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (slw != null) {
+      slw.stop();
+      slw.worker.join(3000);
+      if (slw.worker.isAlive()) {
+        assertTrue("could not stop the worker thread" == null);
+      }
+    }
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  SplitLogWorker.TaskExecutor neverEndingTask =
+    new SplitLogWorker.TaskExecutor() {
+
+      @Override
+      public Status exec(String name, CancelableProgressable p) {
+        while (true) {
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            return Status.PREEMPTED;
+          }
+          if (!p.progress()) {
+            return Status.PREEMPTED;
+          }
+        }
+      }
+
+  };
+
+  @Test
+  public void testAcquireTaskAtStartup() throws Exception {
+    LOG.info("testAcquireTaskAtStartup");
+
+    zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tatas"),
+        TaskState.TASK_UNASSIGNED.get("mgr"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(), "rs",
+        neverEndingTask);
+    slw.start();
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 100);
+    assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
+        ZKSplitLog.getNodeName(zkw, "tatas")), "rs"));
+  }
+
+  @Test
+  public void testRaceForTask() throws Exception {
+    LOG.info("testRaceForTask");
+
+    zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "trft"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    SplitLogWorker slw1 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "svr1", neverEndingTask);
+    SplitLogWorker slw2 = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "svr2", neverEndingTask);
+    slw1.start();
+    slw2.start();
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+    waitForCounter(tot_wkr_failed_to_grab_task_lost_race, 0, 1, 1000);
+    assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
+        ZKSplitLog.getNodeName(zkw, "trft")), "svr1") ||
+        TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
+            ZKSplitLog.getNodeName(zkw, "trft")), "svr2"));
+    slw1.stop();
+    slw2.stop();
+    slw1.worker.join();
+    slw2.worker.join();
+  }
+
+  @Test
+  public void testPreemptTask() throws Exception {
+    LOG.info("testPreemptTask");
+
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "tpt_svr", neverEndingTask);
+    slw.start();
+    Thread.yield(); // let the worker start
+    Thread.sleep(100);
+
+    // this time create a task node after starting the splitLogWorker
+    zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tpt_task"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+    assertEquals(1, slw.taskReadySeq);
+    assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
+        ZKSplitLog.getNodeName(zkw, "tpt_task")), "tpt_svr"));
+
+    ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "tpt_task"),
+        TaskState.TASK_UNASSIGNED.get("manager"));
+    waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+  }
+
+  @Test
+  public void testMultipleTasks() throws Exception {
+    LOG.info("testMultipleTasks");
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "tmt_svr", neverEndingTask);
+    slw.start();
+    Thread.yield(); // let the worker start
+    Thread.sleep(100);
+
+    zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tmt_task"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+    // now the worker is busy doing the above task
+
+    // create another task
+    zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "tmt_task_2"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    // preempt the first task, have it owned by another worker
+    ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "tmt_task"),
+        TaskState.TASK_OWNED.get("another-worker"));
+    waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+
+    waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
+    assertEquals(2, slw.taskReadySeq);
+    assertTrue(TaskState.TASK_OWNED.equals(ZKUtil.getData(zkw,
+        ZKSplitLog.getNodeName(zkw, "tmt_task_2")), "tmt_svr"));
+  }
+
+  @Test
+  public void testRescan() throws Exception {
+    LOG.info("testRescan");
+    slw = new SplitLogWorker(zkw, TEST_UTIL.getConfiguration(),
+        "svr", neverEndingTask);
+    slw.start();
+    Thread.yield(); // let the worker start
+    Thread.sleep(100);
+
+    zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "task"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    waitForCounter(tot_wkr_task_acquired, 0, 1, 1000);
+    // now the worker is busy doing the above task
+
+    // preempt the task, have it owned by another worker
+    ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "task"),
+        TaskState.TASK_UNASSIGNED.get("manager"));
+    waitForCounter(tot_wkr_preempt_task, 0, 1, 1000);
+
+    // create a RESCAN node
+    zkw.getZooKeeper().create(ZKSplitLog.getNodeName(zkw, "RESCAN"),
+        TaskState.TASK_UNASSIGNED.get("manager"), Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT_SEQUENTIAL);
+
+    waitForCounter(tot_wkr_task_acquired, 1, 2, 1000);
+    // RESCAN node might not have been processed if the worker became busy
+    // with the above task. preempt the task again so that now the RESCAN
+    // node is processed
+    ZKUtil.setData(zkw, ZKSplitLog.getNodeName(zkw, "task"),
+        TaskState.TASK_UNASSIGNED.get("manager"));
+    waitForCounter(tot_wkr_preempt_task, 1, 2, 1000);
+    waitForCounter(tot_wkr_task_acquired_rescan, 0, 1, 1000);
+
+    List<String> nodes = ZKUtil.listChildrenNoWatch(zkw, zkw.splitLogZNode);
+    LOG.debug(nodes);
+    int num = 0;
+    for (String node : nodes) {
+      num++;
+      if (node.startsWith("RESCAN")) {
+        assertTrue(TaskState.TASK_DONE.equals(ZKUtil.getData(zkw,
+            ZKSplitLog.getNodeName(zkw, node)), "svr"));
+      }
+    }
+    assertEquals(2, num);
+  }
+}

Modified: 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
URL: 
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java?rev=1094662&r1=1094661&r2=1094662&view=diff
==============================================================================
--- 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
 (original)
+++ 
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
 Mon Apr 18 17:16:15 2011
@@ -1,5 +1,5 @@
 /**
- * Copyright 2010 The Apache Software Foundation
+ * Copyright 2011 The Apache Software Foundation
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.HTableDes
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.ipc.RemoteException;
@@ -150,7 +151,7 @@ public class TestHLogSplit {
   }
 
   /**
-   * @throws IOException 
+   * @throws IOException
    * @see https://issues.apache.org/jira/browse/HBASE-3020
    */
   @Test public void testRecoveredEditsPathForMeta() throws IOException {
@@ -164,7 +165,7 @@ public class TestHLogSplit {
     HLog.Entry entry =
       new HLog.Entry(new HLogKey(encoded, HConstants.META_TABLE_NAME, 1, now),
       new WALEdit());
-    Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir);
+    Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir, true);
     String parentOfParent = p.getParent().getParent().getName();
     assertEquals(parentOfParent, 
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
   }
@@ -173,13 +174,13 @@ public class TestHLogSplit {
   public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
   throws IOException {
     AtomicBoolean stop = new AtomicBoolean(false);
-    
+
     FileStatus[] stats = fs.listStatus(new Path("/hbase/t1"));
     assertTrue("Previous test should clean up table dir",
         stats == null || stats.length == 0);
 
     generateHLogs(-1);
-    
+
     try {
     (new ZombieNewLogWriterRegionServer(stop)).start();
     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
@@ -456,7 +457,7 @@ public class TestHLogSplit {
     FileStatus[] archivedLogs = fs.listStatus(corruptDir);
     assertEquals(archivedLogs.length, 0);
   }
-  
+
   @Test
   public void testLogsGetArchivedAfterSplit() throws IOException {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
@@ -507,7 +508,7 @@ public class TestHLogSplit {
       // hadoop 0.21 throws FNFE whereas hadoop 0.20 returns null
     }
   }
-/* DISABLED for now.  TODO: HBASE-2645 
+/* DISABLED for now.  TODO: HBASE-2645
   @Test
   public void testLogCannotBeWrittenOnceParsed() throws IOException {
     AtomicLong counter = new AtomicLong(0);
@@ -545,7 +546,7 @@ public class TestHLogSplit {
     generateHLogs(-1);
     fs.initialize(fs.getUri(), conf);
     Thread zombie = new ZombieNewLogWriterRegionServer(stop);
-    
+
     try {
       zombie.start();
       try {
@@ -644,10 +645,10 @@ public class TestHLogSplit {
     HLogSplitter logSplitter = HLogSplitter.createLogSplitter(conf,
         hbaseDir, hlogDir, oldLogDir, fs);
     logSplitter.splitLog();
-    
+
     assertFalse(fs.exists(regiondir));
   }
-  
+
   @Test
   public void testIOEOnOutputThread() throws Exception {
     conf.setBoolean(HBASE_SKIP_ERRORS, false);
@@ -663,7 +664,7 @@ public class TestHLogSplit {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
         Mockito.doThrow(new 
IOException("Injected")).when(mockWriter).append(Mockito.<HLog.Entry>any());
         return mockWriter;
-        
+
       }
     };
     try {
@@ -700,7 +701,7 @@ public class TestHLogSplit {
       fail("There shouldn't be any exception but: " + e.toString());
     }
   }
-  
+
   /**
    * Test log split process with fake data and lots of edits to trigger 
threading
    * issues.
@@ -709,7 +710,7 @@ public class TestHLogSplit {
   public void testThreading() throws Exception {
     doTestThreading(20000, 128*1024*1024, 0);
   }
-  
+
   /**
    * Test blocking behavior of the log split process if writers are writing 
slower
    * than the reader is reading.
@@ -718,7 +719,7 @@ public class TestHLogSplit {
   public void testThreadingSlowWriterSmallBuffer() throws Exception {
     doTestThreading(200, 1024, 50);
   }
-  
+
   /**
    * Sets up a log splitter with a mock reader and writer. The mock reader 
generates
    * a specified number of edits spread across 5 regions. The mock writer 
optionally
@@ -726,7 +727,7 @@ public class TestHLogSplit {
    * *
    * After the split is complete, verifies that the statistics show the 
correct number
    * of edits output into each region.
-   * 
+   *
    * @param numFakeEdits number of fake edits to push through pipeline
    * @param bufferSize size of in-memory buffer
    * @param writerSlowness writer threads will sleep this many ms per edit
@@ -743,20 +744,20 @@ public class TestHLogSplit {
     out.close();
 
     // Make region dirs for our destination regions so the output doesn't get 
skipped
-    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", 
"r4"); 
+    final List<String> regions = ImmutableList.of("r0", "r1", "r2", "r3", 
"r4");
     makeRegionDirs(fs, regions);
 
     // Create a splitter that reads and writes the data without touching disk
     HLogSplitter logSplitter = new HLogSplitter(
         localConf, hbaseDir, hlogDir, oldLogDir, fs) {
-      
+
       /* Produce a mock writer that doesn't write anywhere */
       protected HLog.Writer createWriter(FileSystem fs, Path logfile, 
Configuration conf)
       throws IOException {
         HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
         Mockito.doAnswer(new Answer<Void>() {
           int expectedIndex = 0;
-          
+
           @Override
           public Void answer(InvocationOnMock invocation) {
             if (writerSlowness > 0) {
@@ -771,17 +772,17 @@ public class TestHLogSplit {
             List<KeyValue> keyValues = edit.getKeyValues();
             assertEquals(1, keyValues.size());
             KeyValue kv = keyValues.get(0);
-            
+
             // Check that the edits come in the right order.
             assertEquals(expectedIndex, Bytes.toInt(kv.getRow()));
             expectedIndex++;
             return null;
           }
         }).when(mockWriter).append(Mockito.<HLog.Entry>any());
-        return mockWriter;        
+        return mockWriter;
       }
-      
-      
+
+
       /* Produce a mock reader that generates fake entries */
       protected Reader getReader(FileSystem fs, Path curLogFile, Configuration 
conf)
       throws IOException {
@@ -792,11 +793,11 @@ public class TestHLogSplit {
           @Override
           public HLog.Entry answer(InvocationOnMock invocation) throws 
Throwable {
             if (index >= numFakeEdits) return null;
-           
+
             // Generate r0 through r4 in round robin fashion
             int regionIdx = index % regions.size();
             byte region[] = new byte[] {(byte)'r', (byte) (0x30 + regionIdx)};
-            
+
             HLog.Entry ret = createTestEntry(TABLE_NAME, region,
                 Bytes.toBytes((int)(index / regions.size())),
                 FAMILY, QUALIFIER, VALUE, index);
@@ -807,22 +808,22 @@ public class TestHLogSplit {
         return mockReader;
       }
     };
-    
+
     logSplitter.splitLog();
-    
+
     // Verify number of written edits per region
 
     Map<byte[], Long> outputCounts = logSplitter.getOutputCounts();
     for (Map.Entry<byte[], Long> entry : outputCounts.entrySet()) {
-      LOG.info("Got " + entry.getValue() + " output edits for region " + 
+      LOG.info("Got " + entry.getValue() + " output edits for region " +
           Bytes.toString(entry.getKey()));
-      
+
       assertEquals((long)entry.getValue(), numFakeEdits / regions.size());
     }
     assertEquals(regions.size(), outputCounts.size());
   }
-  
-  
+
+
 
   /**
    * This thread will keep writing to the file after the split process has 
started
@@ -849,7 +850,7 @@ public class TestHLogSplit {
       while (true) {
         try {
           String region = "juliet";
-          
+
           fs.mkdirs(new Path(new Path(hbaseDir, region), region));
           appendEntry(lastLogWriter, TABLE_NAME, region.getBytes(),
                   ("r" + editsCount).getBytes(), FAMILY, QUALIFIER, VALUE, 0);
@@ -896,7 +897,7 @@ public class TestHLogSplit {
         return;
       }
       Path tableDir = new Path(hbaseDir, new String(TABLE_NAME));
-      Path regionDir = new Path(tableDir, regions.get(0));      
+      Path regionDir = new Path(tableDir, regions.get(0));
       Path recoveredEdits = new Path(regionDir, HLogSplitter.RECOVERED_EDITS);
       String region = "juliet";
       Path julietLog = new Path(hlogDir, HLOG_FILE_PREFIX + ".juliet");
@@ -906,12 +907,12 @@ public class TestHLogSplit {
           flushToConsole("Juliet: split not started, sleeping a bit...");
           Threads.sleep(10);
         }
-
+ 
         fs.mkdirs(new Path(tableDir, region));
         HLog.Writer writer = HLog.createWriter(fs,
-                julietLog, conf);
+            julietLog, conf);
         appendEntry(writer, "juliet".getBytes(), ("juliet").getBytes(),
-                ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
+            ("r").getBytes(), FAMILY, QUALIFIER, VALUE, 0);
         writer.close();
         flushToConsole("Juliet file creator: created file " + julietLog);
       } catch (IOException e1) {
@@ -920,6 +921,99 @@ public class TestHLogSplit {
     }
   }
 
+  private CancelableProgressable reporter = new CancelableProgressable() {
+    int count = 0;
+
+    @Override
+    public boolean progress() {
+      count++;
+      LOG.debug("progress = " + count);
+      return true;
+    }
+  };
+
+  @Test
+  public void testSplitLogFileWithOneRegion() throws IOException {
+    LOG.info("testSplitLogFileWithOneRegion");
+    final String REGION = "region__1";
+    regions.removeAll(regions);
+    regions.add(REGION);
+
+
+    generateHLogs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(hlogDir)[0];
+    fs.initialize(fs.getUri(), conf);
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
+        conf, reporter);
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        logfile.getPath().toString(), conf);
+
+
+    Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
+    Path splitLog = getLogForRegion(hbaseDir, TABLE_NAME, REGION);
+
+
+    assertEquals(true, logsAreEqual(originalLog, splitLog));
+  }
+
+  @Test
+  public void testSplitLogFileEmpty() throws IOException {
+    LOG.info("testSplitLogFileEmpty");
+    injectEmptyFile(".empty", true);
+    FileStatus logfile = fs.listStatus(hlogDir)[0];
+
+    fs.initialize(fs.getUri(), conf);
+
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
+        conf, reporter);
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        logfile.getPath().toString(), conf);
+    Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
+    FileStatus [] files = this.fs.listStatus(tdir);
+    assertTrue(files == null || files.length == 0);
+
+    assertEquals(0, countHLog(fs.listStatus(oldLogDir)[0].getPath(), fs, 
conf));
+  }
+
+  @Test
+  public void testSplitLogFileMultipleRegions() throws IOException {
+    LOG.info("testSplitLogFileMultipleRegions");
+    generateHLogs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(hlogDir)[0];
+    fs.initialize(fs.getUri(), conf);
+
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
+        conf, reporter);
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        logfile.getPath().toString(), conf);
+    for (String region : regions) {
+      Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region);
+      assertEquals(10, countHLog(recovered, fs, conf));
+    }
+  }
+
+  @Test
+  public void testSplitLogFileFirstLineCorruptionLog()
+  throws IOException {
+    conf.setBoolean(HBASE_SKIP_ERRORS, true);
+    generateHLogs(1, 10, -1);
+    FileStatus logfile = fs.listStatus(hlogDir)[0];
+
+    corruptHLog(logfile.getPath(),
+        Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
+
+    fs.initialize(fs.getUri(), conf);
+    HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
+        conf, reporter);
+    HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
+        logfile.getPath().toString(), conf);
+
+    final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
+        "hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));
+    assertEquals(1, fs.listStatus(corruptDir).length);
+  }
+
+
   private void flushToConsole(String s) {
     System.out.println(s);
     System.out.flush();
@@ -936,7 +1030,7 @@ public class TestHLogSplit {
       fs.mkdirs(new Path(tabledir, region));
     }
   }
-  
+
   private void generateHLogs(int writers, int entries, int leaveOpen) throws 
IOException {
     makeRegionDirs(fs, regions);
     for (int i = 0; i < writers; i++) {
@@ -1001,13 +1095,13 @@ public class TestHLogSplit {
         out.write(corrupted_bytes, middle, corrupted_bytes.length - middle);
         closeOrFlush(close, out);
         break;
-        
+
       case TRUNCATE:
         fs.delete(path, false);
         out = fs.create(path);
         out.write(corrupted_bytes, 0, fileSize-32);
         closeOrFlush(close, out);
-        
+
         break;
     }
 
@@ -1052,7 +1146,7 @@ public class TestHLogSplit {
     writer.sync();
     return seq;
   }
-  
+
   private HLog.Entry createTestEntry(
       byte[] table, byte[] region,
       byte[] row, byte[] family, byte[] qualifier,
@@ -1085,7 +1179,7 @@ public class TestHLogSplit {
     FileStatus[] f2 = fs.listStatus(p2);
     assertNotNull("Path " + p1 + " doesn't exist", f1);
     assertNotNull("Path " + p2 + " doesn't exist", f2);
-    
+
     System.out.println("Files in " + p1 + ": " +
         Joiner.on(",").join(FileUtil.stat2Paths(f1)));
     System.out.println("Files in " + p2 + ": " +


Reply via email to