Repository: hbase Updated Branches: refs/heads/master a3b4575f7 -> 6e9d355b1
HBASE-15264 Implement a fan out HDFS OutputStream Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6e9d355b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6e9d355b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6e9d355b Branch: refs/heads/master Commit: 6e9d355b12a1e666f4d05be02775a01b6754d063 Parents: a3b4575 Author: zhangduo <zhang...@apache.org> Authored: Wed Feb 24 20:47:38 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Thu Feb 25 10:07:27 2016 +0800 ---------------------------------------------------------------------- .../util/FanOutOneBlockAsyncDFSOutput.java | 533 +++++++++++++++ .../FanOutOneBlockAsyncDFSOutputHelper.java | 672 +++++++++++++++++++ ...anOutOneBlockAsyncDFSOutputFlushHandler.java | 61 ++ .../util/TestFanOutOneBlockAsyncDFSOutput.java | 190 ++++++ 4 files changed, 1456 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9d355b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java new file mode 100644 index 0000000..b10f180 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java @@ -0,0 +1,533 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; +import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile; +import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; +import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.channels.CompletionHandler; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.Deque; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; +import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.base.Supplier; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoop; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; + +/** + * An asynchronous HDFS output stream implementation which fans out data to datanode and only + * supports writing file with only one block. + * <p> + * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The main + * usage of this class is implementing WAL, so we only expose a few HDFS configurations in the + * method. And we place it here under util package because we want to make it independent of WAL + * implementation thus easier to move it to HDFS project finally. + * <p> + * Note that, all connections to datanode will run in the same {@link EventLoop} which means we only + * need one thread here. But be careful, we do some blocking operations in {@link #close()} and + * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside + * {@link EventLoop}. 
And for {@link #write(byte[])} {@link #write(byte[], int, int)}, + * {@link #buffered()} and {@link #flush(Object, CompletionHandler, boolean)}, if you call them + * outside {@link EventLoop}, there will be an extra context-switch. + * <p> + * Advantages compare to DFSOutputStream: + * <ol> + * <li>The fan out mechanism. This will reduce the latency.</li> + * <li>The asynchronous WAL could also run in the same EventLoop, we could just call write and flush + * inside the EventLoop thread, so generally we only have one thread to do all the things.</li> + * <li>Fail-fast when connection to datanode error. The WAL implementation could open new writer + * ASAP.</li> + * <li>We could benefit from netty's ByteBuf management mechanism.</li> + * </ol> + */ +@InterfaceAudience.Private +public class FanOutOneBlockAsyncDFSOutput implements Closeable { + + private final Configuration conf; + + private final FSUtils fsUtils; + + private final DistributedFileSystem dfs; + + private final DFSClient client; + + private final ClientProtocol namenode; + + private final String clientName; + + private final String src; + + private final long fileId; + + private final LocatedBlock locatedBlock; + + private final EventLoop eventLoop; + + private final List<Channel> datanodeList; + + private final DataChecksum summer; + + private final ByteBufAllocator alloc; + + private static final class Callback { + + public final Promise<Void> promise; + + public final long ackedLength; + + public final Set<Channel> unfinishedReplicas; + + public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) { + this.promise = promise; + this.ackedLength = ackedLength; + if (replicas.isEmpty()) { + this.unfinishedReplicas = Collections.emptySet(); + } else { + this.unfinishedReplicas = Collections + .newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size())); + this.unfinishedReplicas.addAll(replicas); + } + } + } + + private final Deque<Callback> waitingAckQueue = new 
ArrayDeque<>(); + + // this could be different from acked block length because a packet can not start at the middle of + // a chunk. + private long nextPacketOffsetInBlock = 0L; + + private long nextPacketSeqno = 0L; + + private ByteBuf buf; + + private enum State { + STREAMING, CLOSING, BROKEN, CLOSED + } + + private State state; + + private void completed(Channel channel) { + if (waitingAckQueue.isEmpty()) { + return; + } + for (Callback c : waitingAckQueue) { + if (c.unfinishedReplicas.remove(channel)) { + if (c.unfinishedReplicas.isEmpty()) { + c.promise.trySuccess(null); + // since we will remove the Callback entry from waitingAckQueue if its unfinishedReplicas + // is empty, so this could only happen at the head of waitingAckQueue, so we just call + // removeFirst here. + waitingAckQueue.removeFirst(); + // also wake up flush requests which have the same length. + for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) { + if (cb.ackedLength == c.ackedLength) { + cb.promise.trySuccess(null); + waitingAckQueue.removeFirst(); + } else { + break; + } + } + } + return; + } + } + } + + private void failed(Channel channel, Supplier<Throwable> errorSupplier) { + if (state == State.BROKEN || state == State.CLOSED) { + return; + } + if (state == State.CLOSING) { + Callback c = waitingAckQueue.peekFirst(); + if (c == null || !c.unfinishedReplicas.contains(channel)) { + // nothing, the endBlock request has already finished. + return; + } + } + // disable further write, and fail all pending ack. 
+ state = State.BROKEN; + Throwable error = errorSupplier.get(); + for (Callback c : waitingAckQueue) { + c.promise.tryFailure(error); + } + waitingAckQueue.clear(); + for (Channel ch : datanodeList) { + ch.close(); + } + } + + private void setupReceiver(final int timeoutMs) { + SimpleChannelInboundHandler<PipelineAckProto> ackHandler = new SimpleChannelInboundHandler<PipelineAckProto>() { + + @Override + public boolean isSharable() { + return true; + } + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack) + throws Exception { + final Status reply = getStatus(ack); + if (reply != Status.SUCCESS) { + failed(ctx.channel(), new Supplier<Throwable>() { + + @Override + public Throwable get() { + return new IOException("Bad response " + reply + " for block " + + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()); + } + }); + return; + } + if (PipelineAck.isRestartOOBStatus(reply)) { + failed(ctx.channel(), new Supplier<Throwable>() { + + @Override + public Throwable get() { + return new IOException("Restart response " + reply + " for block " + + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()); + } + }); + return; + } + if (ack.getSeqno() == HEART_BEAT_SEQNO) { + return; + } + completed(ctx.channel()); + } + + @Override + public void channelInactive(final ChannelHandlerContext ctx) throws Exception { + failed(ctx.channel(), new Supplier<Throwable>() { + + @Override + public Throwable get() { + return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"); + } + }); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) + throws Exception { + failed(ctx.channel(), new Supplier<Throwable>() { + + @Override + public Throwable get() { + return cause; + } + }); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + 
IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == IdleState.READER_IDLE) { + failed(ctx.channel(), new Supplier<Throwable>() { + + @Override + public Throwable get() { + return new IOException("Timeout(" + timeoutMs + "ms) waiting for response"); + } + }); + } else if (e.state() == IdleState.WRITER_IDLE) { + PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); + int len = heartbeat.getSerializedSize(); + ByteBuf buf = alloc.buffer(len); + heartbeat.putInBuffer(buf.nioBuffer(0, len)); + buf.writerIndex(len); + ctx.channel().writeAndFlush(buf); + } + return; + } + super.userEventTriggered(ctx, evt); + } + + }; + for (Channel ch : datanodeList) { + ch.pipeline().addLast( + new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS), + new ProtobufVarint32FrameDecoder(), + new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler); + ch.config().setAutoRead(true); + } + } + + FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, + DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, + LocatedBlock locatedBlock, EventLoop eventLoop, List<Channel> datanodeList, + DataChecksum summer, ByteBufAllocator alloc) { + this.conf = conf; + this.fsUtils = fsUtils; + this.dfs = dfs; + this.client = client; + this.namenode = namenode; + this.fileId = fileId; + this.clientName = clientName; + this.src = src; + this.locatedBlock = locatedBlock; + this.eventLoop = eventLoop; + this.datanodeList = datanodeList; + this.summer = summer; + this.alloc = alloc; + this.buf = alloc.directBuffer(); + this.state = State.STREAMING; + setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT)); + } + + /** + * Just call write(b, 0, b.length). + * @see #write(byte[], int, int) + */ + public void write(byte[] b) { + write(b, 0, b.length); + } + + /** + * Copy the data into the buffer. 
Note that you need to call + * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually. + */ + public void write(final byte[] b, final int off, final int len) { + if (eventLoop.inEventLoop()) { + buf.ensureWritable(len).writeBytes(b, off, len); + } else { + eventLoop.submit(new Runnable() { + + @Override + public void run() { + buf.ensureWritable(len).writeBytes(b, off, len); + } + }).syncUninterruptibly(); + } + } + + /** + * Return the current size of buffered data. + */ + public int buffered() { + if (eventLoop.inEventLoop()) { + return buf.readableBytes(); + } else { + return eventLoop.submit(new Callable<Integer>() { + + @Override + public Integer call() throws Exception { + return buf.readableBytes(); + } + }).syncUninterruptibly().getNow().intValue(); + } + } + + public DatanodeInfo[] getPipeline() { + return locatedBlock.getLocations(); + } + + private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler, + boolean syncBlock) { + if (state != State.STREAMING) { + handler.failed(new IOException("stream already broken"), attachment); + return; + } + int dataLen = buf.readableBytes(); + final long ackedLength = nextPacketOffsetInBlock + dataLen; + if (ackedLength == locatedBlock.getBlock().getNumBytes()) { + // no new data, just return + handler.completed(locatedBlock.getBlock().getNumBytes(), attachment); + return; + } + Promise<Void> promise = eventLoop.newPromise(); + promise.addListener(new FutureListener<Void>() { + + @Override + public void operationComplete(Future<Void> future) throws Exception { + if (future.isSuccess()) { + locatedBlock.getBlock().setNumBytes(ackedLength); + handler.completed(ackedLength, attachment); + } else { + handler.failed(future.cause(), attachment); + } + } + }); + Callback c = waitingAckQueue.peekLast(); + if (c != null && ackedLength == c.ackedLength) { + // just append it to the tail of waiting ack queue,, do not issue new hflush request. 
+ waitingAckQueue + .addLast(new Callback(promise, ackedLength, Collections.<Channel> emptyList())); + return; + } + int chunkLen = summer.getBytesPerChecksum(); + int trailingPartialChunkLen = dataLen % chunkLen; + int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0); + int checksumLen = numChecks * summer.getChecksumSize(); + ByteBuf checksumBuf = alloc.directBuffer(checksumLen); + summer.calculateChunkedSums(buf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen)); + checksumBuf.writerIndex(checksumLen); + PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock, + nextPacketSeqno, false, dataLen, syncBlock); + int headerLen = header.getSerializedSize(); + ByteBuf headerBuf = alloc.buffer(headerLen); + header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); + headerBuf.writerIndex(headerLen); + + waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList)); + for (Channel ch : datanodeList) { + ch.write(headerBuf.duplicate().retain()); + ch.write(checksumBuf.duplicate().retain()); + ch.writeAndFlush(buf.duplicate().retain()); + } + checksumBuf.release(); + headerBuf.release(); + ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen); + if (trailingPartialChunkLen != 0) { + buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen); + } + buf.release(); + this.buf = newBuf; + nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen; + nextPacketSeqno++; + } + + /** + * Flush the buffer out to datanodes. + * @param attachment will be passed to handler when completed. + * @param handler will set the acked length as result when completed. + * @param syncBlock will call hsync if true, otherwise hflush. + */ + public <A> void flush(final A attachment, final CompletionHandler<Long, ? 
super A> handler, + final boolean syncBlock) { + if (eventLoop.inEventLoop()) { + flush0(attachment, handler, syncBlock); + } else { + eventLoop.execute(new Runnable() { + + @Override + public void run() { + flush0(attachment, handler, syncBlock); + } + }); + } + } + + private void endBlock(Promise<Void> promise, long size) { + if (state != State.STREAMING) { + promise.tryFailure(new IOException("stream already broken")); + return; + } + if (!waitingAckQueue.isEmpty()) { + promise.tryFailure(new IllegalStateException("should call flush first before calling close")); + return; + } + state = State.CLOSING; + PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false); + buf.release(); + buf = null; + int headerLen = header.getSerializedSize(); + ByteBuf headerBuf = alloc.buffer(headerLen); + header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); + headerBuf.writerIndex(headerLen); + waitingAckQueue.add(new Callback(promise, size, datanodeList)); + for (Channel ch : datanodeList) { + ch.writeAndFlush(headerBuf.duplicate().retain()); + } + headerBuf.release(); + } + + /** + * The close method when error occurred. Now we just call recoverFileLease. + */ + public void recoverAndClose(CancelableProgressable reporter) throws IOException { + assert !eventLoop.inEventLoop(); + for (Channel ch : datanodeList) { + ch.closeFuture().awaitUninterruptibly(); + } + endFileLease(client, src, fileId); + fsUtils.recoverFileLease(dfs, new Path(src), conf, + reporter == null ? new CancelOnClose(client) : reporter); + } + + /** + * End the current block and complete file at namenode. You should call + * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception. 
+ */ + @Override + public void close() throws IOException { + assert !eventLoop.inEventLoop(); + final Promise<Void> promise = eventLoop.newPromise(); + eventLoop.execute(new Runnable() { + + @Override + public void run() { + endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()); + } + }); + promise.addListener(new FutureListener<Void>() { + + @Override + public void operationComplete(Future<Void> future) throws Exception { + for (Channel ch : datanodeList) { + ch.close(); + } + } + }).syncUninterruptibly(); + for (Channel ch : datanodeList) { + ch.closeFuture().awaitUninterruptibly(); + } + completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9d355b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java new file mode 100644 index 0000000..d34bbb0 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; +import static org.apache.hadoop.fs.CreateFlag.CREATE; +import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; +import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemLinkResolver; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import 
org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.Op; +import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.DataChecksum; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.CodedOutputStream; + +import io.netty.bootstrap.Bootstrap; +import 
io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +/** + * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}. + */ +@InterfaceAudience.Private +public class FanOutOneBlockAsyncDFSOutputHelper { + + private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class); + + private FanOutOneBlockAsyncDFSOutputHelper() { + } + + // use pooled allocator for performance. + private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; + + // copied from DFSPacket since it is package private. + public static final long HEART_BEAT_SEQNO = -1L; + + // helper class for creating DataChecksum object. + private static final Method CREATE_CHECKSUM; + + // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a + // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may + // get from proto directly, or combined by the reply field of the proto and a ECN object. See + // createPipelineAckStatusGetter for more details. 
+ private interface PipelineAckStatusGetter { + Status get(PipelineAckProto ack); + } + + private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER; + + // StorageType enum is added in hadoop 2.4, but it is moved to another package in hadoop 2.6 and + // the setter method in OpWriteBlockProto is also added in hadoop 2.6. So we need to skip the + // setStorageType call if it is hadoop 2.5 or before. See createStorageTypeSetter for more + // details. + private interface StorageTypeSetter { + OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType); + } + + private static final StorageTypeSetter STORAGE_TYPE_SETTER; + + // helper class for calling create method on namenode. There is a supportedVersions parameter for + // hadoop 2.6 or after. See createFileCreater for more details. + private interface FileCreater { + HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked, + String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, + short replication, long blockSize) throws IOException; + } + + private static final FileCreater FILE_CREATER; + + // helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and + // hadoop 2.5 or after use inodeId. See createLeaseManager for more details. + private interface LeaseManager { + + void begin(DFSClient client, String src, long inodeId); + + void end(DFSClient client, String src, long inodeId); + } + + private static final LeaseManager LEASE_MANAGER; + + // This is used to terminate a recoverFileLease call when FileSystem is already closed. + // isClientRunning is not public so we need to use reflection. 
  /**
   * Adaptor for {@code DFSClient.isClientRunning}, which is looked up reflectively with
   * {@code getDeclaredMethod} and forced accessible in {@link #createDFSClientAdaptor()}.
   */
  private interface DFSClientAdaptor {
    boolean isClientRunning(DFSClient client);
  }

  // Assigned exactly once, by the class's static initializer.
  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;

  /**
   * Builds the {@link DFSClientAdaptor} by resolving {@code DFSClient.isClientRunning()} through
   * reflection.
   * @return an adaptor that invokes the resolved method; reflective invocation failures are
   *         rethrown wrapped in a {@link RuntimeException}
   * @throws Error if {@code DFSClient} declares no {@code isClientRunning} method
   */
  private static DFSClientAdaptor createDFSClientAdaptor() {
    try {
      final Method method = DFSClient.class.getDeclaredMethod("isClientRunning");
      method.setAccessible(true);
      return new DFSClientAdaptor() {

        @Override
        public boolean isClientRunning(DFSClient client) {
          try {
            return (Boolean) method.invoke(client);
          } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
          }
        }
      };
    } catch (NoSuchMethodException e) {
      throw new Error(e);
    }
  }

  /**
   * Builds the {@link LeaseManager} that forwards to {@code DFSClient.beginFileLease} and
   * {@code DFSClient.endFileLease}.
   * <p>
   * Two signatures are probed in order: first the pair keyed by a {@code long} inode id, then the
   * pair keyed by the {@code String} path. Whichever pair resolves first is used; the argument the
   * chosen pair does not need ({@code src} or {@code inodeId}) is ignored. In both cases
   * {@code null} is passed as the {@code DFSOutputStream} argument of {@code beginFileLease}.
   * @return a lease manager bound to the first signature pair that exists
   * @throws Error if neither signature pair can be found
   */
  private static LeaseManager createLeaseManager() {
    // First attempt: the inode-id based signatures.
    try {
      final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
        long.class, DFSOutputStream.class);
      beginFileLeaseMethod.setAccessible(true);
      final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
        long.class);
      endFileLeaseMethod.setAccessible(true);
      return new LeaseManager() {

        @Override
        public void begin(DFSClient client, String src, long inodeId) {
          try {
            beginFileLeaseMethod.invoke(client, inodeId, null);
          } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
          }
        }

        @Override
        public void end(DFSClient client, String src, long inodeId) {
          try {
            endFileLeaseMethod.invoke(client, inodeId);
          } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
          }
        }
      };
    } catch (NoSuchMethodException e) {
      // Not fatal: fall through to the path based signatures below.
      LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e);
    }
    // Second attempt: the path based signatures.
    try {
      final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease",
        String.class, DFSOutputStream.class);
      beginFileLeaseMethod.setAccessible(true);
      final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease",
        String.class);
      endFileLeaseMethod.setAccessible(true);
      return new LeaseManager() {

        @Override
        public void begin(DFSClient client, String src, long inodeId) {
          try {
            beginFileLeaseMethod.invoke(client, src, null);
          } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
          }
        }

        @Override
        public void end(DFSClient client, String src, long inodeId) {
          try {
            endFileLeaseMethod.invoke(client, src);
          } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
          }
        }
      };
    } catch (NoSuchMethodException e) {
      throw new Error(e);
    }
  }

  /**
   * Builds the {@link PipelineAckStatusGetter} used to read the status of the first reply in a
   * {@link PipelineAckProto}.
   * <p>
   * The flag based API ({@code getFlagList}, {@code getReply}, {@code PipelineAck.combineHeader},
   * {@code PipelineAck.getStatusFromHeader} and the {@code PipelineAck$ECN} enum) is tried first.
   * If any of those methods is missing, the getter falls back to
   * {@code PipelineAckProto.getStatus(int)}.
   * @return a status getter bound to whichever API is available
   * @throws Error if the {@code PipelineAck$ECN} class cannot be loaded while the flag based
   *           methods exist, or if the fallback {@code getStatus(int)} method is missing too
   */
  private static PipelineAckStatusGetter createPipelineAckStatusGetter() {
    try {
      final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
      @SuppressWarnings("rawtypes")
      Class<? extends Enum> ecnClass;
      try {
        ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
            .asSubclass(Enum.class);
      } catch (ClassNotFoundException e) {
        throw new Error(e);
      }
      @SuppressWarnings("unchecked")
      final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
      final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
      final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass,
        Status.class);
      final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader",
        int.class);
      return new PipelineAckStatusGetter() {

        @Override
        public Status get(PipelineAckProto ack) {
          try {
            @SuppressWarnings("unchecked")
            List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
            Integer headerFlag;
            if (flagList.isEmpty()) {
              // No flags in the ack: synthesize a header from the first reply with ECN DISABLED.
              Status reply = (Status) getReplyMethod.invoke(ack, 0);
              headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
            } else {
              headerFlag = flagList.get(0);
            }
            return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
          } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
          }
        }
      };
    } catch (NoSuchMethodException e) {
      // Not fatal: fall through to the getStatus(int) based getter below.
      LOG.warn("Can not get expected methods, should be hadoop 2.6-", e);
    }
    try {
      final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
      return new PipelineAckStatusGetter() {

        @Override
        public Status get(PipelineAckProto ack) {
          try {
            return (Status) getStatusMethod.invoke(ack, 0);
          } catch (IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException(e);
          }
        }
      };
    } catch (NoSuchMethodException e) {
      throw new Error(e);
    }
  }

  /**
   * Builds the {@link StorageTypeSetter} that copies a storage type onto an
   * {@code OpWriteBlockProto.Builder}.
   * <p>
   * If the builder has no {@code setStorageType(StorageTypeProto)} method, the returned setter is
   * a no-op that hands the builder back untouched. Otherwise the setter maps the given enum to the
   * {@link StorageTypeProto} constant with the same {@code name()} and passes it to
   * {@code setStorageType} reflectively.
   * @return the setter matching the available protobuf API
   */
  private static StorageTypeSetter createStorageTypeSetter() {
    final Method setStorageTypeMethod;
    try {
      setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType",
        StorageTypeProto.class);
    } catch (NoSuchMethodException e) {
      LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e);
      return new StorageTypeSetter() {

        @Override
        public Builder set(Builder builder, Enum<?> storageType) {
          return builder;
        }
      };
    }
    // Index the proto enum constants by name so the lookup in set() is a map access.
    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
      builder.put(storageTypeProto.name(), storageTypeProto);
    }
    final ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
    return new StorageTypeSetter() {

      @Override
      public Builder set(Builder builder, Enum<?> storageType) {
        // NOTE(review): a name with no matching proto constant yields null here and is passed on
        // to setStorageType as is.
        Object protoEnum = name2ProtoEnum.get(storageType.name());
        try {
          setStorageTypeMethod.invoke(builder, protoEnum);
        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
          throw new RuntimeException(e);
        }
        return builder;
      }
    };
  }

  /**
   * Builds the {@link FileCreater} that calls {@code ClientProtocol.create} reflectively.
   * <p>
   * The first public method named {@code create} decides the calling convention: when its last
   * parameter is a {@code long}, it is invoked with the seven arguments ending in
   * {@code blockSize}; otherwise the result of the static
   * {@code org.apache.hadoop.crypto.CryptoProtocolVersion.supported()} call is appended as an
   * eighth argument. An {@link IOException} thrown by the target method is rethrown as is; any
   * other invocation failure is wrapped in a {@link RuntimeException}.
   * @return the creater matching the available {@code create} signature
   * @throws Error if no {@code create} method exists, or if {@code CryptoProtocolVersion} cannot
   *           be resolved when it is needed
   */
  private static FileCreater createFileCreater() {
    for (Method method : ClientProtocol.class.getMethods()) {
      if (method.getName().equals("create")) {
        final Method createMethod = method;
        Class<?>[] paramTypes = createMethod.getParameterTypes();
        if (paramTypes[paramTypes.length - 1] == long.class) {
          return new FileCreater() {

            @Override
            public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
                String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
                short replication, long blockSize) throws IOException {
              try {
                return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag,
                  createParent, replication, blockSize);
              } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
              } catch (InvocationTargetException e) {
                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
                throw new RuntimeException(e);
              }
            }
          };
        } else {
          try {
            Class<?> cryptoProtocolVersionClass = Class
                .forName("org.apache.hadoop.crypto.CryptoProtocolVersion");
            Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported");
            // Resolved once here and reused for every create call.
            final Object supported = supportedMethod.invoke(null);
            return new FileCreater() {

              @Override
              public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked,
                  String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
                  short replication, long blockSize) throws IOException {
                try {
                  return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName,
                    flag, createParent, replication, blockSize, supported);
                } catch (IllegalAccessException e) {
                  throw new RuntimeException(e);
                } catch (InvocationTargetException e) {
                  Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
                  throw new RuntimeException(e);
                }
              }
            };
          } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException
              | InvocationTargetException e) {
            throw new Error(e);
          }
        }
      }
    }
    throw new Error("No create method found for " + ClientProtocol.class.getName());
  }

  // cancel the processing if DFSClient is already closed.
