http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java index 39743d5..41a790e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java @@ -27,12 +27,15 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.LRUDictionary; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; @@ -51,7 +54,7 @@ import org.apache.hadoop.io.compress.DefaultCodec; * Hadoop serialization). */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class SequenceFileLogWriter extends WriterBase { +public class SequenceFileLogWriter implements DefaultWALProvider.Writer { private static final Log LOG = LogFactory.getLog(SequenceFileLogWriter.class); // The sequence file we delegate to. private SequenceFile.Writer writer; @@ -59,6 +62,8 @@ public class SequenceFileLogWriter extends WriterBase { // in the SequenceFile.Writer 'writer' instance above. private FSDataOutputStream writer_out; + private CompressionContext compressionContext; + // Legacy stuff from pre-PB WAL metadata. private static final Text WAL_VERSION_KEY = new Text("version"); private static final Text WAL_COMPRESSION_TYPE_KEY = new Text("compression.type"); @@ -88,10 +93,23 @@ public class SequenceFileLogWriter extends WriterBase { return new Metadata(metaMap); } + private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { + boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); + if (doCompress) { + try { + this.compressionContext = new CompressionContext(LRUDictionary.class, + FSUtils.isRecoveredEdits(path), conf.getBoolean( + CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); + } catch (Exception e) { + throw new IOException("Failed to initiate CompressionContext", e); + } + } + return doCompress; + } + @Override public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException { - super.init(fs, path, conf, overwritable); boolean compress = initializeCompressionContext(conf, path); // Create a SF.Writer instance. try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java new file mode 100644 index 0000000..bedb915 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRollPeriod.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestAsyncLogRollPeriod extends TestLogRollPeriod { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TestLogRollPeriod.TEST_UTIL.getConfiguration(); + conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); + TestLogRollPeriod.setUpBeforeClass(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java new file mode 100644 index 0000000..fabf6d2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ VerySlowRegionServerTests.class, LargeTests.class }) +public class TestAsyncLogRolling extends AbstractTestLogRolling { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TestAsyncLogRolling.TEST_UTIL.getConfiguration(); + conf.setInt(AsyncFSWAL.ASYNC_WAL_CREATE_MAX_RETRIES, 100); + conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); + AbstractTestLogRolling.setUpBeforeClass(); + } + + @Test(timeout = 180000) + public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException { + dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null); + tableName = getName(); + Table table = createTestTable(tableName); + TEST_UTIL.waitUntilAllRegionsAssigned(table.getName()); + doPut(table, 1); + server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); + HRegionInfo hri = server.getOnlineRegions(table.getName()).get(0).getRegionInfo(); + AsyncFSWAL wal = (AsyncFSWAL) server.getWAL(hri); + int numRolledLogFiles = AsyncFSWALProvider.getNumRolledLogFiles(wal); + DatanodeInfo[] dnInfos = wal.getPipeline(); + DataNodeProperties dnProp = TEST_UTIL.getDFSCluster().stopDataNode(dnInfos[0].getName()); + TEST_UTIL.getDFSCluster().restartDataNode(dnProp); + doPut(table, 2); + assertEquals(numRolledLogFiles + 1, AsyncFSWALProvider.getNumRolledLogFiles(wal)); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java new file mode 100644 index 0000000..7d6c6d9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncProtobufLog.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputFlushHandler; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import com.google.common.base.Throwables; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestAsyncProtobufLog extends AbstractTestProtobufLog<WALProvider.AsyncWriter> { + + private static EventLoopGroup EVENT_LOOP_GROUP; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + EVENT_LOOP_GROUP = new NioEventLoopGroup(); + AbstractTestProtobufLog.setUpBeforeClass(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + AbstractTestProtobufLog.tearDownAfterClass(); + EVENT_LOOP_GROUP.shutdownGracefully().syncUninterruptibly(); + } + + @Override + protected AsyncWriter createWriter(Path path) throws IOException { + return AsyncFSWALProvider.createAsyncWriter(TEST_UTIL.getConfiguration(), fs, path, false, + EVENT_LOOP_GROUP.next()); + } + + @Override + protected void append(AsyncWriter writer, Entry entry) throws IOException { + writer.append(entry); + } + + @Override + protected void sync(AsyncWriter writer) throws IOException { + FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); + writer.sync(handler, null); + try { + handler.get(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } catch (ExecutionException e) { + Throwables.propagateIfPossible(e.getCause(), IOException.class); + throw new IOException(e.getCause()); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java new file mode 100644 index 0000000..ca415fd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestAsyncWALReplay extends TestWALReplay { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); + TestWALReplay.setUpBeforeClass(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java new file mode 100644 index 0000000..3b8869b --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplayCompressed.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestAsyncWALReplayCompressed extends TestWALReplay { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); + conf.set(WALFactory.WAL_PROVIDER, "asyncfs"); + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + TestWALReplay.setUpBeforeClass(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java index 0662716..5783106 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import java.io.IOException; +import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -30,28 +31,35 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; /** * Tests for WAL write durability */ +@RunWith(Parameterized.class) @Category({RegionServerTests.class, MediumTests.class}) public class TestDurability { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -64,6 +72,13 @@ public class TestDurability { private static byte[] ROW = Bytes.toBytes("row"); private static byte[] COL = Bytes.toBytes("col"); + @Parameter + public String walProvider; + + @Parameters(name = "{index}: provider={0}") + public static Iterable<Object[]> data() { + return Arrays.asList(new Object[] { "defaultProvider" }, new Object[] { "asyncfs" }); + } @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -81,6 +96,16 @@ public class TestDurability { TEST_UTIL.shutdownMiniCluster(); } + @Before + public void setUp() { + CONF.set(WALFactory.WAL_PROVIDER, walProvider); + } + + @After + public void tearDown() throws IOException { + FS.delete(DIR, true); + } + @Test public void testDurability() throws Exception { final WALFactory wals = new WALFactory(CONF, null, "TestDurability"); http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index c05e7f0..b7c1c73 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -65,11 +65,11 @@ import org.junit.experimental.categories.Category; */ @Category({RegionServerTests.class, MediumTests.class}) public class TestLogRollAbort { - private static final Log LOG = LogFactory.getLog(TestLogRolling.class); + private static final Log LOG = LogFactory.getLog(AbstractTestLogRolling.class); private static MiniDFSCluster dfsCluster; private static Admin admin; private static MiniHBaseCluster cluster; - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); /* For the split-then-roll test */ private static final Path HBASEDIR = new Path("/hbase"); @@ -212,7 +212,7 @@ public class TestLogRollAbort { } // Send the data to HDFS datanodes and close the HDFS writer log.sync(); - ((FSHLog) log).replaceWriter(((FSHLog)log).getOldPath(), null, null, null); + ((AbstractFSWAL<?>) log).replaceWriter(((FSHLog)log).getOldPath(), null, null); /* code taken from MasterFileSystem.getLogDirs(), which is called from MasterFileSystem.splitLog() * handles RS shutdowns (as observed by the splitting process) http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java index 1bf686f..1141871 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollPeriod.java @@ -19,21 +19,21 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertFalse; -import java.util.List; import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -44,9 +44,9 @@ import org.junit.experimental.categories.Category; */ @Category({RegionServerTests.class, MediumTests.class}) public class TestLogRollPeriod { - private static final Log LOG = LogFactory.getLog(TestLogRolling.class); + private static final Log LOG = LogFactory.getLog(AbstractTestLogRolling.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private final static long LOG_ROLL_PERIOD = 4000; http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 3ab49c0..37b23e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,7 +23,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.EOFException; -import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -33,275 +31,70 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -/** - * Test log deletion as logs are rolled. - */ -@Category({VerySlowRegionServerTests.class, LargeTests.class}) -public class TestLogRolling { +@Category({ VerySlowRegionServerTests.class, LargeTests.class }) +public class TestLogRolling extends AbstractTestLogRolling { + private static final Log LOG = LogFactory.getLog(TestLogRolling.class); - private HRegionServer server; - private String tableName; - private byte[] value; - private FileSystem fs; - private MiniDFSCluster dfsCluster; - private Admin admin; - private MiniHBaseCluster cluster; - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - @Rule public final TestName name = new TestName(); - - public TestLogRolling() { - this.server = null; - this.tableName = null; - - String className = this.getClass().getName(); - StringBuilder v = new StringBuilder(className); - while (v.length() < 1000) { - v.append(className); - } - this.value = Bytes.toBytes(v.toString()); - } - // Need to override this setup so we can edit the config before it gets sent - // to the HDFS & HBase cluster startup. @BeforeClass public static void setUpBeforeClass() throws Exception { // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2 // profile. See HBASE-9337 for related issues. System.setProperty("hbase.tests.use.shortcircuit.reads", "false"); - /**** configuration for testLogRolling ****/ - // Force a region split after every 768KB - TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L); - - // We roll the log after every 32 writes - TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32); - - TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2); - TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000); - - // For less frequently updated regions flush after every 2 flushes - TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2); - - // We flush the cache after every 8192 bytes - TEST_UTIL.getConfiguration().setInt( - HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192); - - // Increase the amount of time between client retries - TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000); - - // Reduce thread wake frequency so that other threads can get - // a chance to run. - TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000); - - /**** configuration for testLogRollOnDatanodeDeath ****/ - // make sure log.hflush() calls syncFs() to open a pipeline + /**** configuration for testLogRollOnDatanodeDeath ****/ + // make sure log.hflush() calls syncFs() to open a pipeline TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); - // lower the namenode & datanode heartbeat so the namenode - // quickly detects datanode failures + // lower the namenode & datanode heartbeat so the namenode + // quickly detects datanode failures TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); // the namenode might still try to choose the recently-dead datanode // for a pipeline, so try to a new pipeline multiple times - TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); + TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.tolerable.lowreplication", 2); TEST_UTIL.getConfiguration().setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3); - } - - @Before - public void setUp() throws Exception { - TEST_UTIL.startMiniCluster(1, 1, 2); - - cluster = TEST_UTIL.getHBaseCluster(); - dfsCluster = TEST_UTIL.getDFSCluster(); - fs = TEST_UTIL.getTestFileSystem(); - admin = TEST_UTIL.getHBaseAdmin(); - - // disable region rebalancing (interferes with log watching) - cluster.getMaster().balanceSwitch(false); - } - - @After - public void tearDown() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - private void startAndWriteData() throws IOException, InterruptedException { - // When the hbase:meta table can be opened, the region servers are running - TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); - this.server = cluster.getRegionServerThreads().get(0).getRegionServer(); - - Table table = createTestTable(this.tableName); - - server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); - for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls - doPut(table, i); - if (i % 32 == 0) { - // After every 32 writes sleep to let the log roller run - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - // continue - } - } - } - } - - /** - * Tests that log rolling doesn't hang when no data is written. - */ - @Test(timeout=120000) - public void testLogRollOnNothingWritten() throws Exception { - final Configuration conf = TEST_UTIL.getConfiguration(); - final WALFactory wals = new WALFactory(conf, null, - ServerName.valueOf("test.com",8080, 1).toString()); - final WAL newLog = wals.getWAL(new byte[]{}, null); - try { - // Now roll the log before we write anything. - newLog.rollWriter(true); - } finally { - wals.close(); - } - } - - /** - * Tests that logs are deleted - * @throws IOException - * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException - */ - @Test - public void testLogRolling() throws Exception { - this.tableName = getName(); - // TODO: Why does this write data take for ever? - startAndWriteData(); - HRegionInfo region = - server.getOnlineRegions(TableName.valueOf(tableName)).get(0).getRegionInfo(); - final WAL log = server.getWAL(region); - LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) + - " log files"); - - // flush all regions - for (Region r: server.getOnlineRegionsLocalContext()) { - r.flush(true); - } - - // Now roll the log - log.rollWriter(); - - int count = DefaultWALProvider.getNumRolledLogFiles(log); - LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); - assertTrue(("actual count: " + count), count <= 2); - } - - private String getName() { - return "TestLogRolling-" + name.getMethodName(); - } - - void writeData(Table table, int rownum) throws IOException { - doPut(table, rownum); - - // sleep to let the log roller run (if it needs to) - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - // continue - } - } - - void validateData(Table table, int rownum) throws IOException { - String row = "row" + String.format("%1$04d", rownum); - Get get = new Get(Bytes.toBytes(row)); - get.addFamily(HConstants.CATALOG_FAMILY); - Result result = table.get(get); - assertTrue(result.size() == 1); - assertTrue(Bytes.equals(value, - result.getValue(HConstants.CATALOG_FAMILY, null))); - LOG.info("Validated row " + row); - } - - void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout) - throws IOException { - for (int i = 0; i < 10; i++) { - Put put = new Put(Bytes.toBytes("row" - + String.format("%1$04d", (start + i)))); - put.addColumn(HConstants.CATALOG_FAMILY, null, value); - table.put(put); - } - Put tmpPut = new Put(Bytes.toBytes("tmprow")); - tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value); - long startTime = System.currentTimeMillis(); - long remaining = timeout; - while (remaining > 0) { - if (log.isLowReplicationRollEnabled() == expect) { - break; - } else { - // Trigger calling FSHlog#checkLowReplication() - table.put(tmpPut); - try { - Thread.sleep(200); - } catch (InterruptedException e) { - // continue - } - remaining = timeout - (System.currentTimeMillis() - startTime); - } - } + AbstractTestLogRolling.setUpBeforeClass(); } /** - * Tests that logs are rolled upon detecting datanode death - * Requires an HDFS jar with HDFS-826 & syncFs() support (HDFS-200) + * Tests that logs are rolled upon detecting datanode death Requires an HDFS jar with HDFS-826 & + * syncFs() support (HDFS-200) */ @Test public void testLogRollOnDatanodeDeath() throws Exception { TEST_UTIL.ensureSomeRegionServersAvailable(2); assertTrue("This test requires WAL file replication set to 2.", fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) == 2); - LOG.info("Replication=" + - fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); + LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); this.server = cluster.getRegionServer(0); @@ -328,29 +121,28 @@ public class TestLogRolling { }); // don't run this test without append support (HDFS-200 & HDFS-142) - assertTrue("Need append support for this test", FSUtils - .isAppendSupported(TEST_UTIL.getConfiguration())); + assertTrue("Need append support for this test", + FSUtils.isAppendSupported(TEST_UTIL.getConfiguration())); // add up the datanode count, to ensure proper replication when we kill 1 // This function is synchronous; when it returns, the dfs cluster is active // We start 3 servers and then stop 2 to avoid a directory naming conflict - // when we stop/start a namenode later, as mentioned in HBASE-5163 + // when we stop/start a namenode later, as mentioned in HBASE-5163 List<DataNode> existingNodes = dfsCluster.getDataNodes(); int numDataNodes = 3; - dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, - null, null); + dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), numDataNodes, true, null, null); List<DataNode> allNodes = dfsCluster.getDataNodes(); - for (int i = allNodes.size()-1; i >= 0; i--) { + for (int i = allNodes.size() - 1; i >= 0; i--) { if (existingNodes.contains(allNodes.get(i))) { - dfsCluster.stopDataNode( i ); + dfsCluster.stopDataNode(i); } } - assertTrue("DataNodes " + dfsCluster.getDataNodes().size() + - " default replication " + - fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()), - dfsCluster.getDataNodes().size() >= - fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1); + assertTrue( + "DataNodes " + dfsCluster.getDataNodes().size() + " default replication " + + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()), + dfsCluster.getDataNodes() + .size() >= fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) + 1); writeData(table, 2); @@ -358,13 +150,12 @@ public class TestLogRolling { LOG.info("log.getCurrentFileName(): " + log.getCurrentFileName()); long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log); assertTrue("Log should have a timestamp older than now", - curTime > oldFilenum && oldFilenum != -1); + curTime > oldFilenum && oldFilenum != -1); assertTrue("The log shouldn't have rolled yet", - oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log)); - final DatanodeInfo[] pipeline = log.getPipeLine(); - assertTrue(pipeline.length == - fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); + oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log)); + final DatanodeInfo[] pipeline = log.getPipeline(); + assertTrue(pipeline.length == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); // kill a datanode in the pipeline to force a log roll on the next sync() // This function is synchronous, when it returns the node is killed. @@ -375,41 +166,38 @@ public class TestLogRolling { long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log); assertTrue("Missing datanode should've triggered a log roll", - newFilenum > oldFilenum && newFilenum > curTime); + newFilenum > oldFilenum && newFilenum > curTime); assertTrue("The log rolling hook should have been called with the low replication flag", - lowReplicationHookCalled.get()); + lowReplicationHookCalled.get()); // write some more log data (this should use a new hdfs_out) writeData(table, 3); assertTrue("The log should not roll again.", - DefaultWALProvider.extractFileNumFromWAL(log) == newFilenum); + DefaultWALProvider.extractFileNumFromWAL(log) == newFilenum); // kill another datanode in the pipeline, so the replicas will be lower than // the configured value 2. assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null); batchWriteAndWait(table, log, 3, false, 14000); int replication = log.getLogReplication(); - assertTrue("LowReplication Roller should've been disabled, current replication=" - + replication, !log.isLowReplicationRollEnabled()); + assertTrue("LowReplication Roller should've been disabled, current replication=" + replication, + !log.isLowReplicationRollEnabled()); - dfsCluster - .startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null); + dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 1, true, null, null); // Force roll writer. The new log file will have the default replications, // and the LowReplication Roller will be enabled. log.rollWriter(true); batchWriteAndWait(table, log, 13, true, 10000); replication = log.getLogReplication(); - assertTrue("New log file should have the default replication instead of " + - replication, + assertTrue("New log file should have the default replication instead of " + replication, replication == fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); assertTrue("LowReplication Roller should've been enabled", log.isLowReplicationRollEnabled()); } /** - * Test that WAL is rolled when all data nodes in the pipeline have been - * restarted. + * Test that WAL is rolled when all data nodes in the pipeline have been restarted. * @throws Exception */ @Test @@ -417,8 +205,7 @@ public class TestLogRolling { LOG.info("Starting testLogRollOnPipelineRestart"); assertTrue("This test requires WAL file replication.", fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS()) > 1); - LOG.info("Replication=" + - fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); + LOG.info("Replication=" + fs.getDefaultReplication(TEST_UTIL.getDataTestDirOnTestFS())); // When the hbase:meta table can be opened, the region servers are running Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); try { @@ -441,10 +228,11 @@ public class TestLogRolling { log.registerWALActionsListener(new WALActionsListener.Base() { @Override - public void preLogRoll(Path oldFile, Path newFile) { - LOG.debug("preLogRoll: oldFile="+oldFile+" newFile="+newFile); + public void preLogRoll(Path oldFile, Path newFile) { + LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile); preLogRolledCalled.add(new Integer(1)); } + @Override public void postLogRoll(Path oldFile, Path newFile) { paths.add(newFile); @@ -452,8 +240,8 @@ public class TestLogRolling { }); // don't run this test without append support (HDFS-200 & HDFS-142) - assertTrue("Need append support for this test", FSUtils - .isAppendSupported(TEST_UTIL.getConfiguration())); + assertTrue("Need append support for this test", + FSUtils.isAppendSupported(TEST_UTIL.getConfiguration())); writeData(table, 1002); @@ -461,10 +249,10 @@ public class TestLogRolling { LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log)); long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log); assertTrue("Log should have a timestamp older than now", - curTime > oldFilenum && oldFilenum != -1); + curTime > oldFilenum && oldFilenum != -1); - assertTrue("The log shouldn't have rolled yet", oldFilenum == - DefaultWALProvider.extractFileNumFromWAL(log)); + assertTrue("The log shouldn't have rolled yet", + oldFilenum == DefaultWALProvider.extractFileNumFromWAL(log)); // roll all datanodes in the pipeline dfsCluster.restartDataNodes(); @@ -478,7 +266,7 @@ public class TestLogRolling { long newFilenum = DefaultWALProvider.extractFileNumFromWAL(log); assertTrue("Missing datanode should've triggered a log roll", - newFilenum > oldFilenum && newFilenum > curTime); + newFilenum > oldFilenum && newFilenum > curTime); validateData(table, 1003); writeData(table, 1004); @@ -496,30 +284,30 @@ public class TestLogRolling { // force a log roll to read back and verify previously written logs log.rollWriter(true); assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(), - preLogRolledCalled.size() >= 1); + preLogRolledCalled.size() >= 1); // read back the data written Set<String> loggedRows = new HashSet<String>(); FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration()); for (Path p : paths) { LOG.debug("recovering lease for " + p); - fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, - TEST_UTIL.getConfiguration(), null); + fsUtils.recoverFileLease(((HFileSystem) fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), + null); - LOG.debug("Reading WAL "+FSUtils.getPath(p)); + LOG.debug("Reading WAL " + FSUtils.getPath(p)); WAL.Reader reader = null; try { reader = WALFactory.createReader(fs, p, TEST_UTIL.getConfiguration()); WAL.Entry entry; while ((entry = reader.next()) != null) { - LOG.debug("#"+entry.getKey().getLogSeqNum()+": "+entry.getEdit().getCells()); + LOG.debug("#" + entry.getKey().getSequenceId() + ": " + entry.getEdit().getCells()); for (Cell cell : entry.getEdit().getCells()) { - loggedRows.add(Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), - cell.getRowLength())); + loggedRows.add( + Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); } } } catch (EOFException e) { - LOG.debug("EOF reading file "+FSUtils.getPath(p)); + LOG.debug("EOF reading file " + FSUtils.getPath(p)); } finally { if (reader != null) reader.close(); } @@ -532,7 +320,7 @@ public class TestLogRolling { assertTrue(loggedRows.contains("row1005")); // flush all regions - for (Region r: server.getOnlineRegionsLocalContext()) { + for (Region r : server.getOnlineRegionsLocalContext()) { try { r.flush(true); } catch (Exception e) { @@ -547,19 +335,19 @@ public class TestLogRolling { ResultScanner scanner = table.getScanner(new Scan()); try { - for (int i=2; i<=5; i++) { + for (int i = 2; i <= 5; i++) { Result r = scanner.next(); assertNotNull(r); assertFalse(r.isEmpty()); - assertEquals("row100"+i, Bytes.toString(r.getRow())); + assertEquals("row100" + i, Bytes.toString(r.getRow())); } } finally { scanner.close(); } // verify that no region servers aborted - for (JVMClusterUtil.RegionServerThread rsThread: - TEST_UTIL.getHBaseCluster().getRegionServerThreads()) { + for (JVMClusterUtil.RegionServerThread rsThread : TEST_UTIL.getHBaseCluster() + .getRegionServerThreads()) { assertFalse(rsThread.getRegionServer().isAborted()); } } finally { @@ -567,80 +355,4 @@ public class TestLogRolling { } } - /** - * Tests that logs are deleted when some region has a compaction - * record in WAL and no other records. See HBASE-8597. - */ - @Test - public void testCompactionRecordDoesntBlockRolling() throws Exception { - Table table = null; - - // When the hbase:meta table can be opened, the region servers are running - Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); - try { - table = createTestTable(getName()); - - server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); - Region region = server.getOnlineRegions(table.getName()).get(0); - final WAL log = server.getWAL(region.getRegionInfo()); - Store s = region.getStore(HConstants.CATALOG_FAMILY); - - //have to flush namespace to ensure it doesn't affect wall tests - admin.flush(TableName.NAMESPACE_TABLE_NAME); - - // Put some stuff into table, to make sure we have some files to compact. - for (int i = 1; i <= 2; ++i) { - doPut(table, i); - admin.flush(table.getName()); - } - doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL - assertEquals("Should have no WAL after initial writes", 0, - DefaultWALProvider.getNumRolledLogFiles(log)); - assertEquals(2, s.getStorefilesCount()); - - // Roll the log and compact table, to have compaction record in the 2nd WAL. - log.rollWriter(); - assertEquals("Should have WAL; one table is not flushed", 1, - DefaultWALProvider.getNumRolledLogFiles(log)); - admin.flush(table.getName()); - region.compact(false); - // Wait for compaction in case if flush triggered it before us. - Assert.assertNotNull(s); - for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) { - Threads.sleepWithoutInterrupt(200); - } - assertEquals("Compaction didn't happen", 1, s.getStorefilesCount()); - - // Write some value to the table so the WAL cannot be deleted until table is flushed. - doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table. - log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet. - assertEquals("Should have WAL; one table is not flushed", 1, - DefaultWALProvider.getNumRolledLogFiles(log)); - - // Flush table to make latest WAL obsolete; write another record, and roll again. - admin.flush(table.getName()); - doPut(table, 1); - log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added. - assertEquals("Should have 1 WALs at the end", 1, - DefaultWALProvider.getNumRolledLogFiles(log)); - } finally { - if (t != null) t.close(); - if (table != null) table.close(); - } - } - - private void doPut(Table table, int i) throws IOException { - Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); - put.addColumn(HConstants.CATALOG_FAMILY, null, value); - table.put(put); - } - - private Table createTestTable(String tableName) throws IOException { - // Create the test table and open it - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); - admin.createTable(desc); - return TEST_UTIL.getConnection().getTable(desc.getTableName()); - } } - http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java index e6237f8..61ee589 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestProtobufLog.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,190 +17,32 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - import java.io.IOException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WALProvider; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -/** - * WAL tests that can be reused across providers. - */ -@Category({RegionServerTests.class, MediumTests.class}) -public class TestProtobufLog { - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - protected FileSystem fs; - protected Path dir; - protected WALFactory wals; - - @Rule - public final TestName currentTest = new TestName(); - @Before - public void setUp() throws Exception { - fs = TEST_UTIL.getDFSCluster().getFileSystem(); - dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName()); - wals = new WALFactory(TEST_UTIL.getConfiguration(), null, currentTest.getMethodName()); - } - - @After - public void tearDown() throws Exception { - wals.close(); - FileStatus[] entries = fs.listStatus(new Path("/")); - for (FileStatus dir : entries) { - fs.delete(dir.getPath(), true); - } - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - // Make block sizes small. - TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); - // needed for testAppendClose() - TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true); - TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); - // quicker heartbeat interval for faster DN death notification - TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); - TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); - TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestProtobufLog extends AbstractTestProtobufLog<WALProvider.Writer> { - // faster failover with cluster.shutdown();fs.close() idiom - TEST_UTIL.getConfiguration() - .setInt("hbase.ipc.client.connect.max.retries", 1); - TEST_UTIL.getConfiguration().setInt( - "dfs.client.block.recovery.retries", 1); - TEST_UTIL.getConfiguration().setInt( - "hbase.ipc.client.connection.maxidletime", 500); - TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, - SampleRegionWALObserver.class.getName()); - TEST_UTIL.startMiniDFSCluster(3); + @Override + protected Writer createWriter(Path path) throws IOException { + return DefaultWALProvider.createWriter(TEST_UTIL.getConfiguration(), fs, path, false); } - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); + @Override + protected void append(Writer writer, Entry entry) throws IOException { + writer.append(entry); } - /** - * Reads the WAL with and without WALTrailer. - * @throws IOException - */ - @Test - public void testWALTrailer() throws IOException { - // read With trailer. - doRead(true); - // read without trailer - doRead(false); - } - - /** - * Appends entries in the WAL and reads it. - * @param withTrailer If 'withTrailer' is true, it calls a close on the WALwriter before reading - * so that a trailer is appended to the WAL. Otherwise, it starts reading after the sync - * call. This means that reader is not aware of the trailer. In this scenario, if the - * reader tries to read the trailer in its next() call, it returns false from - * ProtoBufLogReader. - * @throws IOException - */ - private void doRead(boolean withTrailer) throws IOException { - final int columnCount = 5; - final int recordCount = 5; - final TableName tableName = - TableName.valueOf("tablename"); - final byte[] row = Bytes.toBytes("row"); - long timestamp = System.currentTimeMillis(); - Path path = new Path(dir, "tempwal"); - // delete the log if already exists, for test only - fs.delete(path, true); - WALProvider.Writer writer = null; - ProtobufLogReader reader = null; - try { - HRegionInfo hri = new HRegionInfo(tableName, - HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - HTableDescriptor htd = new HTableDescriptor(tableName); - fs.mkdirs(dir); - // Write log in pb format. - writer = wals.createWALWriter(fs, path); - for (int i = 0; i < recordCount; ++i) { - WALKey key = new WALKey( - hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID); - WALEdit edit = new WALEdit(); - for (int j = 0; j < columnCount; ++j) { - if (i == 0) { - htd.addFamily(new HColumnDescriptor("column" + j)); - } - String value = i + "" + j; - edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value))); - } - writer.append(new WAL.Entry(key, edit)); - } - writer.sync(); - if (withTrailer) writer.close(); - - // Now read the log using standard means. - reader = (ProtobufLogReader) wals.createReader(fs, path); - if (withTrailer) { - assertNotNull(reader.trailer); - } else { - assertNull(reader.trailer); - } - for (int i = 0; i < recordCount; ++i) { - WAL.Entry entry = reader.next(); - assertNotNull(entry); - assertEquals(columnCount, entry.getEdit().size()); - assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()); - assertEquals(tableName, entry.getKey().getTablename()); - int idx = 0; - for (Cell val : entry.getEdit().getCells()) { - assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), - val.getRowLength())); - String value = i + "" + idx; - assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val)); - idx++; - } - } - WAL.Entry entry = reader.next(); - assertNull(entry); - } finally { - if (writer != null) { - writer.close(); - } - if (reader != null) { - reader.close(); - } - } + @Override + protected void sync(Writer writer) throws IOException { + writer.sync(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java index 4987fd4..b225554 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayCompressed.java @@ -36,5 +36,4 @@ public class TestWALReplayCompressed extends TestWALReplay { Configuration conf = TestWALReplay.TEST_UTIL.getConfiguration(); conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); } - }
