Author: apurtell
Date: Sat Dec 19 08:10:45 2009
New Revision: 892451
URL: http://svn.apache.org/viewvc?rev=892451&view=rev
Log:
HBASE-2059 Break out WAL reader and writer impl from HLog
Added:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/conf/hbase-default.xml
hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Sat Dec 19 08:10:45 2009
@@ -242,6 +242,7 @@
HBASE-2049 Cleanup HLog binary log output (Dave Latham via Stack)
HBASE-2052 Make hbase more 'live' when comes to noticing table creation,
splits, etc., for 0.20.3
+ HBASE-2059 Break out WAL reader and writer impl from HLog
NEW FEATURES
HBASE-1901 "General" partitioner for "hbase-48" bulk (behind the api, write
Modified: hadoop/hbase/trunk/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/conf/hbase-default.xml?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/conf/hbase-default.xml (original)
+++ hadoop/hbase/trunk/conf/hbase-default.xml Sat Dec 19 08:10:45 2009
@@ -191,6 +191,16 @@
<description>Period at which we will roll the commit log.</description>
</property>
<property>
+ <name>hbase.regionserver.hlog.reader.impl</name>
+ <value>org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogReader</value>
+ <description>The HLog file reader implementation.</description>
+ </property>
+ <property>
+ <name>hbase.regionserver.hlog.writer.impl</name>
+ <value>org.apache.hadoop.hbase.regionserver.wal.SequenceFileLogWriter</value>
+ <description>The HLog file writer implementation.</description>
+ </property>
+ <property>
<name>hbase.regionserver.thread.splitcompactcheckfrequency</name>
<value>20000</value>
<description>How often a region server runs the split/compaction check.
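The two properties added above make the WAL file format pluggable. A deployment can swap in its own implementations by overriding these keys in hbase-site.xml; a minimal sketch, assuming a hypothetical class com.example.CustomLogReader that implements HLog.Reader:

  <property>
    <name>hbase.regionserver.hlog.reader.impl</name>
    <value>com.example.CustomLogReader</value>
    <description>Custom HLog file reader; must implement HLog.Reader
    and expose a public no-argument constructor.</description>
  </property>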
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLog.java Sat Dec 19 08:10:45 2009
@@ -33,7 +33,6 @@
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
-import org.apache.hadoop.io.SequenceFile;
/**
* Add support for transactional operations to the regionserver's
@@ -48,11 +47,6 @@
}
@Override
- protected SequenceFile.Writer createWriter(Path path) throws IOException {
- return super.createWriter(path, THLogKey.class, KeyValue.class);
- }
-
- @Override
protected HLogKey makeKey(byte[] regionName, byte[] tableName, long seqNum,
long now) {
return new THLogKey(regionName, tableName, seqNum, now);
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java Sat Dec 19 08:10:45 2009
@@ -42,7 +42,6 @@
import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.Progressable;
/**
@@ -107,12 +106,8 @@
Set<Long> commitedTransactions = new HashSet<Long>();
Set<Long> abortedTransactions = new HashSet<Long>();
- SequenceFile.Reader logReader = HLog.getReader(fileSystem,
- reconstructionLog, conf);
-
- try {
- THLogKey key = new THLogKey();
- KeyValue val = new KeyValue();
+ HLog.Reader reader = HLog.getReader(fileSystem, reconstructionLog, conf);
+ try {
long skippedEdits = 0;
long totalEdits = 0;
long startCount = 0;
@@ -123,7 +118,10 @@
int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
2000);
- while (logReader.next(key, val)) {
+ HLog.Entry entry;
+ while ((entry = reader.next()) != null) {
+ THLogKey key = (THLogKey)entry.getKey();
+ KeyValue val = entry.getEdit();
if (LOG.isTraceEnabled()) {
LOG.trace("Processing edit: key: " + key.toString() + " val: "
+ val.toString());
@@ -200,7 +198,7 @@
+ " aborts, and " + commitCount + " commits.");
}
} finally {
- logReader.close();
+ reader.close();
}
if (pendingTransactionsById.size() > 0) {
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java Sat Dec 19 08:10:45 2009
@@ -108,6 +108,8 @@
@Override
protected HLog instantiateHLog(Path logdir) throws IOException {
+ conf.set("hbase.regionserver.hlog.keyclass",
+ THLogKey.class.getCanonicalName());
HLog newlog = new THLog(super.getFileSystem(), logdir, conf,
super.getLogRoller());
return newlog;
}
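With THLog's createWriter override removed (see the THLog.java hunk above), the custom key class travels to the pluggable writer through configuration alone: HLog.getKeyClass reads hbase.regionserver.hlog.keyclass, defaulting to HLogKey, and HLog.newKey instantiates it reflectively. A short standalone sketch of that lookup (class name is illustrative):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.regionserver.wal.HLog;
  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;

  public class KeyClassSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new HBaseConfiguration();
      conf.set("hbase.regionserver.hlog.keyclass",
          "org.apache.hadoop.hbase.regionserver.transactional.THLogKey");

      // Resolves to THLogKey here; defaults to HLogKey.class when unset.
      Class<? extends HLogKey> keyClass = HLog.getKeyClass(conf);

      // newKey() calls keyClass.newInstance(), so key classes need a
      // public no-argument constructor.
      HLogKey key = HLog.newKey(conf);
      System.out.println(keyClass.getName() + " -> " + key);
    }
  }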
Modified: hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java (original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java Sat Dec 19 08:10:45 2009
@@ -58,6 +58,8 @@
// Set the hbase.rootdir to be the home directory in mini dfs.
this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem()
.getHomeDirectory().toString());
+ this.conf.set("hbase.regionserver.hlog.keyclass",
+ THLogKey.class.getCanonicalName());
super.setUp();
this.dir = new Path("/hbase", getName());
if (fs.exists(dir)) {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sat Dec 19 08:10:45 2009
@@ -59,7 +59,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
@@ -317,17 +316,17 @@
// general memory usage accounting.
long maxSeqIdInLog = -1;
long firstSeqIdInLog = -1;
- SequenceFile.Reader logReader = HLog.getReader(this.fs, reconstructionLog,
- this.conf);
+ HLog.Reader logReader = HLog.getReader(this.fs, reconstructionLog, conf);
try {
- HLogKey key = HLog.newKey(conf);
- KeyValue val = new KeyValue();
long skippedEdits = 0;
long editsCount = 0;
// How many edits to apply before we send a progress report.
int reportInterval =
this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
- while (logReader.next(key, val)) {
+ HLog.Entry entry;
+ while ((entry = logReader.next()) != null) {
+ HLogKey key = entry.getKey();
+ KeyValue val = entry.getEdit();
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Sat Dec 19 08:10:45 2009
@@ -23,7 +23,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@@ -46,8 +45,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -64,12 +61,6 @@
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.SequenceFile.Metadata;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progressable;
/**
* HLog stores all the edits to the HStore. Its the hbase write-ahead-log
@@ -123,7 +114,30 @@
private final long blocksize;
private final int flushlogentries;
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
- private final short replicationLevel;
+
+ public interface Reader {
+
+ void init(FileSystem fs, Path path, Configuration c) throws IOException;
+
+ void close() throws IOException;
+
+ Entry next() throws IOException;
+
+ Entry next(Entry reuse) throws IOException;
+
+ }
+
+ public interface Writer {
+
+ void init(FileSystem fs, Path path, Configuration c) throws IOException;
+
+ void close() throws IOException;
+
+ void sync() throws IOException;
+
+ void append(Entry entry) throws IOException;
+
+ }
// used to indirectly tell syncFs to force the sync
private boolean forceSync = false;
@@ -131,10 +145,7 @@
/*
* Current log file.
*/
- SequenceFile.Writer writer;
- // This is the above writer's output stream. Its private but we use reflection
- // to expose it so we can call sync on it.
- FSDataOutputStream writer_out;
+ Writer writer;
/*
* Map of all log files but the current one.
@@ -218,8 +229,6 @@
conf.getInt("hbase.regionserver.flushlogentries", 1);
this.blocksize = conf.getLong("hbase.regionserver.hlog.blocksize",
this.fs.getDefaultBlockSize());
- this.replicationLevel = (short) conf.getInt("hbase.regionserver.hlog.replication",
- this.fs.getDefaultReplication());
// Roll at 95% of block size.
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier",
0.95f);
this.logrollsize = (long)(this.blocksize * multi);
@@ -250,16 +259,6 @@
}
/**
- * Get the compression type for the hlog files
- * @param c Configuration to use.
- * @return the kind of compression to use
- */
- static CompressionType getCompressionType(final Configuration c) {
- // Compression makes no sense for commit log. Always return NONE.
- return CompressionType.NONE;
- }
-
- /**
* Called by HRegionServer when it opens a new region to ensure that log
* sequence numbers are always greater than the latest sequence number of the
* region being brought on-line.
@@ -318,7 +317,7 @@
Path oldFile = cleanupCurrentWriter(this.filenum);
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename(this.filenum);
- this.writer = createWriter(newPath);
+ this.writer = createWriter(fs, newPath, new HBaseConfiguration(conf));
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
@@ -349,113 +348,54 @@
return regionToFlush;
}
- protected SequenceFile.Writer createWriter(Path path) throws IOException {
- return createWriter(path, HLogKey.class, KeyValue.class);
- }
-
/**
- * Hack just to set the correct file length up in SequenceFile.Reader.
- * See HADOOP-6307. The below is all about setting the right length on the
- * file we are reading. fs.getFileStatus(file).getLen() is passed down to
- * a private SequenceFile.Reader constructor. This won't work. Need to do
- * the available on the stream. The below is ugly. It makes getPos, the
- * first time its called, return length of the file -- i.e. tell a lie -- just
- * so this line up in SF.Reader's constructor ends up with right answer:
- *
- * this.end = in.getPos() + length;
+ * Get a reader for the WAL.
+ * @param fs
+ * @param path
+ * @param keyClass
+ * @param valueClass
+ * @return A WAL reader. Close when done with it.
+ * @throws IOException
*/
- private static class WALReader extends SequenceFile.Reader {
-
- WALReader(final FileSystem fs, final Path p, final Configuration c)
- throws IOException {
- super(fs, p, c);
-
- }
-
- @Override
- protected FSDataInputStream openFile(FileSystem fs, Path file,
- int bufferSize, long length)
- throws IOException {
- return new WALReaderFSDataInputStream(super.openFile(fs, file, bufferSize,
- length), length);
- }
-
- /**
- * Override just so can intercept first call to getPos.
- */
- static class WALReaderFSDataInputStream extends FSDataInputStream {
- private boolean firstGetPosInvocation = true;
- private long length;
-
- WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
- throws IOException {
- super(is);
- this.length = l;
- }
-
- @Override
- public long getPos() throws IOException {
- if (this.firstGetPosInvocation) {
- this.firstGetPosInvocation = false;
- // Tell a lie. We're doing this just so that this line up in
- // SequenceFile.Reader constructor comes out with the correct length
- // on the file:
- // this.end = in.getPos() + length;
- //
- long available = this.in.available();
- // Length gets added up in the SF.Reader constructor so subtract the
- // difference. If available < this.length, then return this.length.
- // I ain't sure what else to do.
- return available >= this.length? available - this.length: this.length;
- }
- return super.getPos();
- }
+ @SuppressWarnings("unchecked")
+ public static Reader getReader(final FileSystem fs,
+ final Path path, HBaseConfiguration conf)
+ throws IOException {
+ try {
+ Class c = Class.forName(conf.get("hbase.regionserver.hlog.reader.impl",
+ SequenceFileLogReader.class.getCanonicalName()));
+ HLog.Reader reader = (HLog.Reader) c.newInstance();
+ reader.init(fs, path, conf);
+ return reader;
+ } catch (Exception e) {
+ IOException ie = new IOException("cannot get log reader");
+ ie.initCause(e);
+ throw ie;
}
}
/**
- * Get a Reader for WAL.
- * Reader is a subclass of SequenceFile.Reader. The subclass has amendments
- * to make it so we see edits up to the last sync (HDFS-265). Of note, we
- * can only see up to the sync that happened before this file was opened.
- * Will require us doing up our own WAL Reader if we want to keep up with
- * a syncing Writer.
- * @param p
- * @return A WAL Reader. Close when done with it.
+ * Get a writer for the WAL.
+ * @param path
+ * @param keyClass
+ * @param valueClass
+ * @return A WAL writer. Close when done with it.
* @throws IOException
*/
- public static SequenceFile.Reader getReader(final FileSystem fs,
- final Path p, final Configuration c)
- throws IOException {
- return new WALReader(fs, p, c);
- }
-
- protected SequenceFile.Writer createWriter(Path path,
- Class<? extends HLogKey> keyClass, Class<? extends KeyValue> valueClass)
- throws IOException {
- SequenceFile.Writer writer =
- SequenceFile.createWriter(this.fs, this.conf, path, keyClass,
- valueClass, fs.getConf().getInt("io.file.buffer.size", 4096),
- this.replicationLevel, this.blocksize,
- SequenceFile.CompressionType.NONE, new DefaultCodec(), null,
- new Metadata());
- // Get at the private FSDataOutputStream inside in SequenceFile so we can
- // call sync on it. Make it accessible. Stash it aside for call up in
- // the sync method above.
- final Field fields[] = writer.getClass().getDeclaredFields();
- final String fieldName = "out";
- for (int i = 0; i < fields.length; ++i) {
- if (fieldName.equals(fields[i].getName())) {
- try {
- fields[i].setAccessible(true);
- this.writer_out = (FSDataOutputStream)fields[i].get(writer);
- break;
- } catch (IllegalAccessException ex) {
- throw new IOException("Accessing " + fieldName, ex);
- }
- }
+ @SuppressWarnings("unchecked")
+ public static Writer createWriter(final FileSystem fs,
+ final Path path, HBaseConfiguration conf) throws IOException {
+ try {
+ Class c = Class.forName(conf.get("hbase.regionserver.hlog.writer.impl",
+ SequenceFileLogWriter.class.getCanonicalName()));
+ HLog.Writer writer = (HLog.Writer) c.newInstance();
+ writer.init(fs, path, conf);
+ return writer;
+ } catch (Exception e) {
+ IOException ie = new IOException("cannot get log writer");
+ ie.initCause(e);
+ throw ie;
}
- return writer;
}
/*
@@ -820,9 +760,6 @@
this.unflushedEntries.get() >= this.flushlogentries) {
try {
this.writer.sync();
- if (this.writer_out != null) {
- this.writer_out.sync();
- }
this.forceSync = false;
this.unflushedEntries.set(0);
} catch (IOException e) {
@@ -857,7 +794,7 @@
LOG.debug("edit=" + this.numEntries.get() + ", write=" +
logKey.toString());
}
- this.writer.append(logKey, logEdit);
+ this.writer.append(new HLog.Entry(logKey, logEdit));
long took = System.currentTimeMillis() - now;
if (took > 1000) {
LOG.warn(Thread.currentThread().getName() + " took " + took +
@@ -936,8 +873,9 @@
return;
}
synchronized (updateLock) {
- this.writer.append(makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
- completeCacheFlushLogEdit());
+ this.writer.append(new HLog.Entry(
+ makeKey(regionName, tableName, logSeqId, System.currentTimeMillis()),
+ completeCacheFlushLogEdit()));
this.numEntries.incrementAndGet();
Long seq = this.lastSeqWritten.get(regionName);
if (seq != null && logSeqId >= seq.longValue()) {
@@ -1018,20 +956,20 @@
// Private immutable datastructure to hold Writer and its Path.
private final static class WriterAndPath {
final Path p;
- final SequenceFile.Writer w;
- WriterAndPath(final Path p, final SequenceFile.Writer w) {
+ final Writer w;
+ WriterAndPath(final Path p, final Writer w) {
this.p = p;
this.w = w;
}
}
@SuppressWarnings("unchecked")
- static Class<? extends HLogKey> getKeyClass(HBaseConfiguration conf) {
+ public static Class<? extends HLogKey> getKeyClass(Configuration conf) {
return (Class<? extends HLogKey>)
conf.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
}
- public static HLogKey newKey(HBaseConfiguration conf) throws IOException {
+ public static HLogKey newKey(Configuration conf) throws IOException {
Class<? extends HLogKey> keyClass = getKeyClass(conf);
try {
return keyClass.newInstance();
@@ -1072,8 +1010,8 @@
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
concurrentLogReads)).intValue();
for (int step = 0; step < maxSteps; step++) {
- final Map<byte[], LinkedList<HLogEntry>> logEntries =
- new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
+ final Map<byte[], LinkedList<HLog.Entry>> logEntries =
+ new TreeMap<byte[], LinkedList<HLog.Entry>>(Bytes.BYTES_COMPARATOR);
// Stop at logfiles.length when it's the last step
int endIndex = step == maxSteps - 1? logfiles.length:
step * concurrentLogReads + concurrentLogReads;
@@ -1086,28 +1024,22 @@
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" +
logfiles[i].getLen());
}
- SequenceFile.Reader in = null;
+ Reader in = null;
int count = 0;
try {
in = HLog.getReader(fs, logfiles[i].getPath(), conf);
try {
- HLogKey key = newKey(conf);
- KeyValue val = new KeyValue();
- while (in.next(key, val)) {
- byte [] regionName = key.getRegionName();
- LinkedList<HLogEntry> queue = logEntries.get(regionName);
+ HLog.Entry entry;
+ while ((entry = in.next()) != null) {
+ byte [] regionName = entry.getKey().getRegionName();
+ LinkedList<HLog.Entry> queue = logEntries.get(regionName);
if (queue == null) {
- queue = new LinkedList<HLogEntry>();
+ queue = new LinkedList<HLog.Entry>();
LOG.debug("Adding queue for " +
Bytes.toStringBinary(regionName));
logEntries.put(regionName, queue);
}
- HLogEntry hle = new HLogEntry(val, key);
- queue.push(hle);
+ queue.push(entry);
count++;
- // Make the key and value new each time; otherwise same instance
- // is used over and over.
- key = newKey(conf);
- val = new KeyValue();
}
LOG.debug("Pushed=" + count + " entries from " +
logfiles[i].getPath());
@@ -1148,17 +1080,17 @@
Thread thread = new Thread(Bytes.toStringBinary(key)) {
@Override
public void run() {
- LinkedList<HLogEntry> entries = logEntries.get(key);
+ LinkedList<HLog.Entry> entries = logEntries.get(key);
LOG.debug("Thread got " + entries.size() + " to process");
long threadTime = System.currentTimeMillis();
try {
int count = 0;
// Items were added to the linkedlist oldest first. Pull them
// out in that order.
- for (ListIterator<HLogEntry> i =
+ for (ListIterator<HLog.Entry> i =
entries.listIterator(entries.size());
i.hasPrevious();) {
- HLogEntry logEntry = i.previous();
+ HLog.Entry logEntry = i.previous();
WriterAndPath wap = logWriters.get(key);
if (wap == null) {
Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
@@ -1166,7 +1098,7 @@
HRegionInfo.encodeRegionName(key)),
HREGION_OLDLOGFILE_NAME);
Path oldlogfile = null;
- SequenceFile.Reader old = null;
+ Reader old = null;
if (fs.exists(logfile)) {
FileStatus stat = fs.getFileStatus(logfile);
if (stat.getLen() <= 0) {
@@ -1178,12 +1110,10 @@
"exists. Copying existing file to new file");
oldlogfile = new Path(logfile.toString() + ".old");
fs.rename(logfile, oldlogfile);
- old = new SequenceFile.Reader(fs, oldlogfile, conf);
+ old = getReader(fs, oldlogfile, conf);
}
}
- SequenceFile.Writer w =
- SequenceFile.createWriter(fs, conf, logfile,
- getKeyClass(conf), KeyValue.class, getCompressionType(conf));
+ Writer w = createWriter(fs, logfile, conf);
wap = new WriterAndPath(logfile, w);
logWriters.put(key, wap);
if (LOG.isDebugEnabled()) {
@@ -1193,20 +1123,19 @@
if (old != null) {
// Copy from existing log file
- HLogKey oldkey = newKey(conf);
- KeyValue oldval = new KeyValue();
- for (; old.next(oldkey, oldval); count++) {
+ HLog.Entry entry;
+ for (; (entry = old.next()) != null; count++) {
if (LOG.isDebugEnabled() && count > 0
&& count % 10000 == 0) {
LOG.debug("Copied " + count + " edits");
}
- w.append(oldkey, oldval);
+ w.append(entry);
}
old.close();
fs.delete(oldlogfile, true);
}
}
- wap.w.append(logEntry.getKey(), logEntry.getEdit());
+ wap.w.append(logEntry);
count++;
}
if (LOG.isDebugEnabled()) {
@@ -1249,18 +1178,24 @@
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
*/
- public static class HLogEntry {
+ public static class Entry {
private KeyValue edit;
private HLogKey key;
+
+ public Entry() {
+ edit = new KeyValue();
+ key = new HLogKey();
+ }
+
/**
* Constructor for both params
* @param edit log's edit
* @param key log's key
*/
- public HLogEntry(KeyValue edit, HLogKey key) {
+ public Entry(HLogKey key, KeyValue edit) {
super();
- this.edit = edit;
this.key = key;
+ this.edit = edit;
}
/**
* Gets the edit
@@ -1360,12 +1295,11 @@
if (!fs.isFile(logPath)) {
throw new IOException(args[i] + " is not a file");
}
- Reader log = new SequenceFile.Reader(fs, logPath, conf);
+ Reader log = getReader(fs, logPath, conf);
try {
- HLogKey key = new HLogKey();
- KeyValue val = new KeyValue();
- while (log.next(key, val)) {
- System.out.println(key.toString() + " " + val.toString());
+ HLog.Entry entry;
+ while ((entry = log.next()) != null) {
+ System.out.println(entry.toString());
}
} finally {
log.close();
@@ -1382,16 +1316,5 @@
public static final long FIXED_OVERHEAD = ClassSize.align(
ClassSize.OBJECT + (5 * ClassSize.REFERENCE) +
ClassSize.ATOMIC_INTEGER + Bytes.SIZEOF_INT + (3 * Bytes.SIZEOF_LONG));
-
- static class HLogWriter extends SequenceFile.Writer {
- public HLogWriter(FileSystem arg0, Configuration arg1, Path arg2,
- Class<?> arg3, Class<?> arg4, int arg5, short arg6, long arg7,
- Progressable arg8, Metadata arg9) throws IOException {
- super(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
- }
-
- void flush() {
-
- }
- }
+
}
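Taken together, the HLog changes replace direct SequenceFile access with the getReader/createWriter factories and Entry-based iteration (as in the rewritten dump loop above). A minimal sketch of reading a log through the new API; the class name and path argument are illustrative only:

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.regionserver.wal.HLog;

  public class WalReadSketch {
    public static void main(String[] args) throws Exception {
      HBaseConfiguration conf = new HBaseConfiguration();
      FileSystem fs = FileSystem.get(conf);
      Path walPath = new Path(args[0]);  // an existing hlog file

      // getReader instantiates the class named by
      // hbase.regionserver.hlog.reader.impl and calls init() on it.
      HLog.Reader reader = HLog.getReader(fs, walPath, conf);
      try {
        HLog.Entry entry;
        // next() returns null at end-of-log, replacing the old
        // SequenceFile-style boolean next(key, val) idiom.
        while ((entry = reader.next()) != null) {
          System.out.println(entry.getKey() + " " + entry.getEdit());
        }
      } finally {
        reader.close();  // release the underlying input stream
      }
    }
  }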
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java?rev=892451&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java Sat Dec 19 08:10:45 2009
@@ -0,0 +1,110 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.io.SequenceFile;
+
+public class SequenceFileLogReader implements HLog.Reader {
+
+ /**
+ * Hack just to set the correct file length up in SequenceFile.Reader.
+ * See HADOOP-6307. The below is all about setting the right length on the
+ * file we are reading. fs.getFileStatus(file).getLen() is passed down to
+ * a private SequenceFile.Reader constructor. This won't work. Need to do
+ * the available on the stream. The below is ugly. It makes getPos, the
+ * first time its called, return length of the file -- i.e. tell a lie -- just
+ * so this line up in SF.Reader's constructor ends up with right answer:
+ *
+ * this.end = in.getPos() + length;
+ *
+ */
+ private static class WALReader extends SequenceFile.Reader {
+
+ WALReader(final FileSystem fs, final Path p, final Configuration c)
+ throws IOException {
+ super(fs, p, c);
+
+ }
+
+ @Override
+ protected FSDataInputStream openFile(FileSystem fs, Path file,
+ int bufferSize, long length)
+ throws IOException {
+ return new WALReaderFSDataInputStream(super.openFile(fs, file,
+ bufferSize, length), length);
+ }
+
+ /**
+ * Override just so can intercept first call to getPos.
+ */
+ static class WALReaderFSDataInputStream extends FSDataInputStream {
+ private boolean firstGetPosInvocation = true;
+ private long length;
+
+ WALReaderFSDataInputStream(final FSDataInputStream is, final long l)
+ throws IOException {
+ super(is);
+ this.length = l;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ if (this.firstGetPosInvocation) {
+ this.firstGetPosInvocation = false;
+ // Tell a lie. We're doing this just so that this line up in
+ // SequenceFile.Reader constructor comes out with the correct length
+ // on the file:
+ // this.end = in.getPos() + length;
+ long available = this.in.available();
+ // Length gets added up in the SF.Reader constructor so subtract the
+ // difference. If available < this.length, then return this.length.
+ return available >= this.length? available - this.length: this.length;
+ }
+ return super.getPos();
+ }
+ }
+ }
+
+ Configuration conf;
+ WALReader reader;
+
+ public SequenceFileLogReader() { }
+
+ @Override
+ public void init(FileSystem fs, Path path, Configuration conf)
+ throws IOException {
+ this.conf = conf;
+ reader = new WALReader(fs, path, conf);
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public HLog.Entry next() throws IOException {
+ return next(null);
+ }
+
+ @Override
+ public HLog.Entry next(HLog.Entry reuse) throws IOException {
+ if (reuse == null) {
+ HLogKey key = HLog.newKey(conf);
+ KeyValue val = new KeyValue();
+ if (reader.next(key, val)) {
+ return new HLog.Entry(key, val);
+ }
+ } else if (reader.next(reuse.getKey(), reuse.getEdit())) {
+ return reuse;
+ }
+ return null;
+ }
+
+}
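SequenceFileLogReader is the default binding for hbase.regionserver.hlog.reader.impl. A replacement only has to honor the HLog.Reader contract; note the factory relies on Class.newInstance(), hence the explicit no-argument constructor. A skeletal sketch, with hypothetical class name and empty bodies:

  package com.example;

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.regionserver.wal.HLog;

  public class CustomLogReader implements HLog.Reader {

    public CustomLogReader() { }  // required by HLog.getReader()

    @Override
    public void init(FileSystem fs, Path path, Configuration c)
        throws IOException {
      // open the underlying file/format here
    }

    @Override
    public HLog.Entry next() throws IOException {
      return next(null);
    }

    @Override
    public HLog.Entry next(HLog.Entry reuse) throws IOException {
      // decode one entry; return null at end-of-log
      return null;
    }

    @Override
    public void close() throws IOException {
      // release any open streams
    }
  }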
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=892451&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Sat Dec 19 08:10:45 2009
@@ -0,0 +1,74 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+public class SequenceFileLogWriter implements HLog.Writer {
+
+ SequenceFile.Writer writer;
+ FSDataOutputStream writer_out;
+
+ public SequenceFileLogWriter() { }
+
+ @Override
+ public void init(FileSystem fs, Path path, Configuration conf)
+ throws IOException {
+ writer = SequenceFile.createWriter(fs, conf, path,
+ HLog.getKeyClass(conf), KeyValue.class,
+ fs.getConf().getInt("io.file.buffer.size", 4096),
+ (short) conf.getInt("hbase.regionserver.hlog.replication",
+ fs.getDefaultReplication()),
+ conf.getLong("hbase.regionserver.hlog.blocksize",
+ fs.getDefaultBlockSize()),
+ SequenceFile.CompressionType.NONE,
+ new DefaultCodec(),
+ null,
+ new Metadata());
+
+ // Get at the private FSDataOutputStream inside in SequenceFile so we can
+ // call sync on it. Make it accessible. Stash it aside for call up in
+ // the sync method.
+ final Field fields[] = writer.getClass().getDeclaredFields();
+ final String fieldName = "out";
+ for (int i = 0; i < fields.length; ++i) {
+ if (fieldName.equals(fields[i].getName())) {
+ try {
+ fields[i].setAccessible(true);
+ this.writer_out = (FSDataOutputStream)fields[i].get(writer);
+ break;
+ } catch (IllegalAccessException ex) {
+ throw new IOException("Accessing " + fieldName, ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void append(HLog.Entry entry) throws IOException {
+ this.writer.append(entry.getKey(), entry.getEdit());
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.writer.close();
+ }
+
+ @Override
+ public void sync() throws IOException {
+ this.writer.sync();
+ if (this.writer_out != null) {
+ this.writer_out.sync();
+ }
+ }
+
+}
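The init() above carries over HLog's old reflection workaround: SequenceFile.Writer keeps its FSDataOutputStream in a private field named "out", and sync() needs that stream to force data out to HDFS. A generic sketch of the field lookup, factored into a hypothetical helper:

  import java.io.IOException;
  import java.lang.reflect.Field;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.io.SequenceFile;

  final class WriterStreamAccess {
    // Mirrors the loop in init(): locate the private "out" field,
    // make it accessible, and return the wrapped stream.
    static FSDataOutputStream exposeOut(SequenceFile.Writer writer)
        throws IOException {
      for (Field field : writer.getClass().getDeclaredFields()) {
        if ("out".equals(field.getName())) {
          try {
            field.setAccessible(true);
            return (FSDataOutputStream) field.get(writer);
          } catch (IllegalAccessException ex) {
            throw new IOException("Accessing out", ex);
          }
        }
      }
      return null;  // sync() already tolerates a null writer_out
    }
  }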
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java?rev=892451&r1=892450&r2=892451&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java Sat Dec 19 08:10:45 2009
@@ -33,9 +33,6 @@
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-
/** JUnit test case for HLog */
public class TestHLog extends HBaseTestCase implements HConstants {
@@ -139,10 +136,10 @@
wal.sync();
// Open a Reader.
Path walPath = wal.computeFilename(wal.getFilenum());
- SequenceFile.Reader reader = HLog.getReader(this.fs, walPath, this.conf);
+ HLog.Reader reader = HLog.getReader(fs, walPath, conf);
int count = 0;
- HLogKey key = new HLogKey();
- while(reader.next(key)) count++;
+ HLog.Entry entry = new HLog.Entry();
+ while ((entry = reader.next(entry)) != null) count++;
assertEquals(total, count);
reader.close();
// Add test that checks to see that an open of a Reader works on a file
@@ -152,16 +149,16 @@
kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
wal.append(bytes, bytes, kvs, System.currentTimeMillis());
}
- reader = HLog.getReader(this.fs, walPath, this.conf);
+ reader = HLog.getReader(fs, walPath, conf);
count = 0;
- while(reader.next(key)) count++;
+ while((entry = reader.next(entry)) != null) count++;
assertTrue(count >= total);
reader.close();
// If I sync, should see double the edits.
wal.sync();
- reader = HLog.getReader(this.fs, walPath, this.conf);
+ reader = HLog.getReader(fs, walPath, conf);
count = 0;
- while(reader.next(key)) count++;
+ while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 2, count);
// Now do a test that ensures stuff works when we go over block boundary,
// especially that we return good length on file.
@@ -173,16 +170,16 @@
}
// Now I should have written out lots of blocks. Sync then read.
wal.sync();
- reader = HLog.getReader(this.fs, walPath, this.conf);
+ reader = HLog.getReader(fs, walPath, conf);
count = 0;
- while(reader.next(key)) count++;
+ while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 3, count);
reader.close();
// Close it and ensure that closed, Reader gets right length also.
wal.close();
- reader = HLog.getReader(this.fs, walPath, this.conf);
+ reader = HLog.getReader(fs, walPath, conf);
count = 0;
- while(reader.next(key)) count++;
+ while((entry = reader.next(entry)) != null) count++;
assertEquals(total * 3, count);
reader.close();
}
@@ -191,14 +188,15 @@
throws IOException {
assertEquals(howmany, splits.size());
for (int i = 0; i < splits.size(); i++) {
- SequenceFile.Reader r = HLog.getReader(this.fs, splits.get(i), this.conf);
+ HLog.Reader reader = HLog.getReader(this.fs, splits.get(i), conf);
try {
- HLogKey key = new HLogKey();
- KeyValue kv = new KeyValue();
int count = 0;
String previousRegion = null;
long seqno = -1;
- while(r.next(key, kv)) {
+ HLog.Entry entry = new HLog.Entry();
+ while((entry = reader.next(entry)) != null) {
+ HLogKey key = entry.getKey();
+ KeyValue kv = entry.getEdit();
String region = Bytes.toString(key.getRegionName());
// Assert that all edits are for same region.
if (previousRegion != null) {
@@ -212,7 +210,7 @@
}
assertEquals(howmany * howmany, count);
} finally {
- r.close();
+ reader.close();
}
}
}
@@ -226,7 +224,7 @@
final byte [] regionName = Bytes.toBytes("regionname");
final byte [] tableName = Bytes.toBytes("tablename");
final byte [] row = Bytes.toBytes("row");
- Reader reader = null;
+ HLog.Reader reader = null;
HLog log = new HLog(fs, dir, this.conf, null);
try {
// Write columns named 1, 2, 3, etc. and then values of single byte
@@ -246,17 +244,20 @@
log = null;
// Now open a reader on the log and assert append worked.
reader = HLog.getReader(fs, filename, conf);
- HLogKey key = new HLogKey();
- KeyValue val = new KeyValue();
+ HLog.Entry entry = new HLog.Entry();
for (int i = 0; i < COL_COUNT; i++) {
- reader.next(key, val);
+ reader.next(entry);
+ HLogKey key = entry.getKey();
+ KeyValue val = entry.getEdit();
assertTrue(Bytes.equals(regionName, key.getRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));
assertTrue(Bytes.equals(row, val.getRow()));
assertEquals((byte)(i + '0'), val.getValue()[0]);
System.out.println(key + " " + val);
}
- while (reader.next(key, val)) {
+ while ((entry = reader.next(null)) != null) {
+ HLogKey key = entry.getKey();
+ KeyValue val = entry.getEdit();
// Assert only one more row... the meta flushed row.
assertTrue(Bytes.equals(regionName, key.getRegionName()));
assertTrue(Bytes.equals(tableName, key.getTablename()));