+ static final class CancelOnClose implements CancelableProgressable { + + private final DFSClient client; + + public CancelOnClose(DFSClient client) { + this.client = client; + } + + @Override + public boolean progress() { + return DFS_CLIENT_ADAPTOR.isClientRunning(client); + } + + } + + static { + try { + CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum"); + CREATE_CHECKSUM.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new Error(e); + } + + PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter(); + STORAGE_TYPE_SETTER = createStorageTypeSetter(); + FILE_CREATER = createFileCreater(); + LEASE_MANAGER = createLeaseManager(); + DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); + } + + static void beginFileLease(DFSClient client, String src, long inodeId) { + LEASE_MANAGER.begin(client, src, inodeId); + } + + static void endFileLease(DFSClient client, String src, long inodeId) { + LEASE_MANAGER.end(client, src, inodeId); + } + + static DataChecksum createChecksum(DFSClient client) { + try { + return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf()); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + static Status getStatus(PipelineAckProto ack) { + return PIPELINE_ACK_STATUS_GETTER.get(ack); + } + + private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo, + final Promise<Channel> promise, final int timeoutMs) { + channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), + new ProtobufVarint32FrameDecoder(), + new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()), + new SimpleChannelInboundHandler<BlockOpResponseProto>() { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp) + throws Exception { + Status pipelineStatus = resp.getStatus(); + if (PipelineAck.isRestartOOBStatus(pipelineStatus)) { + throw new IOException("datanode " + dnInfo + 
" is restarting"); + } + String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink(); + if (resp.getStatus() != Status.SUCCESS) { + if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) { + throw new InvalidBlockTokenException("Got access token error" + ", status message " + + resp.getMessage() + ", " + logInfo); + } else { + throw new IOException("Got error" + ", status=" + resp.getStatus().name() + + ", status message " + resp.getMessage() + ", " + logInfo); + } + } + // success + ChannelPipeline p = ctx.pipeline(); + while (p.first() != null) { + p.removeFirst(); + } + // Disable auto read here. Enable it after we setup the streaming pipeline in + // FanOutOneBLockAsyncDFSOutput. + ctx.channel().config().setAutoRead(false); + promise.trySuccess(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + promise.tryFailure(new IOException("connection to " + dnInfo + " is closed")); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent + && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) { + promise + .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); + } else { + super.userEventTriggered(ctx, evt); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + promise.tryFailure(cause); + } + }); + } + + private static void requestWriteBlock(Channel channel, Enum<?> storageType, + OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { + // TODO: SASL negotiation. should be done using a netty Handler. 
+ OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build(); + int protoLen = proto.getSerializedSize(); + ByteBuf buffer = channel.alloc() + .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); + buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); + buffer.writeByte(Op.WRITE_BLOCK.code); + proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); + channel.writeAndFlush(buffer); + } + + private static List<Future<Channel>> connectToDataNodes(Configuration conf, String clientName, + LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage, + DataChecksum summer, EventLoop eventLoop) { + Enum<?>[] storageTypes = locatedBlock.getStorageTypes(); + DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); + boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, + DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, + HdfsServerConstants.READ_TIMEOUT); + ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); + blockCopy.setNumBytes(locatedBlock.getBlockSize()); + ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() + .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy)) + .setToken(PBHelper.convert(locatedBlock.getBlockToken()))) + .setClientName(clientName).build(); + ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); + final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() + .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) + .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) + .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) + .setRequestedChecksum(checksumProto) + .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); + List<Future<Channel>> futureList = new 
ArrayList<>(datanodeInfos.length); + for (int i = 0; i < datanodeInfos.length; i++) { + final DatanodeInfo dnInfo = datanodeInfos[i]; + // Use Enum here because StoregType is moved to another package in hadoop 2.6. Use StorageType + // will cause compilation error for hadoop 2.5 or before. + final Enum<?> storageType = storageTypes[i]; + final Promise<Channel> promise = eventLoop.newPromise(); + futureList.add(promise); + String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); + new Bootstrap().group(eventLoop).channel(NioSocketChannel.class) + .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() { + + @Override + protected void initChannel(Channel ch) throws Exception { + processWriteBlockResponse(ch, dnInfo, promise, timeoutMs); + } + }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + requestWriteBlock(future.channel(), storageType, writeBlockProtoBuilder); + } else { + promise.tryFailure(future.cause()); + } + } + }); + } + return futureList; + } + + private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, + boolean overwrite, boolean createParent, short replication, long blockSize, + EventLoop eventLoop) throws IOException { + Configuration conf = dfs.getConf(); + FSUtils fsUtils = FSUtils.getInstance(dfs, conf); + DFSClient client = dfs.getClient(); + String clientName = client.getClientName(); + ClientProtocol namenode = client.getNamenode(); + HdfsFileStatus stat = FILE_CREATER.create(namenode, src, + FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, + new EnumSetWritable<CreateFlag>( + overwrite ? 
EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), + createParent, replication, blockSize); + beginFileLease(client, src, stat.getFileId()); + boolean succ = false; + LocatedBlock locatedBlock = null; + List<Channel> datanodeList = new ArrayList<>(); + try { + DataChecksum summer = createChecksum(client); + locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), + null); + for (Future<Channel> future : connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, + PIPELINE_SETUP_CREATE, summer, eventLoop)) { + // fail the creation if there are connection failures since we are fail-fast. The upper + // layer should retry itself if needed. + datanodeList.add(future.syncUninterruptibly().getNow()); + } + succ = true; + return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src, + stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); + } finally { + if (!succ) { + for (Channel c : datanodeList) { + c.close(); + } + endFileLease(client, src, stat.getFileId()); + fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client)); + } + } + } + + /** + * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it + * inside {@link EventLoop}. + * @param eventLoop all connections to datanode will use the same event loop. 
+ */ + public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f, + final boolean overwrite, final boolean createParent, final short replication, + final long blockSize, final EventLoop eventLoop) throws IOException { + return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { + + @Override + public FanOutOneBlockAsyncDFSOutput doCall(Path p) + throws IOException, UnresolvedLinkException { + return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, + blockSize, eventLoop); + } + + @Override + public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException { + throw new UnsupportedOperationException(); + } + }.resolve(dfs, f); + } + + static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName, + ExtendedBlock block, long fileId) { + for (int retry = 0;; retry++) { + try { + if (namenode.complete(src, clientName, block, fileId)) { + endFileLease(client, src, fileId); + return; + } else { + LOG.warn("complete file " + src + " not finished, retry = " + retry); + } + } catch (LeaseExpiredException e) { + LOG.warn("lease for file " + src + " is expired, give up", e); + return; + } catch (Exception e) { + LOG.warn("complete file " + src + " failed, retry = " + retry, e); + } + sleepIgnoreInterrupt(retry); + } + } + + static void sleepIgnoreInterrupt(int retry) { + try { + Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); + } catch (InterruptedException e) { + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9d355b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputFlushHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputFlushHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputFlushHandler.java new file mode 100644 index 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;

/**
 * A reusable {@link CompletionHandler} that lets a caller block until an asynchronous operation
 * reports its outcome.
 * <p>
 * The operation calls {@link #completed(Long, Void)} or {@link #failed(Throwable, Void)}; a
 * waiting thread picks the outcome up through {@link #get()}. {@link #reset()} clears the
 * recorded outcome so the same instance can be passed to another operation. All state is guarded
 * by this object's monitor.
 */
public final class FanOutOneBlockAsyncDFSOutputFlushHandler
    implements CompletionHandler<Long, Void> {

  /** Result of the last successful completion. */
  private long size;

  /** Cause of the last failure, or {@code null} if none was recorded. */
  private Throwable error;

  /** Whether an outcome has been recorded since construction or the last reset. */
  private boolean finished;

  /** Records {@code result} as the outcome and wakes up every thread blocked in {@link #get()}. */
  @Override
  public synchronized void completed(Long result, Void attachment) {
    size = result.longValue();
    finished = true;
    notifyAll();
  }

  /** Records {@code exc} as the outcome and wakes up every thread blocked in {@link #get()}. */
  @Override
  public synchronized void failed(Throwable exc, Void attachment) {
    error = exc;
    finished = true;
    notifyAll();
  }

  /**
   * Blocks until an outcome has been recorded.
   * @return the value passed to {@link #completed(Long, Void)}
   * @throws InterruptedException if the calling thread is interrupted while waiting
   * @throws ExecutionException wrapping the throwable passed to {@link #failed(Throwable, Void)}
   */
  public synchronized long get() throws InterruptedException, ExecutionException {
    // Loop rather than a single wait() so a spurious wake-up cannot return a stale result.
    while (!finished) {
      wait();
    }
    if (error != null) {
      throw new ExecutionException(error);
    }
    return size;
  }

  /**
   * Clears the recorded outcome so the handler can be reused.
   * <p>
   * Synchronized like every other accessor: the fields are written by the thread that completes
   * the operation, so an unsynchronized reset would race with those writes and might not be
   * visible to a later {@link #get()}.
   */
  public synchronized void reset() {
    size = 0L;
    error = null;
    finished = false;
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/6e9d355b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java new file mode 100644 index 0000000..0e9f42e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MiscTests.class, MediumTests.class }) +public class TestFanOutOneBlockAsyncDFSOutput { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static DistributedFileSystem FS; + + private static EventLoopGroup EVENT_LOOP_GROUP; + + private static int READ_TIMEOUT_MS = 2000; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUp() throws Exception { + Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG); + Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG); + TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); + TEST_UTIL.startMiniDFSCluster(3); + FS = TEST_UTIL.getDFSCluster().getFileSystem(); + 
EVENT_LOOP_GROUP = new NioEventLoopGroup(); + } + + @AfterClass + public static void tearDown() throws IOException, InterruptedException { + if (EVENT_LOOP_GROUP != null) { + EVENT_LOOP_GROUP.shutdownGracefully().sync(); + } + TEST_UTIL.shutdownMiniDFSCluster(); + } + + private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out) + throws IOException, InterruptedException, ExecutionException { + final byte[] b = new byte[10]; + ThreadLocalRandom.current().nextBytes(b); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = + new FanOutOneBlockAsyncDFSOutputFlushHandler(); + eventLoop.execute(new Runnable() { + + @Override + public void run() { + out.write(b, 0, b.length); + out.flush(null, handler, false); + } + }); + assertEquals(b.length, handler.get()); + out.close(); + assertEquals(b.length, FS.getFileStatus(f).getLen()); + byte[] actual = new byte[b.length]; + try (FSDataInputStream in = FS.open(f)) { + in.readFully(actual); + } + assertArrayEquals(b, actual); + } + + @Test + public void test() throws IOException, InterruptedException, ExecutionException { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop); + writeAndVerify(eventLoop, f, out); + } + + @Test + public void testRecover() throws IOException, InterruptedException, ExecutionException { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop); + final byte[] b = new byte[10]; + ThreadLocalRandom.current().nextBytes(b); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = + new FanOutOneBlockAsyncDFSOutputFlushHandler(); + eventLoop.execute(new 
Runnable() { + + @Override + public void run() { + out.write(b, 0, b.length); + out.flush(null, handler, false); + } + }); + handler.get(); + // restart one datanode which causes one connection broken + TEST_UTIL.getDFSCluster().restartDataNode(0); + handler.reset(); + eventLoop.execute(new Runnable() { + + @Override + public void run() { + out.write(b, 0, b.length); + out.flush(null, handler, false); + } + }); + try { + handler.get(); + fail("flush should fail"); + } catch (ExecutionException e) { + // we restarted one datanode so the flush should fail + e.printStackTrace(); + } + out.recoverAndClose(null); + assertEquals(b.length, FS.getFileStatus(f).getLen()); + byte[] actual = new byte[b.length]; + try (FSDataInputStream in = FS.open(f)) { + in.readFully(actual); + } + assertArrayEquals(b, actual); + } + + @Test + public void testHeartbeat() throws IOException, InterruptedException, ExecutionException { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop); + Thread.sleep(READ_TIMEOUT_MS * 2); + // the connection to datanode should still alive. + writeAndVerify(eventLoop, f, out); + } + + /** + * This is important for fencing when recover from RS crash. + */ + @Test + public void testCreateParentFailed() throws IOException { + Path f = new Path("/" + name.getMethodName() + "/test"); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + try { + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop); + fail("should fail with parent does not exist"); + } catch (RemoteException e) { + assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException); + } + } +}