http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index 06318f0..24f195d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -48,17 +48,20 @@ import org.apache.hadoop.hbase.wal.WALKey; class FSWALEntry extends Entry { // The below data members are denoted 'transient' just to highlight these are not persisted; // they are only in memory and held here while passing over the ring buffer. - private final transient long sequence; + private final transient long txid; private final transient boolean inMemstore; private final transient HRegionInfo hri; - private final Set<byte[]> familyNames; + private final transient Set<byte[]> familyNames; + // In the new WAL logic, we will rewrite failed WAL entries to new WAL file, so we need to avoid + // calling stampRegionSequenceId again. + private transient boolean stamped = false; - FSWALEntry(final long sequence, final WALKey key, final WALEdit edit, + FSWALEntry(final long txid, final WALKey key, final WALEdit edit, final HRegionInfo hri, final boolean inMemstore) { super(key, edit); this.inMemstore = inMemstore; this.hri = hri; - this.sequence = sequence; + this.txid = txid; if (inMemstore) { // construct familyNames here to reduce the work of log sinker. 
ArrayList<Cell> cells = this.getEdit().getCells(); @@ -80,7 +83,7 @@ class FSWALEntry extends Entry { } public String toString() { - return "sequence=" + this.sequence + ", " + super.toString(); + return "sequence=" + this.txid + ", " + super.toString(); }; boolean isInMemstore() { @@ -92,10 +95,10 @@ class FSWALEntry extends Entry { } /** - * @return The sequence on the ring buffer when this edit was added. + * @return The transaction id of this edit. */ - long getSequence() { - return this.sequence; + long getTxid() { + return this.txid; } /** @@ -103,9 +106,12 @@ class FSWALEntry extends Entry { * SIDE-EFFECT is our stamping the sequenceid into every Cell AND setting the sequenceid into the * MVCC WriteEntry!!!! * @return The sequenceid we stamped on this edit. - * @throws IOException */ long stampRegionSequenceId() throws IOException { + if (stamped) { + return getKey().getSequenceId(); + } + stamped = true; long regionSequenceId = WALKey.NO_SEQUENCE_ID; MultiVersionConcurrencyControl mvcc = getKey().getMvcc(); MultiVersionConcurrencyControl.WriteEntry we = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index 42abeae..7161e1e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -19,108 +19,42 @@ package org.apache.hadoop.hbase.regionserver.wal; -import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; -import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; - import java.io.IOException; +import java.io.OutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; -import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; /** * Writer for protobuf-based WAL. 
*/ -@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) -public class ProtobufLogWriter extends WriterBase { - private static final Log LOG = LogFactory.getLog(ProtobufLogWriter.class); - protected FSDataOutputStream output; - protected Codec.Encoder cellEncoder; - protected WALCellCodec.ByteStringCompressor compressor; - private boolean trailerWritten; - private WALTrailer trailer; - // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger - // than this size, it is written/read respectively, with a WARN message in the log. - private int trailerWarnSize; - - public ProtobufLogWriter() { - super(); - } - - protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) - throws IOException { - return WALCellCodec.create(conf, null, compressionContext); - } - - protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder) - throws IOException { - if (!builder.hasWriterClsName()) { - builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName()); - } - if (!builder.hasCellCodecClsName()) { - builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf)); - } - return builder.build(); - } +@InterfaceAudience.Private +public class ProtobufLogWriter extends AbstractProtobufLogWriter + implements DefaultWALProvider.Writer { - @Override - @SuppressWarnings("deprecation") - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) - throws IOException { - super.init(fs, path, conf, overwritable); - assert this.output == null; - boolean doCompress = initializeCompressionContext(conf, path); - this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); - int bufferSize = FSUtils.getDefaultBufferSize(fs); - short replication = (short)conf.getInt( - "hbase.regionserver.hlog.replication", FSUtils.getDefaultReplication(fs, path)); - long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize", - FSUtils.getDefaultBlockSize(fs, path)); - 
output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null); - output.write(ProtobufLogReader.PB_WAL_MAGIC); - boolean doTagCompress = doCompress - && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); - buildWALHeader(conf, - WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)) - .writeDelimitedTo(output); - - initAfterHeader(doCompress); - - // instantiate trailer to default value. - trailer = WALTrailer.newBuilder().build(); - if (LOG.isTraceEnabled()) { - LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); - } - } + private static final Log LOG = LogFactory.getLog(ProtobufLogWriter.class); - protected void initAfterHeader(boolean doCompress) throws IOException { - WALCellCodec codec = getCodec(conf, this.compressionContext); - this.cellEncoder = codec.getEncoder(this.output); - if (doCompress) { - this.compressor = codec.getByteStringCompressor(); - } - } + protected FSDataOutputStream output; @Override public void append(Entry entry) throws IOException { entry.setCompressionContext(compressionContext); - entry.getKey().getBuilder(compressor). - setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output); + entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build() + .writeDelimitedTo(output); for (Cell cell : entry.getEdit().getCells()) { // cellEncoder must assume little about the stream, since we write PB and cells in turn. cellEncoder.write(cell); } + length.set(output.getPos()); } @Override @@ -137,32 +71,6 @@ public class ProtobufLogWriter extends WriterBase { } } - WALTrailer buildWALTrailer(WALTrailer.Builder builder) { - return builder.build(); - } - - private void writeWALTrailer() { - try { - int trailerSize = 0; - if (this.trailer == null) { - // use default trailer. - LOG.warn("WALTrailer is null. 
Continuing with default."); - this.trailer = buildWALTrailer(WALTrailer.newBuilder()); - trailerSize = this.trailer.getSerializedSize(); - } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) { - // continue writing after warning the user. - LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + - trailerSize + " > " + this.trailerWarnSize); - } - this.trailer.writeTo(output); - output.writeInt(trailerSize); - output.write(ProtobufLogReader.PB_WAL_COMPLETE_MAGIC); - this.trailerWritten = true; - } catch (IOException ioe) { - LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe); - } - } - @Override public void sync() throws IOException { FSDataOutputStream fsdos = this.output; @@ -171,21 +79,35 @@ public class ProtobufLogWriter extends WriterBase { fsdos.hflush(); } + public FSDataOutputStream getStream() { + return this.output; + } + + @SuppressWarnings("deprecation") @Override - public long getLength() throws IOException { - try { - return this.output.getPos(); - } catch (NullPointerException npe) { - // Concurrent close... 
- throw new IOException(npe); - } + protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, + short replication, long blockSize) throws IOException { + this.output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, + null); } - public FSDataOutputStream getStream() { + @Override + protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException { + output.write(magic); + header.writeDelimitedTo(output); + return output.getPos(); + } + + @Override + protected OutputStream getOutputStreamForCellEncoder() { return this.output; } - void setWALTrailer(WALTrailer walTrailer) { - this.trailer = walTrailer; + @Override + protected long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) throws IOException { + trailer.writeTo(output); + output.writeInt(trailer.getSerializedSize()); + output.write(magic); + return output.getPos(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java index 7de8367..b5c9a2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java @@ -18,33 +18,28 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.htrace.Span; /** - * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till - * the sync completes. 
- * - * <p>Handlers coming in call append, append, append, and then do a flush/sync of - * the edits they have appended the WAL before returning. Since sync takes a while to - * complete, we give the Handlers back this sync future to wait on until the - * actual HDFS sync completes. Meantime this sync future goes across the ringbuffer and into a - * sync runner thread; when it completes, it finishes up the future, the handler get or failed - * check completes and the Handler can then progress. + * A Future on a filesystem sync call. It given to a client or 'Handler' for it to wait on till the + * sync completes. + * <p> + * Handlers coming in call append, append, append, and then do a flush/sync of the edits they have + * appended the WAL before returning. Since sync takes a while to complete, we give the Handlers + * back this sync future to wait on until the actual HDFS sync completes. Meantime this sync future + * goes across a queue and is handled by a background thread; when it completes, it finishes up the + * future, the handler get or failed check completes and the Handler can then progress. * <p> - * This is just a partial implementation of Future; we just implement get and - * failure. Unimplemented methods throw {@link UnsupportedOperationException}. + * This is just a partial implementation of Future; we just implement get and failure. * <p> - * There is not a one-to-one correlation between dfs sync invocations and - * instances of this class. A single dfs sync call may complete and mark many - * SyncFutures as done; i.e. we batch up sync calls rather than do a dfs sync - * call every time a Handler asks for it. + * There is not a one-to-one correlation between dfs sync invocations and instances of this class. A + * single dfs sync call may complete and mark many SyncFutures as done; i.e. we batch up sync calls + * rather than do a dfs sync call every time a Handler asks for it. * <p> - * SyncFutures are immutable but recycled. 
Call #reset(long, Span) before use even - * if it the first time, start the sync, then park the 'hitched' thread on a call to - * #get(). + * SyncFutures are immutable but recycled. Call #reset(long, Span) before use even if it the first + * time, start the sync, then park the 'hitched' thread on a call to #get(). */ @InterfaceAudience.Private class SyncFuture { @@ -54,17 +49,17 @@ class SyncFuture { private static final long NOT_DONE = 0; /** - * The sequence at which we were added to the ring buffer. + * The transaction id of this operation, monotonically increases. */ - private long ringBufferSequence; + private long txid; /** - * The sequence that was set in here when we were marked done. Should be equal - * or > ringBufferSequence. Put this data member into the NOT_DONE state while this - * class is in use. But for the first position on construction, let it be -1 so we can - * immediately call {@link #reset(long, Span)} below and it will work. + * The transaction id that was set in here when we were marked done. Should be equal or > txnId. + * Put this data member into the NOT_DONE state while this class is in use. But for the first + * position on construction, let it be -1 so we can immediately call {@link #reset(long, Span)} + * below and it will work. */ - private long doneSequence = -1; + private long doneTxid = -1; /** * If error, the associated throwable. Set when the future is 'done'. @@ -79,80 +74,83 @@ class SyncFuture { private Span span; /** - * Call this method to clear old usage and get it ready for new deploy. Call - * this method even if it is being used for the first time. - * - * @param sequence sequenceId from this Future's position in the RingBuffer + * Call this method to clear old usage and get it ready for new deploy. Call this method even if + * it is being used for the first time. 
+ * @param txnId the new transaction id * @return this */ - synchronized SyncFuture reset(final long sequence) { - return reset(sequence, null); + synchronized SyncFuture reset(final long txnId) { + return reset(txnId, null); } /** - * Call this method to clear old usage and get it ready for new deploy. Call - * this method even if it is being used for the first time. - * + * Call this method to clear old usage and get it ready for new deploy. Call this method even if + * it is being used for the first time. * @param sequence sequenceId from this Future's position in the RingBuffer - * @param span curren span, detached from caller. Don't forget to attach it when - * resuming after a call to {@link #get()}. + * @param span curren span, detached from caller. Don't forget to attach it when resuming after a + * call to {@link #get()}. * @return this */ - synchronized SyncFuture reset(final long sequence, Span span) { - if (t != null && t != Thread.currentThread()) throw new IllegalStateException(); + synchronized SyncFuture reset(final long txnId, Span span) { + if (t != null && t != Thread.currentThread()) { + throw new IllegalStateException(); + } t = Thread.currentThread(); - if (!isDone()) throw new IllegalStateException("" + sequence + " " + Thread.currentThread()); - this.doneSequence = NOT_DONE; - this.ringBufferSequence = sequence; + if (!isDone()) { + throw new IllegalStateException("" + txnId + " " + Thread.currentThread()); + } + this.doneTxid = NOT_DONE; + this.txid = txnId; this.span = span; return this; } @Override public synchronized String toString() { - return "done=" + isDone() + ", ringBufferSequence=" + this.ringBufferSequence; + return "done=" + isDone() + ", txid=" + this.txid; } - synchronized long getRingBufferSequence() { - return this.ringBufferSequence; + synchronized long getTxid() { + return this.txid; } /** - * Retrieve the {@code span} instance from this Future. EventHandler calls - * this method to continue the span. 
Thread waiting on this Future musn't call - * this method until AFTER calling {@link #get()} and the future has been - * released back to the originating thread. + * Retrieve the {@code span} instance from this Future. EventHandler calls this method to continue + * the span. Thread waiting on this Future musn't call this method until AFTER calling + * {@link #get()} and the future has been released back to the originating thread. */ synchronized Span getSpan() { return this.span; } /** - * Used to re-attach a {@code span} to the Future. Called by the EventHandler - * after a it has completed processing and detached the span from its scope. + * Used to re-attach a {@code span} to the Future. Called by the EventHandler after a it has + * completed processing and detached the span from its scope. */ synchronized void setSpan(Span span) { this.span = span; } /** - * @param sequence Sync sequence at which this future 'completed'. - * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error). - * @return True if we successfully marked this outstanding future as completed/done. - * Returns false if this future is already 'done' when this method called. + * @param txid the transaction id at which this future 'completed'. + * @param t Can be null. Set if we are 'completing' on error (and this 't' is the error). + * @return True if we successfully marked this outstanding future as completed/done. Returns false + * if this future is already 'done' when this method called. */ - synchronized boolean done(final long sequence, final Throwable t) { - if (isDone()) return false; + synchronized boolean done(final long txid, final Throwable t) { + if (isDone()) { + return false; + } this.throwable = t; - if (sequence < this.ringBufferSequence) { + if (txid < this.txid) { // Something badly wrong. 
if (throwable == null) { - this.throwable = new IllegalStateException("sequence=" + sequence + - ", ringBufferSequence=" + this.ringBufferSequence); + this.throwable = + new IllegalStateException("done txid=" + txid + ", my txid=" + this.txid); } } // Mark done. - this.doneSequence = sequence; + this.doneTxid = txid; // Wake up waiting threads. notify(); return true; @@ -166,21 +164,14 @@ class SyncFuture { while (!isDone()) { wait(1000); } - if (this.throwable != null) throw new ExecutionException(this.throwable); - return this.doneSequence; - } - - public Long get(long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - public boolean isCancelled() { - throw new UnsupportedOperationException(); + if (this.throwable != null) { + throw new ExecutionException(this.throwable); + } + return this.doneTxid; } synchronized boolean isDone() { - return this.doneSequence != NOT_DONE; + return this.doneTxid != NOT_DONE; } synchronized boolean isThrowable() { http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java deleted file mode 100644 index 8188e02..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.wal; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.io.util.LRUDictionary; -import org.apache.hadoop.hbase.util.FSUtils; - -import org.apache.hadoop.hbase.wal.DefaultWALProvider; - -/** - * Context used by our wal dictionary compressor. Null if we're not to do our - * custom dictionary compression. 
- */ -@InterfaceAudience.Private -public abstract class WriterBase implements DefaultWALProvider.Writer { - - protected CompressionContext compressionContext; - protected Configuration conf; - - @Override - public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) throws IOException { - this.conf = conf; - } - - public boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { - boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); - if (doCompress) { - try { - this.compressionContext = new CompressionContext(LRUDictionary.class, - FSUtils.isRecoveredEdits(path), conf.getBoolean( - CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); - } catch (Exception e) { - throw new IOException("Failed to initiate CompressionContext", e); - } - } - return doCompress; - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java index ea71701..32fe48b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.DataChecksum; @@ -417,7 +418,6 @@ public class FanOutOneBlockAsyncDFSOutputHelper { 
public boolean progress() { return DFS_CLIENT_ADAPTOR.isClientRunning(client); } - } static { @@ -579,6 +579,18 @@ public class FanOutOneBlockAsyncDFSOutputHelper { return futureList; } + /** + * Exception other than RemoteException thrown when calling create on namenode + */ + public static class NameNodeException extends IOException { + + private static final long serialVersionUID = 3143237406477095390L; + + public NameNodeException(Throwable cause) { + super(cause); + } + } + private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoop eventLoop) throws IOException { @@ -587,11 +599,20 @@ public class FanOutOneBlockAsyncDFSOutputHelper { DFSClient client = dfs.getClient(); String clientName = client.getClientName(); ClientProtocol namenode = client.getNamenode(); - HdfsFileStatus stat = FILE_CREATER.create(namenode, src, - FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, - new EnumSetWritable<CreateFlag>( - overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), - createParent, replication, blockSize); + HdfsFileStatus stat; + try { + stat = FILE_CREATER.create(namenode, src, + FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, + new EnumSetWritable<CreateFlag>( + overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), + createParent, replication, blockSize); + } catch (Exception e) { + if (e instanceof RemoteException) { + throw (RemoteException) e; + } else { + throw new NameNodeException(e); + } + } beginFileLease(client, src, stat.getFileId()); boolean succ = false; LocatedBlock locatedBlock = null; @@ -656,6 +677,13 @@ public class FanOutOneBlockAsyncDFSOutputHelper { }.resolve(dfs, f); } + public static boolean shouldRetryCreate(RemoteException e) { + // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name. 
+ // For exceptions other than this, we just throw it out. This is same with + // DFSOutputStream.newStreamForCreate. + return e.getClassName().endsWith("RetryStartFileException"); + } + static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName, ExtendedBlock block, long fileId) { for (int retry = 0;; retry++) { http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java new file mode 100644 index 0000000..2f5c299 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Base class of a WAL Provider that returns a single thread safe WAL that writes to HDFS. By + * default, this implementation picks a directory in HDFS based on a combination of + * <ul> + * <li>the HBase root directory + * <li>HConstants.HREGION_LOGDIR_NAME + * <li>the given factory's factoryId (usually identifying the regionserver by host:port) + * </ul> + * It also uses the providerId to differentiate among files. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implements WALProvider { + + private static final Log LOG = LogFactory.getLog(AbstractFSWALProvider.class); + + // Only public so classes back in regionserver.wal can access + public interface Reader extends WAL.Reader { + /** + * @param fs File system. + * @param path Path. + * @param c Configuration. + * @param s Input stream that may have been pre-opened by the caller; may be null. 
+ */ + void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException; + } + + protected volatile T wal; + protected WALFactory factory = null; + protected Configuration conf = null; + protected List<WALActionsListener> listeners = null; + protected String providerId = null; + protected AtomicBoolean initialized = new AtomicBoolean(false); + // for default wal provider, logPrefix won't change + protected String logPrefix = null; + + /** + * we synchronized on walCreateLock to prevent wal recreation in different threads + */ + private final Object walCreateLock = new Object(); + + /** + * @param factory factory that made us, identity used for FS layout. may not be null + * @param conf may not be null + * @param listeners may be null + * @param providerId differentiate between providers from one facotry, used for FS layout. may be + * null + */ + @Override + public void init(WALFactory factory, Configuration conf, List<WALActionsListener> listeners, + String providerId) throws IOException { + if (!initialized.compareAndSet(false, true)) { + throw new IllegalStateException("WALProvider.init should only be called once."); + } + this.factory = factory; + this.conf = conf; + this.listeners = listeners; + this.providerId = providerId; + // get log prefix + StringBuilder sb = new StringBuilder().append(factory.factoryId); + if (providerId != null) { + if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) { + sb.append(providerId); + } else { + sb.append(WAL_FILE_NAME_DELIMITER).append(providerId); + } + } + logPrefix = sb.toString(); + doInit(conf); + } + + @Override + public WAL getWAL(byte[] identifier, byte[] namespace) throws IOException { + T walCopy = wal; + if (walCopy == null) { + // only lock when need to create wal, and need to lock since + // creating hlog on fs is time consuming + synchronized (walCreateLock) { + walCopy = wal; + if (walCopy == null) { + walCopy = createWAL(); + wal = walCopy; + } + } + } + return walCopy; + } + + 
protected abstract T createWAL() throws IOException; + + protected abstract void doInit(Configuration conf) throws IOException; + + @Override + public void shutdown() throws IOException { + T log = this.wal; + if (log != null) { + log.shutdown(); + } + } + + @Override + public void close() throws IOException { + T log = this.wal; + if (log != null) { + log.close(); + } + } + + /** + * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the + * number of files (rolled and active). if either of them aren't, count 0 for that provider. + */ + @Override + public long getNumLogFiles() { + T log = this.wal; + return log == null ? 0 : log.getNumLogFiles(); + } + + /** + * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the + * size of files (rolled and active). if either of them aren't, count 0 for that provider. + */ + @Override + public long getLogFileSize() { + T log = this.wal; + return log == null ? 0 : log.getLogFileSize(); + } + + /** + * returns the number of rolled WAL files. + */ + @VisibleForTesting + public static int getNumRolledLogFiles(WAL wal) { + return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles(); + } + + /** + * return the current filename from the current wal. + */ + @VisibleForTesting + public static Path getCurrentFileName(final WAL wal) { + return ((AbstractFSWAL<?>) wal).getCurrentFileName(); + } + + /** + * request a log roll, but don't actually do it. 
+ */ + @VisibleForTesting + static void requestLogRoll(final WAL wal) { + ((AbstractFSWAL<?>) wal).requestLogRoll(); + } + + // should be package private; more visible for use in AbstractFSWAL + public static final String WAL_FILE_NAME_DELIMITER = "."; + /** The hbase:meta region's WAL filename extension */ + @VisibleForTesting + public static final String META_WAL_PROVIDER_ID = ".meta"; + static final String DEFAULT_PROVIDER_ID = "default"; + + // Implementation details that currently leak in tests or elsewhere follow + /** File Extension used while splitting an WAL into regions (HBASE-2312) */ + public static final String SPLITTING_EXT = "-splitting"; + + /** + * It returns the file create timestamp from the file name. For name format see + * {@link #validateWALFilename(String)} public until remaining tests move to o.a.h.h.wal + * @param wal must not be null + * @return the file number that is part of the WAL file name + */ + @VisibleForTesting + public static long extractFileNumFromWAL(final WAL wal) { + final Path walName = ((AbstractFSWAL<?>) wal).getCurrentFileName(); + if (walName == null) { + throw new IllegalArgumentException("The WAL path couldn't be null"); + } + final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER); + return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2 : 1)]); + } + + /** + * Pattern used to validate a WAL file name see {@link #validateWALFilename(String)} for + * description. + */ + private static final Pattern pattern = Pattern + .compile(".*\\.\\d*(" + META_WAL_PROVIDER_ID + ")*"); + + /** + * A WAL file name is of the format: <wal-name>{@link #WAL_FILE_NAME_DELIMITER} + * <file-creation-timestamp>[.meta]. 
provider-name is usually made up of a server-name and a + * provider-id + * @param filename name of the file to validate + * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> otherwise + */ + public static boolean validateWALFilename(String filename) { + return pattern.matcher(filename).matches(); + } + + /** + * Construct the directory name for all WALs on a given server. + * @param serverName Server name formatted as described in {@link ServerName} + * @return the relative WAL directory name, e.g. <code>.logs/1.example.org,60030,12345</code> if + * <code>serverName</code> passed is <code>1.example.org,60030,12345</code> + */ + public static String getWALDirectoryName(final String serverName) { + StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME); + dirName.append("/"); + dirName.append(serverName); + return dirName.toString(); + } + + /** + * Pulls a ServerName out of a Path generated according to our layout rules. In the below layouts, + * this method ignores the format of the logfile component. Current format: [base directory for + * hbase]/hbase/.logs/ServerName/logfile or [base directory for + * hbase]/hbase/.logs/ServerName-splitting/logfile Expected to work for individual log files and + * server-specific directories. + * @return null if it's not a log file. Returns the ServerName of the region server that created + * this log file otherwise. 
+ */ + public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path) + throws IOException { + if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) { + return null; + } + + if (conf == null) { + throw new IllegalArgumentException("parameter conf must be set"); + } + + final String rootDir = conf.get(HConstants.HBASE_DIR); + if (rootDir == null || rootDir.isEmpty()) { + throw new IllegalArgumentException(HConstants.HBASE_DIR + " key not found in conf."); + } + + final StringBuilder startPathSB = new StringBuilder(rootDir); + if (!rootDir.endsWith("/")) { + startPathSB.append('/'); + } + startPathSB.append(HConstants.HREGION_LOGDIR_NAME); + if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) { + startPathSB.append('/'); + } + final String startPath = startPathSB.toString(); + + String fullPath; + try { + fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString(); + } catch (IllegalArgumentException e) { + LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage()); + return null; + } + + if (!fullPath.startsWith(startPath)) { + return null; + } + + final String serverNameAndFile = fullPath.substring(startPath.length()); + + if (serverNameAndFile.indexOf('/') < "a,0,0".length()) { + // Either it's a file (not a directory) or it's not a ServerName format + return null; + } + + Path p = new Path(path); + return getServerNameFromWALDirectoryName(p); + } + + /** + * This function returns region server name from a log file name which is in one of the following + * formats: + * <ul> + * <li>hdfs://<name node>/hbase/.logs/<server name>-splitting/...</li> + * <li>hdfs://<name node>/hbase/.logs/<server name>/...</li> + * </ul> + * @return null if the passed in logFile isn't a valid WAL file path + */ + public static ServerName getServerNameFromWALDirectoryName(Path logFile) { + String logDirName = logFile.getParent().getName(); + // We were passed the directory and not a file in it. 
+ if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) { + logDirName = logFile.getName(); + } + ServerName serverName = null; + if (logDirName.endsWith(SPLITTING_EXT)) { + logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length()); + } + try { + serverName = ServerName.parseServerName(logDirName); + } catch (IllegalArgumentException ex) { + serverName = null; + LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage()); + } + if (serverName != null && serverName.getStartcode() < 0) { + LOG.warn("Invalid log file path=" + logFile); + serverName = null; + } + return serverName; + } + + public static boolean isMetaFile(Path p) { + return isMetaFile(p.getName()); + } + + public static boolean isMetaFile(String p) { + if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) { + return true; + } + return false; + } + + /** + * Get prefix of the log from its name, assuming WAL name in format of + * log_prefix.filenumber.log_suffix + * @param name Name of the WAL to parse + * @return prefix of the log + * @see AbstractFSWAL#getCurrentFileName() + */ + public static String getWALPrefixFromWALName(String name) { + int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf("."); + return name.substring(0, endIndex); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java new file mode 100644 index 0000000..bc142ce --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; +import org.apache.hadoop.hbase.regionserver.wal.AsyncProtobufLogWriter; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +/** + * A WAL provider that uses {@link AsyncFSWAL}.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { + + // Only public so classes back in regionserver.wal can access + public interface AsyncWriter extends WALProvider.AsyncWriter { + void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException; + } + + private EventLoopGroup eventLoopGroup = null; + + @Override + protected AsyncFSWAL createWAL() throws IOException { + return new AsyncFSWAL(FileSystem.get(conf), FSUtils.getRootDir(conf), + getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, + true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, + eventLoopGroup.next()); + } + + @Override + protected void doInit(Configuration conf) throws IOException { + eventLoopGroup = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("AsyncFSWAL")); + } + + /** + * public because of AsyncFSWAL. Should be package-private + */ + public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, + boolean overwritable, EventLoop eventLoop) throws IOException { + AsyncWriter writer = new AsyncProtobufLogWriter(eventLoop); + writer.init(fs, path, conf, overwritable); + return writer; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java index 027e7a2..9f0d0ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DefaultWALProvider.java @@ -19,357 +19,42 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; -import java.util.List; -import
java.util.concurrent.atomic.AtomicBoolean; -import java.util.regex.Pattern; - -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.util.FSUtils; - +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; // imports for things that haven't moved from regionserver.wal yet. import org.apache.hadoop.hbase.regionserver.wal.FSHLog; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.util.FSUtils; /** - * A WAL Provider that returns a single thread safe WAL that writes to HDFS. - * By default, this implementation picks a directory in HDFS based on a combination of - * <ul> - * <li>the HBase root directory - * <li>HConstants.HREGION_LOGDIR_NAME - * <li>the given factory's factoryId (usually identifying the regionserver by host:port) - * </ul> - * It also uses the providerId to diffentiate among files. - * + * A WAL provider that use {@link FSHLog}. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class DefaultWALProvider implements WALProvider { - private static final Log LOG = LogFactory.getLog(DefaultWALProvider.class); +public class DefaultWALProvider extends AbstractFSWALProvider<FSHLog> { - // Only public so classes back in regionserver.wal can access - public interface Reader extends WAL.Reader { - /** - * @param fs File system. - * @param path Path. 
- * @param c Configuration. - * @param s Input stream that may have been pre-opened by the caller; may be null. - */ - void init(FileSystem fs, Path path, Configuration c, FSDataInputStream s) throws IOException; - } + private static final Log LOG = LogFactory.getLog(DefaultWALProvider.class); // Only public so classes back in regionserver.wal can access public interface Writer extends WALProvider.Writer { void init(FileSystem fs, Path path, Configuration c, boolean overwritable) throws IOException; } - protected volatile FSHLog log = null; - private WALFactory factory = null; - private Configuration conf = null; - private List<WALActionsListener> listeners = null; - private String providerId = null; - private AtomicBoolean initialized = new AtomicBoolean(false); - // for default wal provider, logPrefix won't change - private String logPrefix = null; - - /** - * we synchronized on walCreateLock to prevent wal recreation in different threads - */ - private final Object walCreateLock = new Object(); - - /** - * @param factory factory that made us, identity used for FS layout. may not be null - * @param conf may not be null - * @param listeners may be null - * @param providerId differentiate between providers from one facotry, used for FS layout. 
may be - * null - */ - @Override - public void init(final WALFactory factory, final Configuration conf, - final List<WALActionsListener> listeners, String providerId) throws IOException { - if (!initialized.compareAndSet(false, true)) { - throw new IllegalStateException("WALProvider.init should only be called once."); - } - this.factory = factory; - this.conf = conf; - this.listeners = listeners; - this.providerId = providerId; - // get log prefix - StringBuilder sb = new StringBuilder().append(factory.factoryId); - if (providerId != null) { - if (providerId.startsWith(WAL_FILE_NAME_DELIMITER)) { - sb.append(providerId); - } else { - sb.append(WAL_FILE_NAME_DELIMITER).append(providerId); - } - } - logPrefix = sb.toString(); - } - - @Override - public WAL getWAL(final byte[] identifier, byte[] namespace) throws IOException { - if (log == null) { - // only lock when need to create wal, and need to lock since - // creating hlog on fs is time consuming - synchronized (walCreateLock) { - if (log == null) { - log = new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf), - getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, - listeners, true, logPrefix, - META_WAL_PROVIDER_ID.equals(providerId) ? 
META_WAL_PROVIDER_ID : null); - } - } - } - return log; - } - - @Override - public void close() throws IOException { - if (log != null) log.close(); - } - - @Override - public void shutdown() throws IOException { - if (log != null) log.shutdown(); - } - - // should be package private; more visible for use in FSHLog - public static final String WAL_FILE_NAME_DELIMITER = "."; - /** The hbase:meta region's WAL filename extension */ - @VisibleForTesting - public static final String META_WAL_PROVIDER_ID = ".meta"; - static final String DEFAULT_PROVIDER_ID = "default"; - - // Implementation details that currently leak in tests or elsewhere follow - /** File Extension used while splitting an WAL into regions (HBASE-2312) */ - public static final String SPLITTING_EXT = "-splitting"; - - /** - * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, - * count the number of files (rolled and active). if either of them aren't, count 0 - * for that provider. - */ - @Override - public long getNumLogFiles() { - return log == null ? 0 : this.log.getNumLogFiles(); - } - - /** - * iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, - * count the size of files (rolled and active). if either of them aren't, count 0 - * for that provider. - */ - @Override - public long getLogFileSize() { - return log == null ? 0 : this.log.getLogFileSize(); - } - - /** - * returns the number of rolled WAL files. - */ - @VisibleForTesting - public static int getNumRolledLogFiles(WAL wal) { - return ((FSHLog)wal).getNumRolledLogFiles(); - } - - /** - * return the current filename from the current wal. - */ - @VisibleForTesting - public static Path getCurrentFileName(final WAL wal) { - return ((FSHLog)wal).getCurrentFileName(); - } - - /** - * request a log roll, but don't actually do it. 
- */ - @VisibleForTesting - static void requestLogRoll(final WAL wal) { - ((FSHLog)wal).requestLogRoll(); - } - - /** - * It returns the file create timestamp from the file name. - * For name format see {@link #validateWALFilename(String)} - * public until remaining tests move to o.a.h.h.wal - * @param wal must not be null - * @return the file number that is part of the WAL file name - */ - @VisibleForTesting - public static long extractFileNumFromWAL(final WAL wal) { - final Path walName = ((FSHLog)wal).getCurrentFileName(); - if (walName == null) { - throw new IllegalArgumentException("The WAL path couldn't be null"); - } - final String[] walPathStrs = walName.toString().split("\\" + WAL_FILE_NAME_DELIMITER); - return Long.parseLong(walPathStrs[walPathStrs.length - (isMetaFile(walName) ? 2:1)]); - } - - /** - * Pattern used to validate a WAL file name - * see {@link #validateWALFilename(String)} for description. - */ - private static final Pattern pattern = Pattern.compile(".*\\.\\d*("+META_WAL_PROVIDER_ID+")*"); - - /** - * A WAL file name is of the format: - * <wal-name>{@link #WAL_FILE_NAME_DELIMITER}<file-creation-timestamp>[.meta]. - * - * provider-name is usually made up of a server-name and a provider-id - * - * @param filename name of the file to validate - * @return <tt>true</tt> if the filename matches an WAL, <tt>false</tt> - * otherwise - */ - public static boolean validateWALFilename(String filename) { - return pattern.matcher(filename).matches(); - } - - /** - * Construct the directory name for all WALs on a given server. - * - * @param serverName - * Server name formatted as described in {@link ServerName} - * @return the relative WAL directory name, e.g. 
- * <code>.logs/1.example.org,60030,12345</code> if - * <code>serverName</code> passed is - * <code>1.example.org,60030,12345</code> - */ - public static String getWALDirectoryName(final String serverName) { - StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME); - dirName.append("/"); - dirName.append(serverName); - return dirName.toString(); - } - - /** - * Pulls a ServerName out of a Path generated according to our layout rules. - * - * In the below layouts, this method ignores the format of the logfile component. - * - * Current format: - * - * [base directory for hbase]/hbase/.logs/ServerName/logfile - * or - * [base directory for hbase]/hbase/.logs/ServerName-splitting/logfile - * - * Expected to work for individual log files and server-specific directories. - * - * @return null if it's not a log file. Returns the ServerName of the region - * server that created this log file otherwise. - */ - public static ServerName getServerNameFromWALDirectoryName(Configuration conf, String path) - throws IOException { - if (path == null - || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) { - return null; - } - - if (conf == null) { - throw new IllegalArgumentException("parameter conf must be set"); - } - - final String rootDir = conf.get(HConstants.HBASE_DIR); - if (rootDir == null || rootDir.isEmpty()) { - throw new IllegalArgumentException(HConstants.HBASE_DIR - + " key not found in conf."); - } - - final StringBuilder startPathSB = new StringBuilder(rootDir); - if (!rootDir.endsWith("/")) - startPathSB.append('/'); - startPathSB.append(HConstants.HREGION_LOGDIR_NAME); - if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) - startPathSB.append('/'); - final String startPath = startPathSB.toString(); - - String fullPath; - try { - fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString(); - } catch (IllegalArgumentException e) { - LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage()); - return null; - } - 
- if (!fullPath.startsWith(startPath)) { - return null; - } - - final String serverNameAndFile = fullPath.substring(startPath.length()); - - if (serverNameAndFile.indexOf('/') < "a,0,0".length()) { - // Either it's a file (not a directory) or it's not a ServerName format - return null; - } - - Path p = new Path(path); - return getServerNameFromWALDirectoryName(p); - } - - /** - * This function returns region server name from a log file name which is in one of the following - * formats: - * <ul> - * <li>hdfs://<name node>/hbase/.logs/<server name>-splitting/...</li> - * <li>hdfs://<name node>/hbase/.logs/<server name>/...</li> - * </ul> - * @param logFile - * @return null if the passed in logFile isn't a valid WAL file path - */ - public static ServerName getServerNameFromWALDirectoryName(Path logFile) { - String logDirName = logFile.getParent().getName(); - // We were passed the directory and not a file in it. - if (logDirName.equals(HConstants.HREGION_LOGDIR_NAME)) { - logDirName = logFile.getName(); - } - ServerName serverName = null; - if (logDirName.endsWith(SPLITTING_EXT)) { - logDirName = logDirName.substring(0, logDirName.length() - SPLITTING_EXT.length()); - } - try { - serverName = ServerName.parseServerName(logDirName); - } catch (IllegalArgumentException ex) { - serverName = null; - LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage()); - } - if (serverName != null && serverName.getStartcode() < 0) { - LOG.warn("Invalid log file path=" + logFile); - serverName = null; - } - return serverName; - } - - public static boolean isMetaFile(Path p) { - return isMetaFile(p.getName()); - } - - public static boolean isMetaFile(String p) { - if (p != null && p.endsWith(META_WAL_PROVIDER_ID)) { - return true; - } - return false; - } - /** * public because of FSHLog. 
Should be package-private */ public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable) - throws IOException { + final boolean overwritable) throws IOException { // Configuration already does caching for the Class lookup. Class<? extends Writer> logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl", - ProtobufLogWriter.class, Writer.class); + ProtobufLogWriter.class, Writer.class); try { Writer writer = logWriterClass.newInstance(); writer.init(fs, path, conf, overwritable); @@ -380,15 +65,14 @@ public class DefaultWALProvider implements WALProvider { } } - /** - * Get prefix of the log from its name, assuming WAL name in format of - * log_prefix.filenumber.log_suffix @see {@link FSHLog#getCurrentFileName()} - * @param name Name of the WAL to parse - * @return prefix of the log - */ - public static String getWALPrefixFromWALName(String name) { - int endIndex = name.replaceAll(META_WAL_PROVIDER_ID, "").lastIndexOf("."); - return name.substring(0, endIndex); + @Override + protected FSHLog createWAL() throws IOException { + return new FSHLog(FileSystem.get(conf), FSUtils.getRootDir(conf), + getWALDirectoryName(factory.factoryId), HConstants.HREGION_OLDLOGDIR_NAME, conf, listeners, + true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? 
META_WAL_PROVIDER_ID : null); } + @Override + protected void doInit(Configuration conf) throws IOException { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index c3d4b2c..028c60b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -215,6 +215,10 @@ class DisabledWALProvider implements WALProvider { public String toString() { return "WAL disabled."; } + + @Override + public void logRollerExited() { + } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 0b83528..051ce54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -199,6 +199,13 @@ public interface WAL { String toString(); /** + * In some WAL implementations, we will write WAL entries to a new file if sync failed, which + * means the failure recovery depends on the log roller. So here we tell the WAL that the log + * roller has already exited so the WAL could give up recovery. + */ + void logRollerExited(); + + /** * When outside clients need to consume persisted WALs, they rely on a provided * Reader.
*/ @@ -268,7 +275,5 @@ public interface WAL { public String toString() { return this.key + "=" + this.edit; } - } - } http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 08f42aa..a2761df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -76,7 +76,8 @@ public class WALFactory { static enum Providers { defaultProvider(DefaultWALProvider.class), filesystem(DefaultWALProvider.class), - multiwal(RegionGroupingProvider.class); + multiwal(RegionGroupingProvider.class), + asyncfs(AsyncFSWALProvider.class); Class<? extends WALProvider> clazz; Providers(Class<? extends WALProvider> clazz) { @@ -350,9 +351,10 @@ public class WALFactory { /** * Create a writer for the WAL. + * <p> * should be package-private. public only for tests and * {@link org.apache.hadoop.hbase.regionserver.wal.Compressor} - * @return A WAL writer. Close when done with it. + * @return A WAL writer. Close when done with it. 
* @throws IOException */ public Writer createWALWriter(final FileSystem fs, final Path path) throws IOException { http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 2c500dc..ad79485 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -20,11 +20,11 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; +import java.nio.channels.CompletionHandler; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; - +import org.apache.hadoop.hbase.classification.InterfaceAudience; // imports for things that haven't moved from regionserver.wal yet. 
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -80,6 +80,12 @@ public interface WALProvider { long getLength() throws IOException; } + interface AsyncWriter extends Closeable { + <A> void sync(CompletionHandler<Long, A> handler, A attachment); + void append(WAL.Entry entry); + long getLength(); + } + /** * Get number of the log files this provider is managing */ http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java new file mode 100644 index 0000000..7abdef9 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -0,0 +1,332 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * Test log deletion as logs are rolled. 
+ */ +public abstract class AbstractTestLogRolling { + private static final Log LOG = LogFactory.getLog(AbstractTestLogRolling.class); + protected HRegionServer server; + protected String tableName; + protected byte[] value; + protected FileSystem fs; + protected MiniDFSCluster dfsCluster; + protected Admin admin; + protected MiniHBaseCluster cluster; + protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + @Rule public final TestName name = new TestName(); + + public AbstractTestLogRolling() { + this.server = null; + this.tableName = null; + + String className = this.getClass().getName(); + StringBuilder v = new StringBuilder(className); + while (v.length() < 1000) { + v.append(className); + } + this.value = Bytes.toBytes(v.toString()); + } + + // Need to override this setup so we can edit the config before it gets sent + // to the HDFS & HBase cluster startup. + @BeforeClass + public static void setUpBeforeClass() throws Exception { + + + /**** configuration for testLogRolling ****/ + // Force a region split after every 768KB + TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L); + + // We roll the log after every 32 writes + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32); + + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.logroll.errors.tolerated", 2); + TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000); + + // For less frequently updated regions flush after every 2 flushes + TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2); + + // We flush the cache after every 8192 bytes + TEST_UTIL.getConfiguration().setInt( + HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192); + + // Increase the amount of time between client retries + TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000); + + // Reduce thread wake frequency so that other threads can get + // a chance to run. 
+ TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000); + } + + @Before + public void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1, 1, 2); + + cluster = TEST_UTIL.getHBaseCluster(); + dfsCluster = TEST_UTIL.getDFSCluster(); + fs = TEST_UTIL.getTestFileSystem(); + admin = TEST_UTIL.getHBaseAdmin(); + + // disable region rebalancing (interferes with log watching) + cluster.getMaster().balanceSwitch(false); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + protected void startAndWriteData() throws IOException, InterruptedException { + // When the hbase:meta table can be opened, the region servers are running + TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); + this.server = cluster.getRegionServerThreads().get(0).getRegionServer(); + + Table table = createTestTable(this.tableName); + + server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); + for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls + doPut(table, i); + if (i % 32 == 0) { + // After every 32 writes sleep to let the log roller run + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // continue + } + } + } + } + + /** + * Tests that log rolling doesn't hang when no data is written. + */ + @Test(timeout=120000) + public void testLogRollOnNothingWritten() throws Exception { + final Configuration conf = TEST_UTIL.getConfiguration(); + final WALFactory wals = new WALFactory(conf, null, + ServerName.valueOf("test.com",8080, 1).toString()); + final WAL newLog = wals.getWAL(new byte[]{}, null); + try { + // Now roll the log before we write anything. 
+ newLog.rollWriter(true); + } finally { + wals.close(); + } + } + + /** + * Tests that logs are deleted + * @throws IOException + * @throws org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException + */ + @Test + public void testLogRolling() throws Exception { + this.tableName = getName(); + // TODO: Why does this write data take for ever? + startAndWriteData(); + HRegionInfo region = + server.getOnlineRegions(TableName.valueOf(tableName)).get(0).getRegionInfo(); + final WAL log = server.getWAL(region); + LOG.info("after writing there are " + DefaultWALProvider.getNumRolledLogFiles(log) + + " log files"); + + // flush all regions + for (Region r: server.getOnlineRegionsLocalContext()) { + r.flush(true); + } + + // Now roll the log + log.rollWriter(); + + int count = DefaultWALProvider.getNumRolledLogFiles(log); + LOG.info("after flushing all regions and rolling logs there are " + count + " log files"); + assertTrue(("actual count: " + count), count <= 2); + } + + protected String getName() { + return "TestLogRolling-" + name.getMethodName(); + } + + void writeData(Table table, int rownum) throws IOException { + doPut(table, rownum); + + // sleep to let the log roller run (if it needs to) + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // continue + } + } + + void validateData(Table table, int rownum) throws IOException { + String row = "row" + String.format("%1$04d", rownum); + Get get = new Get(Bytes.toBytes(row)); + get.addFamily(HConstants.CATALOG_FAMILY); + Result result = table.get(get); + assertTrue(result.size() == 1); + assertTrue(Bytes.equals(value, + result.getValue(HConstants.CATALOG_FAMILY, null))); + LOG.info("Validated row " + row); + } + + void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout) + throws IOException { + for (int i = 0; i < 10; i++) { + Put put = new Put(Bytes.toBytes("row" + + String.format("%1$04d", (start + i)))); + put.addColumn(HConstants.CATALOG_FAMILY, null, 
value); + table.put(put); + } + Put tmpPut = new Put(Bytes.toBytes("tmprow")); + tmpPut.addColumn(HConstants.CATALOG_FAMILY, null, value); + long startTime = System.currentTimeMillis(); + long remaining = timeout; + while (remaining > 0) { + if (log.isLowReplicationRollEnabled() == expect) { + break; + } else { + // Trigger calling FSHLog#checkLowReplication() + table.put(tmpPut); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + // continue + } + remaining = timeout - (System.currentTimeMillis() - startTime); + } + } + } + + /** + * Tests that logs are deleted when some region has a compaction + * record in WAL and no other records. See HBASE-8597. + */ + @Test + public void testCompactionRecordDoesntBlockRolling() throws Exception { + Table table = null; + + // When the hbase:meta table can be opened, the region servers are running + Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME); + try { + table = createTestTable(getName()); + + server = TEST_UTIL.getRSForFirstRegionInTable(table.getName()); + Region region = server.getOnlineRegions(table.getName()).get(0); + final WAL log = server.getWAL(region.getRegionInfo()); + Store s = region.getStore(HConstants.CATALOG_FAMILY); + + // have to flush namespace to ensure it doesn't affect WAL tests + admin.flush(TableName.NAMESPACE_TABLE_NAME); + + // Put some stuff into table, to make sure we have some files to compact. + for (int i = 1; i <= 2; ++i) { + doPut(table, i); + admin.flush(table.getName()); + } + doPut(table, 3); // don't flush yet, or compaction might trigger before we roll WAL + assertEquals("Should have no WAL after initial writes", 0, + DefaultWALProvider.getNumRolledLogFiles(log)); + assertEquals(2, s.getStorefilesCount()); + + // Roll the log and compact table, to have compaction record in the 2nd WAL. 
+ log.rollWriter(); + assertEquals("Should have WAL; one table is not flushed", 1, + DefaultWALProvider.getNumRolledLogFiles(log)); + admin.flush(table.getName()); + region.compact(false); + // Wait for compaction in case if flush triggered it before us. + Assert.assertNotNull(s); + for (int waitTime = 3000; s.getStorefilesCount() > 1 && waitTime > 0; waitTime -= 200) { + Threads.sleepWithoutInterrupt(200); + } + assertEquals("Compaction didn't happen", 1, s.getStorefilesCount()); + + // Write some value to the table so the WAL cannot be deleted until table is flushed. + doPut(table, 0); // Now 2nd WAL will have both compaction and put record for table. + log.rollWriter(); // 1st WAL deleted, 2nd not deleted yet. + assertEquals("Should have WAL; one table is not flushed", 1, + DefaultWALProvider.getNumRolledLogFiles(log)); + + // Flush table to make latest WAL obsolete; write another record, and roll again. + admin.flush(table.getName()); + doPut(table, 1); + log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added. 
+ assertEquals("Should have 1 WALs at the end", 1, + DefaultWALProvider.getNumRolledLogFiles(log)); + } finally { + if (t != null) t.close(); + if (table != null) table.close(); + } + } + + protected void doPut(Table table, int i) throws IOException { + Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); + put.addColumn(HConstants.CATALOG_FAMILY, null, value); + table.put(put); + } + + protected Table createTestTable(String tableName) throws IOException { + // Create the test table and open it + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); + admin.createTable(desc); + return TEST_UTIL.getConnection().getTable(desc.getTableName()); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java new file mode 100644 index 0000000..a4267a0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestProtobufLog.java @@ -0,0 +1,209 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.SampleRegionWALObserver; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; + +/** + * WAL tests that can be reused across providers. 
+ */ +public abstract class AbstractTestProtobufLog<W extends Closeable> { + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + protected FileSystem fs; + protected Path dir; + protected WALFactory wals; + + @Rule + public final TestName currentTest = new TestName(); + + @Before + public void setUp() throws Exception { + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + dir = new Path(TEST_UTIL.createRootDir(), currentTest.getMethodName()); + wals = new WALFactory(TEST_UTIL.getConfiguration(), null, currentTest.getMethodName()); + } + + @After + public void tearDown() throws Exception { + wals.close(); + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries) { + fs.delete(dir.getPath(), true); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Make block sizes small. + TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); + // needed for testAppendClose() + TEST_UTIL.getConfiguration().setBoolean("dfs.support.broken.append", true); + TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true); + // quicker heartbeat interval for faster DN death notification + TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); + TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); + TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000); + + // faster failover with cluster.shutdown();fs.close() idiom + TEST_UTIL.getConfiguration() + .setInt("hbase.ipc.client.connect.max.retries", 1); + TEST_UTIL.getConfiguration().setInt( + "dfs.client.block.recovery.retries", 1); + TEST_UTIL.getConfiguration().setInt( + "hbase.ipc.client.connection.maxidletime", 500); + TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, + SampleRegionWALObserver.class.getName()); + TEST_UTIL.startMiniDFSCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + 
TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Reads the WAL with and without WALTrailer. + * @throws IOException + */ + @Test + public void testWALTrailer() throws IOException { + // read With trailer. + doRead(true); + // read without trailer + doRead(false); + } + + /** + * Appends entries in the WAL and reads it. + * @param withTrailer If 'withTrailer' is true, it calls a close on the WALwriter before reading + * so that a trailer is appended to the WAL. Otherwise, it starts reading after the sync + * call. This means that reader is not aware of the trailer. In this scenario, if the + * reader tries to read the trailer in its next() call, it returns false from + * ProtoBufLogReader. + * @throws IOException + */ + private void doRead(boolean withTrailer) throws IOException { + final int columnCount = 5; + final int recordCount = 5; + final TableName tableName = + TableName.valueOf("tablename"); + final byte[] row = Bytes.toBytes("row"); + long timestamp = System.currentTimeMillis(); + Path path = new Path(dir, "tempwal"); + // delete the log if already exists, for test only + fs.delete(path, true); + W writer = null; + ProtobufLogReader reader = null; + try { + HRegionInfo hri = new HRegionInfo(tableName, + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HTableDescriptor htd = new HTableDescriptor(tableName); + fs.mkdirs(dir); + // Write log in pb format. + writer = createWriter(path); + for (int i = 0; i < recordCount; ++i) { + WALKey key = new WALKey( + hri.getEncodedNameAsBytes(), tableName, i, timestamp, HConstants.DEFAULT_CLUSTER_ID); + WALEdit edit = new WALEdit(); + for (int j = 0; j < columnCount; ++j) { + if (i == 0) { + htd.addFamily(new HColumnDescriptor("column" + j)); + } + String value = i + "" + j; + edit.add(new KeyValue(row, row, row, timestamp, Bytes.toBytes(value))); + } + append(writer, new WAL.Entry(key, edit)); + } + sync(writer); + if (withTrailer) writer.close(); + + // Now read the log using standard means. 
+ reader = (ProtobufLogReader) wals.createReader(fs, path); + if (withTrailer) { + assertNotNull(reader.trailer); + } else { + assertNull(reader.trailer); + } + for (int i = 0; i < recordCount; ++i) { + WAL.Entry entry = reader.next(); + assertNotNull(entry); + assertEquals(columnCount, entry.getEdit().size()); + assertArrayEquals(hri.getEncodedNameAsBytes(), entry.getKey().getEncodedRegionName()); + assertEquals(tableName, entry.getKey().getTablename()); + int idx = 0; + for (Cell val : entry.getEdit().getCells()) { + assertTrue(Bytes.equals(row, 0, row.length, val.getRowArray(), val.getRowOffset(), + val.getRowLength())); + String value = i + "" + idx; + assertArrayEquals(Bytes.toBytes(value), CellUtil.cloneValue(val)); + idx++; + } + } + WAL.Entry entry = reader.next(); + assertNull(entry); + } finally { + if (writer != null) { + writer.close(); + } + if (reader != null) { + reader.close(); + } + } + } + + protected abstract W createWriter(Path path) throws IOException; + + protected abstract void append(W writer, WAL.Entry entry) throws IOException; + + protected abstract void sync(W writer) throws IOException; +}
