Author: mbautin
Date: Thu Mar 1 19:05:25 2012
New Revision: 1295744

URL: http://svn.apache.org/viewvc?rev=1295744&view=rev
Log:
[master] Unit test for log splitting on master failover
Summary: While trying to improve master failover I realized that I would have
to modify the way we detect what logs to split on master startup. This is a
unit test that starts a load, waits until half of the data is loaded, kills a
regionserver, then kills the master, lets the backup master take over, and
verifies that only the dead regionserver's logs have been split. The data load
continues in the background, and when it finishes the loaded data is verified,
too. The test runs in two modes: regular and distributed log splitting. This
patch also imports a few useful pieces of code from the trunk.

Test Plan: Run all unit tests

Reviewers: pkhemani, kranganathan, liyintang, kannan

Reviewed By: pkhemani

Differential Revision: https://reviews.facebook.net/D2019

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestLogSplitOnMasterFailover.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/MultiMasterTest.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Thu Mar 1 19:05:25 2012
@@ -446,6 +446,13 @@ public final class HConstants {
 
   public static final float HFILE_BLOCK_CACHE_SIZE_DEFAULT = 0.25f;
 
+  /** The delay when re-trying a socket operation in a loop (HBASE-4712) */
+  public static final int SOCKET_RETRY_WAIT_MS = 200;
+
+  /** Conf key that enables distributed log splitting */
+  public static final String DISTRIBUTED_LOG_SPLITTING_KEY =
+      "hbase.master.distributed.log.splitting";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
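
The new DISTRIBUTED_LOG_SPLITTING_KEY constant replaces the raw
"hbase.master.distributed.log.splitting" string used elsewhere in this patch.
A minimal sketch of how a test could toggle the two log-splitting modes that
the new unit test exercises; the class and method names here are illustrative
and not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class LogSplittingConfExample {
      /** Build a configuration with distributed log splitting switched on or off. */
      public static Configuration withLogSplittingMode(boolean distributed) {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, distributed);
        return conf;
      }
    }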

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Thu Mar 1 19:05:25 2012
@@ -68,7 +68,6 @@ import org.apache.hadoop.ipc.RemoteExcep
  */
 public class HBaseAdmin {
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
-// private final HConnection connection;
   final HConnection connection;
   private volatile Configuration conf;
   private final long pause;
@@ -1256,4 +1255,10 @@ public class HBaseAdmin {
     copyOfConf.setInt("hbase.client.retries.number", 1);
     new HBaseAdmin(copyOfConf);
   }
+
+  public void close() throws IOException {
+    if (this.connection != null) {
+      connection.close();
+    }
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Thu Mar 1 19:05:25 2012
@@ -19,6 +19,13 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -28,17 +35,11 @@ import org.apache.hadoop.hbase.ipc.HMast
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
 /**
  * Cluster connection.
  * {@link HConnectionManager} manages instances of this class.
  */
-public interface HConnection {
+public interface HConnection extends Closeable {
   /**
    * Retrieve ZooKeeperWrapper used by the connection.
    * @return ZooKeeperWrapper handle being used by the connection.

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Thu Mar 1 19:05:25 2012
@@ -1563,6 +1563,10 @@ public class HConnectionManager {
       }
     }
 
+    public void close() {
+      close(true);
+    }
+
     /**
      * Process a batch of Puts on the given executor service.
     *
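
With this change HConnection is Closeable and HBaseAdmin exposes a close()
that shuts down its underlying connection when one is present. A minimal
usage sketch under those assumptions; the class name and the particular admin
call shown are illustrative only:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class AdminCloseExample {
      public static void printLiveServers(Configuration conf) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
          // Any administrative calls go here.
          System.out.println("Live regionservers: "
              + admin.getClusterStatus().getServers());
        } finally {
          admin.close(); // releases the underlying HConnection, if any
        }
      }
    }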

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Thu Mar 1 19:05:25 2012
@@ -98,7 +98,6 @@ import org.apache.hadoop.hbase.monitorin
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -212,6 +211,9 @@ public class HMaster extends Thread impl
 
   public ThreadPoolExecutor logSplitThreadPool;
 
+  /** Log directories split on startup for testing master failover */
+  private List<String> logDirsSplitOnStartup;
+
   /**
    * Constructor
    * @param conf configuration
@@ -262,7 +264,7 @@ public class HMaster extends Thread impl
     checkRootDir(this.rootdir, this.conf, this.fs);
 
     this.distributedLogSplitting = conf.getBoolean(
-        "hbase.master.distributed.log.splitting", false);
+        HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, false);
     this.splitLogManager = null;
 
     // Make sure the region servers can archive their old logs
@@ -955,7 +957,11 @@ public class HMaster extends Thread impl
     try {
       Path logsDirPath =
           new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
-      if (!this.fs.exists(logsDirPath)) return;
+      if (!this.fs.exists(logsDirPath)) {
+        LOG.debug("Log directory " + logsDirPath +
+            " does not exist, no logs to split");
+        return;
+      }
       FileStatus[] logFolders = this.fs.listStatus(logsDirPath);
       if (logFolders == null || logFolders.length == 0) {
         LOG.debug("No log files to split, proceeding...");
@@ -974,6 +980,7 @@ public class HMaster extends Thread impl
               " belongs to an existing region server");
         }
       }
 
+      logDirsSplitOnStartup = serverNames;
       splitLog(serverNames);
       retrySplitting = false;
@@ -1979,4 +1986,8 @@ public class HMaster extends Thread impl
     return this.splitLogManager;
   }
 
+  List<String> getLogDirsSplitOnStartup() {
+    return logDirsSplitOnStartup;
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Mar 1 19:05:25 2012
@@ -715,8 +715,8 @@ public class HRegionServer implements HR
       this.hbaseMaster = null;
     }
 
+    this.zooKeeperWrapper.close();
     if (!killed) {
-      this.zooKeeperWrapper.close();
       join();
       if ((this.fs != null) && (stopRequested.get() || abortRequested)) {
         // Finally attempt to close the Filesystem, to flush out any open streams.

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Thu Mar 1 19:05:25 2012
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.util.Thre
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -244,6 +245,8 @@ public class HLog implements Syncable {
    */
  private static final Pattern pattern = Pattern.compile(".*\\.\\d*");
 
+  private static final FileStatus[] NO_FILES = new FileStatus[0];
+
  static byte [] COMPLETE_CACHE_FLUSH;
  static {
    try {
@@ -1354,6 +1357,9 @@ public class HLog implements Syncable {
     splits = splitLog(rootDir, srcDir, oldLogDir, logfiles, fs, conf, maxWriteTime);
     try {
       FileStatus[] files = fs.listStatus(srcDir);
+      if (files == null) {
+        files = NO_FILES;
+      }
       for(FileStatus file : files) {
         Path newPath = getHLogArchivePath(oldLogDir, file.getPath());
         LOG.info("Moving " +  FSUtils.getPath(file.getPath()) + " to " +
@@ -1509,7 +1515,12 @@ public class HLog implements Syncable {
         writeEditsBatchToRegions(editsByRegion, logWriters, rootDir, fs, conf);
       }
 
-      if (fs.listStatus(srcDir).length > processedLogs.size() +
+      Preconditions.checkNotNull(fs);
+      Preconditions.checkNotNull(srcDir);
+      Preconditions.checkNotNull(processedLogs);
+      Preconditions.checkNotNull(corruptedLogs);
+      FileStatus[] srcDirList = fs.listStatus(srcDir);
+      if (srcDirList != null && srcDirList.length > processedLogs.size() +
           corruptedLogs.size()) {
         status.abort("Discovered orphan hlog after split");
         throw new IOException("Discovered orphan hlog after split. Maybe " +
Maybe " + Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1295744&r1=1295743&r2=1295744&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Thu Mar 1 19:05:25 2012 @@ -637,8 +637,9 @@ public class FSUtils { long startWaiting = System.currentTimeMillis(); boolean discardlastBlock = conf.getBoolean("hbase.regionserver.discardLastNonExistantBlock", - true); - LOG.info("Recovering file" + p + ", discard last block: " + discardlastBlock); + true); + LOG.info("Recovering file " + p + ", discard last block: " + + discardlastBlock); // Trying recovery boolean recovered = false; @@ -1135,4 +1136,4 @@ class FSRegionScanner implements Runnabl LOG.warn("Problem scanning file system", e); } } -} +} \ No newline at end of file Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1295744&r1=1295743&r2=1295744&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Mar 1 19:05:25 2012 @@ -54,7 +54,9 @@ import org.apache.hadoop.hbase.client.Pu import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -62,8 +64,10 @@ import org.apache.hadoop.hbase.regionser import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; @@ -91,6 +95,13 @@ public class HBaseTestingUtility { private final static Log LOG = LogFactory.getLog(HBaseTestingUtility.class); private final Configuration conf; private MiniZooKeeperCluster zkCluster = null; + + /** + * The default number of regions per regionserver when creating a pre-split + * table. + */ + private static int DEFAULT_REGIONS_PER_SERVER = 5; + private MiniDFSCluster dfsCluster = null; private MiniHBaseCluster hbaseCluster = null; private MiniMRCluster mrCluster = null; @@ -1307,6 +1318,52 @@ public class HBaseTestingUtility { return port; } + /** + * Creates a pre-split table for load testing. If the table already exists, + * logs a warning and continues. 
+   * @return the number of regions the table was split into
+   */
+  public static int createPreSplitLoadTestTable(Configuration conf,
+      byte[] tableName, byte[] columnFamily, Algorithm compression,
+      DataBlockEncoding dataBlockEncoding) throws IOException {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
+    hcd.setDataBlockEncoding(dataBlockEncoding);
+    hcd.setCompressionType(compression);
+    desc.addFamily(hcd);
+
+    int totalNumberOfRegions = 0;
+    try {
+      HBaseAdmin admin = new HBaseAdmin(conf);
+
+      // create a table a pre-splits regions.
+      // The number of splits is set as:
+      //    region servers * regions per region server
+      int numberOfServers = admin.getClusterStatus().getServers();
+      if (numberOfServers == 0) {
+        throw new IllegalStateException("No live regionservers");
+      }
+
+      totalNumberOfRegions = numberOfServers * DEFAULT_REGIONS_PER_SERVER;
+      LOG.info("Number of live regionservers: " + numberOfServers + ", " +
+          "pre-splitting table into " + totalNumberOfRegions + " regions " +
+          "(default regions per server: " + DEFAULT_REGIONS_PER_SERVER + ")");
+
+      byte[][] splits = new RegionSplitter.HexStringSplit().split(
+          totalNumberOfRegions);
+
+      admin.createTable(desc, splits);
+      admin.close();
+    } catch (MasterNotRunningException e) {
+      LOG.error("Master not running", e);
+      throw new IOException(e);
+    } catch (TableExistsException e) {
+      LOG.warn("Table " + Bytes.toStringBinary(tableName) +
+          " already exists, continuing");
+    }
+    return totalNumberOfRegions;
+  }
+
   public HRegion createTestRegion(String tableName, HColumnDescriptor hcd)
       throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName);
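
The new helper pre-splits a load-test table based on the number of live
regionservers (DEFAULT_REGIONS_PER_SERVER regions per server). A minimal
usage sketch, assuming a running (mini) cluster; the table and column family
names below are illustrative, and the new TestLogSplitOnMasterFailover test
further down issues the same call:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
    import org.apache.hadoop.hbase.io.hfile.Compression;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PreSplitTableExample {
      /** Creates "loadTestTable" with a GZ-compressed family, pre-split across live servers. */
      public static int createLoadTestTable(Configuration conf) throws IOException {
        return HBaseTestingUtility.createPreSplitLoadTestTable(conf,
            Bytes.toBytes("loadTestTable"), Bytes.toBytes("cf"),
            Compression.Algorithm.GZ, DataBlockEncoding.NONE);
      }
    }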

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/MultiMasterTest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/MultiMasterTest.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/MultiMasterTest.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/MultiMasterTest.java Thu Mar 1 19:05:25 2012
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
@@ -41,6 +42,7 @@ public class MultiMasterTest {
   private MiniHBaseCluster cluster;
 
   protected final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected final Configuration conf = TEST_UTIL.getConfiguration();
 
   public void startMiniCluster(int numMasters, int numRS) throws IOException,
       InterruptedException {

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java?rev=1295744&r1=1295743&r2=1295744&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java Thu Mar 1 19:05:25 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
@@ -54,7 +55,6 @@ import org.apache.hadoop.hbase.master.Sp
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -91,7 +91,7 @@ public class TestDistributedLogSplitting
     conf = HBaseConfiguration.create();
     conf.setInt("hbase.regionserver.info.port", -1);
     conf.setFloat("hbase.regions.slop", (float)100.0); // no load balancing
-    conf.setBoolean("hbase.master.distributed.log.splitting", true);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
     TEST_UTIL = new HBaseTestingUtility(conf);
     cluster = TEST_UTIL.startMiniCluster(num_rs);
     int live_rs;

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestLogSplitOnMasterFailover.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestLogSplitOnMasterFailover.java?rev=1295744&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestLogSplitOnMasterFailover.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestLogSplitOnMasterFailover.java Thu Mar 1 19:05:25 2012
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.junit.Test;
+
+/**
+ * Tests that the master splits the logs of dead regionservers on startup and
+ * does not attempt to split live regionservers' logs. Done by killing a
+ * regionserver to create a need to split logs, and quickly killing a master to
+ * cause master failover.
+ */
+public class TestLogSplitOnMasterFailover extends MultiMasterTest {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestLogSplitOnMasterFailover.class);
+
+  private final int NUM_MASTERS = 2;
+  private final int NUM_RS = 2;
+  private final int NUM_ROWS = 8000;
+  private final int COLS_PER_ROW = 30;
+
+  private final byte[] TABLE_BYTES = Bytes.toBytes("myTable");
+  private final byte[] CF_BYTES = Bytes.toBytes("myCF");
+
+  private Compression.Algorithm COMPRESSION = Compression.Algorithm.GZ;
+
+  private Semaphore halfRowsLoaded = new Semaphore(0);
+  private Semaphore dataLoadVerifyFinished = new Semaphore(0);
+
+  /**
+   * A worker that inserts data into HBase on a separate thread.
+   */
+  private class DataLoader implements Runnable {
+
+    private volatile boolean failure = false;
+
+    private Map<String, List<String>> rowToQuals =
+        new HashMap<String, List<String>>();
+    private HTable t;
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName(getClass().getSimpleName());
+      try {
+        HBaseTestingUtility.createPreSplitLoadTestTable(conf,
+            TABLE_BYTES, CF_BYTES, COMPRESSION, DataBlockEncoding.NONE);
+        t = new HTable(conf, TABLE_BYTES);
+
+        loadData();
+        verifyData();
+      } catch (Throwable ex) {
+        LOG.error("Data loader failure", ex);
+        failure = true;
+      } finally {
+        dataLoadVerifyFinished.release();
+        if (t != null) {
+          try {
+            t.close();
+          } catch (IOException e) {
+            LOG.error("Error closing HTable", e);
+          }
+        }
+      }
+    }
+
+    private void loadData() throws IOException {
+      Random rand = new Random(190879817L);
+      int bytesInserted = 0;
+      for (int i = 0; i < NUM_ROWS; ++i) {
+        int rowsLoaded = i + 1;
+        String rowStr = String.format("%04x", rand.nextInt(65536)) + "_" + i;
+        byte[] rowBytes = Bytes.toBytes(rowStr);
+        Put p = new Put(rowBytes);
+        List<String> quals = new ArrayList<String>();
+        rowToQuals.put(rowStr, quals);
+        for (int j = 0; j < COLS_PER_ROW; ++j) {
+          String qualStr = "" + rand.nextInt(10000) + "_" + j;
+          quals.add(qualStr);
+          String valueStr = createValue(rowStr, qualStr);
+          byte[] qualBytes = Bytes.toBytes(qualStr);
+          byte[] valueBytes = Bytes.toBytes(valueStr);
+          p.add(CF_BYTES, qualBytes, valueBytes);
+          bytesInserted += rowBytes.length + qualBytes.length +
+              valueBytes.length;
+        }
+        t.put(p);
+        if (rowsLoaded % (NUM_ROWS / 10) == 0) {
+          LOG.info("Loaded " + rowsLoaded + " rows");
+        }
+        if (rowsLoaded == NUM_ROWS / 2) {
+          LOG.info("Loaded half of the rows (" + rowsLoaded +
+              "), waking up main thread");
+          halfRowsLoaded.release();
+        }
+      }
+      LOG.info("Approximate number of bytes inserted: " + bytesInserted);
+    }
+
+    private void verifyData() throws IOException {
+      LOG.debug("Starting data verification");
+      for (Map.Entry<String, List<String>> entry : rowToQuals.entrySet()) {
+        String row = entry.getKey();
+        List<String> quals = entry.getValue();
+        Get g = new Get(Bytes.toBytes(row));
+        Result r = t.get(g);
+        Map<byte[], byte[]> familyMap = r.getFamilyMap(CF_BYTES);
+        assertNotNull(familyMap);
+        assertEquals(quals.size(), familyMap.size());
+        for (String q : quals) {
+          byte[] v = familyMap.get(Bytes.toBytes(q));
+          assertNotNull(v);
+          assertEquals(createValue(row, q), Bytes.toStringBinary(v));
+        }
+      }
+      LOG.debug("Data verification completed");
+    }
+
+    private String createValue(String rowStr, String qualStr) {
+      return "v" + rowStr + "_" + qualStr;
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testWithRegularLogSplitting() throws Exception {
+    ZooKeeperWrapper.setNamespaceForTesting();
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, false);
+    runTest();
+  }
+
+  @Test(timeout=180000)
+  public void testWithDistributedLogSplitting() throws Exception {
+    ZooKeeperWrapper.setNamespaceForTesting();
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_SPLITTING_KEY, true);
+    runTest();
+  }
+
+  private void runTest() throws Exception {
+    startMiniCluster(NUM_MASTERS, NUM_RS);
+    Thread.currentThread().setName(getClass().getSimpleName());
+    ensureMastersAreUp(NUM_MASTERS);
+
+    final int activeIndex = getActiveMasterIndex();
+
+    List<HMaster> masters = miniCluster().getMasters();
+
+    header("Starting data loader");
+    DataLoader dataLoader = new DataLoader();
+    Thread inserterThread = new Thread(dataLoader);
+    inserterThread.start();
+    halfRowsLoaded.acquire();
+
+    Path logsDir = new Path(FSUtils.getRootDir(conf),
+        HConstants.HREGION_LOGDIR_NAME);
+
+    header("Killing one region server so we have some logs to split");
+    HRegionServer rsToKill = miniCluster().getRegionServer(0);
+    String killedRsName = rsToKill.getServerInfo().getServerName();
+    List<String> otherRsNames = new ArrayList<String>();
+    for (int i = 1; i < NUM_RS; ++i) {
+      otherRsNames.add(
+          miniCluster().getRegionServer(i).getServerInfo().getServerName());
+    }
+    rsToKill.kill();
+    // Wait until the regionserver actually goes down.
+    while (miniCluster().getLiveRegionServerThreads().size() == NUM_RS) {
+      Threads.sleep(HConstants.SOCKET_RETRY_WAIT_MS);
+    }
+
+    // Check that we have some logs.
+    FileSystem fs = FileSystem.get(conf);
+    assertTrue("Directory " + logsDir + " does not exist",
+        fs.exists(logsDir));
+    FileStatus[] logDirs = fs.listStatus(logsDir);
+    assertTrue("No logs in the log directory " + logsDir, logDirs.length > 0);
+
+    header("Killing the active master (#" + activeIndex + ")");
+
+    miniCluster().killMaster(activeIndex);
+    miniCluster().getHBaseCluster().waitOnMasterStop(activeIndex);
+
+    masters = miniCluster().getMasters();
+    assertEquals(1, masters.size());
+
+    // wait for an active master to show up and be ready
+    assertTrue(miniCluster().waitForActiveAndReadyMaster());
+
+    header("Verifying backup master is now active");
+    // should only have one master now
+    assertEquals(1, masters.size());
+    // and he should be active
+    HMaster master = masters.get(0);
+    assertTrue(master.isActiveMaster());
+
+    LOG.debug("Waiting until we finish loading/verifying the data");
+    dataLoadVerifyFinished.acquire();
+    assertFalse("Data loader failure, check the logs", dataLoader.failure);
+
+    // Check the master split the correct logs at startup;
+    List<String> logDirsSplitAtStartup = master.getLogDirsSplitOnStartup();
+    LOG.info("Log dirs split at startup: " + logDirsSplitAtStartup);
+
+    Set<String> logsSplit = new HashSet<String>();
+    logsSplit.addAll(logDirsSplitAtStartup);
+    String logDirToBeSplit = killedRsName + "-splitting";
+    assertTrue("Log directory " + logDirToBeSplit + " was not split " +
+        "on startup. Logs split: " + logDirsSplitAtStartup,
+        logsSplit.contains(logDirToBeSplit));
+    for (String logNotToSplit : otherRsNames) {
+      assertFalse("Log directory " + logNotToSplit +
+          " should not have been split", logsSplit.contains(logNotToSplit));
+    }
+  }
+
+}
