http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java deleted file mode 100644 index 2225191..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java +++ /dev/null @@ -1,753 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.util; - -import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; -import static io.netty.handler.timeout.IdleState.READER_IDLE; -import static org.apache.hadoop.fs.CreateFlag.CREATE; -import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; -import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.concurrent.TimeUnit; - 
-import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.CodedOutputStream; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileSystemLinkResolver; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.UnresolvedLinkException; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.ConnectionUtils; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DFSOutputStream; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; -import org.apache.hadoop.hdfs.protocol.datatransfer.Op; -import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; -import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; -import org.apache.hadoop.io.EnumSetWritable; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.DataChecksum; - -/** - * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}. - */ -@InterfaceAudience.Private -public final class FanOutOneBlockAsyncDFSOutputHelper { - - private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class); - - private FanOutOneBlockAsyncDFSOutputHelper() { - } - - // use pooled allocator for performance. - private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT; - - // copied from DFSPacket since it is package private. - public static final long HEART_BEAT_SEQNO = -1L; - - // helper class for creating DataChecksum object. - private static final Method CREATE_CHECKSUM; - - // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a - // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may - // get from proto directly, or combined by the reply field of the proto and a ECN object. See - // createPipelineAckStatusGetter for more details. 
- private interface PipelineAckStatusGetter { - Status get(PipelineAckProto ack); - } - - private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER; - - // StorageType enum is added in hadoop 2.4, but it is moved to another package in hadoop 2.6 and - // the setter method in OpWriteBlockProto is also added in hadoop 2.6. So we need to skip the - // setStorageType call if it is hadoop 2.5 or before. See createStorageTypeSetter for more - // details. - private interface StorageTypeSetter { - OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType); - } - - private static final StorageTypeSetter STORAGE_TYPE_SETTER; - - // helper class for calling create method on namenode. There is a supportedVersions parameter for - // hadoop 2.6 or after. See createFileCreater for more details. - private interface FileCreater { - HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked, - String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, - short replication, long blockSize) throws IOException; - } - - private static final FileCreater FILE_CREATER; - - // helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and - // hadoop 2.5 or after use inodeId. See createLeaseManager for more details. - private interface LeaseManager { - - void begin(DFSClient client, String src, long inodeId); - - void end(DFSClient client, String src, long inodeId); - } - - private static final LeaseManager LEASE_MANAGER; - - // This is used to terminate a recoverFileLease call when FileSystem is already closed. - // isClientRunning is not public so we need to use reflection. 
- private interface DFSClientAdaptor { - - boolean isClientRunning(DFSClient client); - } - - private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR; - - private static DFSClientAdaptor createDFSClientAdaptor() { - try { - final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); - isClientRunningMethod.setAccessible(true); - return new DFSClientAdaptor() { - - @Override - public boolean isClientRunning(DFSClient client) { - try { - return (Boolean) isClientRunningMethod.invoke(client); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } catch (NoSuchMethodException e) { - throw new Error(e); - } - } - - private static LeaseManager createLeaseManager() { - try { - final Method beginFileLeaseMethod = - DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class); - beginFileLeaseMethod.setAccessible(true); - final Method endFileLeaseMethod = - DFSClient.class.getDeclaredMethod("endFileLease", long.class); - endFileLeaseMethod.setAccessible(true); - return new LeaseManager() { - - @Override - public void begin(DFSClient client, String src, long inodeId) { - try { - beginFileLeaseMethod.invoke(client, inodeId, null); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public void end(DFSClient client, String src, long inodeId) { - try { - endFileLeaseMethod.invoke(client, inodeId); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } catch (NoSuchMethodException e) { - LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e); - } - try { - final Method beginFileLeaseMethod = - DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class); - beginFileLeaseMethod.setAccessible(true); - final Method endFileLeaseMethod = - 
DFSClient.class.getDeclaredMethod("endFileLease", String.class); - endFileLeaseMethod.setAccessible(true); - return new LeaseManager() { - - @Override - public void begin(DFSClient client, String src, long inodeId) { - try { - beginFileLeaseMethod.invoke(client, src, null); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public void end(DFSClient client, String src, long inodeId) { - try { - endFileLeaseMethod.invoke(client, src); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } catch (NoSuchMethodException e) { - throw new Error(e); - } - } - - private static PipelineAckStatusGetter createPipelineAckStatusGetter() { - try { - final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList"); - @SuppressWarnings("rawtypes") - Class<? extends Enum> ecnClass; - try { - ecnClass = - Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN") - .asSubclass(Enum.class); - } catch (ClassNotFoundException e) { - throw new Error(e); - } - @SuppressWarnings("unchecked") - final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED"); - final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class); - final Method combineHeaderMethod = - PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class); - final Method getStatusFromHeaderMethod = - PipelineAck.class.getMethod("getStatusFromHeader", int.class); - return new PipelineAckStatusGetter() { - - @Override - public Status get(PipelineAckProto ack) { - try { - @SuppressWarnings("unchecked") - List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack); - Integer headerFlag; - if (flagList.isEmpty()) { - Status reply = (Status) getReplyMethod.invoke(ack, 0); - headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply); - } else { - headerFlag = flagList.get(0); - } - return (Status) 
getStatusFromHeaderMethod.invoke(null, headerFlag); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } catch (NoSuchMethodException e) { - LOG.warn("Can not get expected methods, should be hadoop 2.6-", e); - } - try { - final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class); - return new PipelineAckStatusGetter() { - - @Override - public Status get(PipelineAckProto ack) { - try { - return (Status) getStatusMethod.invoke(ack, 0); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } catch (NoSuchMethodException e) { - throw new Error(e); - } - } - - private static StorageTypeSetter createStorageTypeSetter() { - final Method setStorageTypeMethod; - try { - setStorageTypeMethod = - OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class); - } catch (NoSuchMethodException e) { - LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e); - return new StorageTypeSetter() { - - @Override - public Builder set(Builder builder, Enum<?> storageType) { - return builder; - } - }; - } - ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder(); - for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) { - builder.put(storageTypeProto.name(), storageTypeProto); - } - final ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build(); - return new StorageTypeSetter() { - - @Override - public Builder set(Builder builder, Enum<?> storageType) { - Object protoEnum = name2ProtoEnum.get(storageType.name()); - try { - setStorageTypeMethod.invoke(builder, protoEnum); - } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - throw new RuntimeException(e); - } - return builder; - } - }; - } - - private static FileCreater createFileCreater() { - for (Method method : ClientProtocol.class.getMethods()) { - if 
(method.getName().equals("create")) { - final Method createMethod = method; - Class<?>[] paramTypes = createMethod.getParameterTypes(); - if (paramTypes[paramTypes.length - 1] == long.class) { - return new FileCreater() { - - @Override - public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked, - String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, - short replication, long blockSize) throws IOException { - try { - return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, - flag, createParent, replication, blockSize); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - Throwables.propagateIfPossible(e.getTargetException(), IOException.class); - throw new RuntimeException(e); - } - } - }; - } else { - try { - Class<?> cryptoProtocolVersionClass = - Class.forName("org.apache.hadoop.crypto.CryptoProtocolVersion"); - Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported"); - final Object supported = supportedMethod.invoke(null); - return new FileCreater() { - - @Override - public HdfsFileStatus create(ClientProtocol namenode, String src, - FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, - boolean createParent, short replication, long blockSize) throws IOException { - try { - return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, - flag, createParent, replication, blockSize, supported); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - Throwables.propagateIfPossible(e.getTargetException(), IOException.class); - throw new RuntimeException(e); - } - } - }; - } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException - | InvocationTargetException e) { - throw new Error(e); - } - } - } - } - throw new Error("No create method found for " + ClientProtocol.class.getName()); - } - - // 
cancel the processing if DFSClient is already closed. - static final class CancelOnClose implements CancelableProgressable { - - private final DFSClient client; - - public CancelOnClose(DFSClient client) { - this.client = client; - } - - @Override - public boolean progress() { - return DFS_CLIENT_ADAPTOR.isClientRunning(client); - } - } - - static { - try { - CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum"); - CREATE_CHECKSUM.setAccessible(true); - } catch (NoSuchMethodException e) { - throw new Error(e); - } - - PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter(); - STORAGE_TYPE_SETTER = createStorageTypeSetter(); - FILE_CREATER = createFileCreater(); - LEASE_MANAGER = createLeaseManager(); - DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); - } - - static void beginFileLease(DFSClient client, String src, long inodeId) { - LEASE_MANAGER.begin(client, src, inodeId); - } - - static void endFileLease(DFSClient client, String src, long inodeId) { - LEASE_MANAGER.end(client, src, inodeId); - } - - static DataChecksum createChecksum(DFSClient client) { - try { - return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf()); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - static Status getStatus(PipelineAckProto ack) { - return PIPELINE_ACK_STATUS_GETTER.get(ack); - } - - private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo, - final Promise<Channel> promise, final int timeoutMs) { - channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), - new ProtobufVarint32FrameDecoder(), - new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()), - new SimpleChannelInboundHandler<BlockOpResponseProto>() { - - @Override - protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp) - throws Exception { - Status pipelineStatus = resp.getStatus(); - if 
(PipelineAck.isRestartOOBStatus(pipelineStatus)) { - throw new IOException("datanode " + dnInfo + " is restarting"); - } - String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink(); - if (resp.getStatus() != Status.SUCCESS) { - if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) { - throw new InvalidBlockTokenException("Got access token error" + ", status message " - + resp.getMessage() + ", " + logInfo); - } else { - throw new IOException("Got error" + ", status=" + resp.getStatus().name() - + ", status message " + resp.getMessage() + ", " + logInfo); - } - } - // success - ChannelPipeline p = ctx.pipeline(); - for (ChannelHandler handler; (handler = p.removeLast()) != null;) { - // do not remove all handlers because we may have wrap or unwrap handlers at the header - // of pipeline. - if (handler instanceof IdleStateHandler) { - break; - } - } - // Disable auto read here. Enable it after we setup the streaming pipeline in - // FanOutOneBLockAsyncDFSOutput. - ctx.channel().config().setAutoRead(false); - promise.trySuccess(ctx.channel()); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - promise.tryFailure(new IOException("connection to " + dnInfo + " is closed")); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { - promise - .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); - } else { - super.userEventTriggered(ctx, evt); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - promise.tryFailure(cause); - } - }); - } - - private static void requestWriteBlock(Channel channel, Enum<?> storageType, - OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { - OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build(); - int 
protoLen = proto.getSerializedSize(); - ByteBuf buffer = - channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); - buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); - buffer.writeByte(Op.WRITE_BLOCK.code); - proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); - channel.writeAndFlush(buffer); - } - - private static void initialize(Configuration conf, final Channel channel, - final DatanodeInfo dnInfo, final Enum<?> storageType, - final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, - DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) { - Promise<Void> saslPromise = channel.eventLoop().newPromise(); - trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise); - saslPromise.addListener(new FutureListener<Void>() { - - @Override - public void operationComplete(Future<Void> future) throws Exception { - if (future.isSuccess()) { - // setup response processing pipeline first, then send request. 
- processWriteBlockResponse(channel, dnInfo, promise, timeoutMs); - requestWriteBlock(channel, storageType, writeBlockProtoBuilder); - } else { - promise.tryFailure(future.cause()); - } - } - }); - } - - private static List<Future<Channel>> connectToDataNodes(final Configuration conf, - final DFSClient client, String clientName, final LocatedBlock locatedBlock, - long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer, - EventLoop eventLoop) { - Enum<?>[] storageTypes = locatedBlock.getStorageTypes(); - DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); - boolean connectToDnViaHostname = - conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); - final int timeoutMs = - conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT); - ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); - blockCopy.setNumBytes(locatedBlock.getBlockSize()); - ClientOperationHeaderProto header = - ClientOperationHeaderProto - .newBuilder() - .setBaseHeader( - BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy)) - .setToken(PBHelper.convert(locatedBlock.getBlockToken()))) - .setClientName(clientName).build(); - ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); - final OpWriteBlockProto.Builder writeBlockProtoBuilder = - OpWriteBlockProto.newBuilder().setHeader(header) - .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) - .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) - .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) - .setRequestedChecksum(checksumProto) - .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); - List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length); - for (int i = 0; i < datanodeInfos.length; i++) { - final DatanodeInfo dnInfo = datanodeInfos[i]; - // Use Enum here because StoregType is moved to another package in hadoop 2.6. 
Use StorageType - // will cause compilation error for hadoop 2.5 or before. - final Enum<?> storageType = storageTypes[i]; - final Promise<Channel> promise = eventLoop.newPromise(); - futureList.add(promise); - String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname); - new Bootstrap().group(eventLoop).channel(NioSocketChannel.class) - .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() { - - @Override - protected void initChannel(Channel ch) throws Exception { - // we need to get the remote address of the channel so we can only move on after - // channel connected. Leave an empty implementation here because netty does not allow - // a null handler. - } - }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder, - timeoutMs, client, locatedBlock.getBlockToken(), promise); - } else { - promise.tryFailure(future.cause()); - } - } - }); - } - return futureList; - } - - /** - * Exception other than RemoteException thrown when calling create on namenode - */ - public static class NameNodeException extends IOException { - - private static final long serialVersionUID = 3143237406477095390L; - - public NameNodeException(Throwable cause) { - super(cause); - } - } - - private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, - boolean overwrite, boolean createParent, short replication, long blockSize, - EventLoop eventLoop) throws IOException { - Configuration conf = dfs.getConf(); - FSUtils fsUtils = FSUtils.getInstance(dfs, conf); - DFSClient client = dfs.getClient(); - String clientName = client.getClientName(); - ClientProtocol namenode = client.getNamenode(); - HdfsFileStatus stat; - try { - stat = - FILE_CREATER.create( - namenode, - src, - 
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), - clientName, - new EnumSetWritable<CreateFlag>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet - .of(CREATE)), createParent, replication, blockSize); - } catch (Exception e) { - if (e instanceof RemoteException) { - throw (RemoteException) e; - } else { - throw new NameNodeException(e); - } - } - beginFileLease(client, src, stat.getFileId()); - boolean succ = false; - LocatedBlock locatedBlock = null; - List<Future<Channel>> futureList = null; - try { - DataChecksum summer = createChecksum(client); - locatedBlock = - namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null); - List<Channel> datanodeList = new ArrayList<>(); - futureList = - connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE, - summer, eventLoop); - for (Future<Channel> future : futureList) { - // fail the creation if there are connection failures since we are fail-fast. The upper - // layer should retry itself if needed. - datanodeList.add(future.syncUninterruptibly().getNow()); - } - succ = true; - return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, - src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); - } finally { - if (!succ) { - if (futureList != null) { - for (Future<Channel> f : futureList) { - f.addListener(new FutureListener<Channel>() { - - @Override - public void operationComplete(Future<Channel> future) throws Exception { - if (future.isSuccess()) { - future.getNow().close(); - } - } - }); - } - } - endFileLease(client, src, stat.getFileId()); - fsUtils.recoverFileLease(dfs, new Path(src), conf, new CancelOnClose(client)); - } - } - } - - /** - * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method maybe blocked so do not call it - * inside {@link EventLoop}. - * @param eventLoop all connections to datanode will use the same event loop. 
- */ - public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f, - final boolean overwrite, final boolean createParent, final short replication, - final long blockSize, final EventLoop eventLoop) throws IOException { - return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { - - @Override - public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, - UnresolvedLinkException { - return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, - blockSize, eventLoop); - } - - @Override - public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException { - throw new UnsupportedOperationException(); - } - }.resolve(dfs, f); - } - - public static boolean shouldRetryCreate(RemoteException e) { - // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name. - // For exceptions other than this, we just throw it out. This is same with - // DFSOutputStream.newStreamForCreate. - return e.getClassName().endsWith("RetryStartFileException"); - } - - static void completeFile(DFSClient client, ClientProtocol namenode, String src, - String clientName, ExtendedBlock block, long fileId) { - for (int retry = 0;; retry++) { - try { - if (namenode.complete(src, clientName, block, fileId)) { - endFileLease(client, src, fileId); - return; - } else { - LOG.warn("complete file " + src + " not finished, retry = " + retry); - } - } catch (LeaseExpiredException e) { - LOG.warn("lease for file " + src + " is expired, give up", e); - return; - } catch (Exception e) { - LOG.warn("complete file " + src + " failed, retry = " + retry, e); - } - sleepIgnoreInterrupt(retry); - } - } - - static void sleepIgnoreInterrupt(int retry) { - try { - Thread.sleep(ConnectionUtils.getPauseTime(100, retry)); - } catch (InterruptedException e) { - } - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java deleted file mode 100644 index 341d4ec..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ /dev/null @@ -1,1032 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.util; - -import static io.netty.handler.timeout.IdleState.READER_IDLE; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.MessageToByteEncoder; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.concurrent.Promise; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.CallbackHandler; -import javax.security.auth.callback.NameCallback; -import javax.security.auth.callback.PasswordCallback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.sasl.RealmCallback; -import javax.security.sasl.RealmChoiceCallback; -import javax.security.sasl.Sasl; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; - -import com.google.common.base.Charsets; -import 
com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; -import com.google.protobuf.CodedOutputStream; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; -import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.security.SaslPropertiesResolver; -import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; - -/** - * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. 
- */ -@InterfaceAudience.Private -public final class FanOutOneBlockAsyncDFSOutputSaslHelper { - - private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputSaslHelper.class); - - private FanOutOneBlockAsyncDFSOutputSaslHelper() { - } - - private static final String SERVER_NAME = "0"; - private static final String PROTOCOL = "hdfs"; - private static final String MECHANISM = "DIGEST-MD5"; - private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; - private static final String NAME_DELIMITER = " "; - private static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = - "dfs.encrypt.data.transfer.cipher.suites"; - private static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding"; - - private interface SaslAdaptor { - - SaslPropertiesResolver getSaslPropsResolver(DFSClient client); - - TrustedChannelResolver getTrustedChannelResolver(DFSClient client); - - AtomicBoolean getFallbackToSimpleAuth(DFSClient client); - - DataEncryptionKey createDataEncryptionKey(DFSClient client); - } - - private static final SaslAdaptor SASL_ADAPTOR; - - private interface CipherHelper { - - List<Object> getCipherOptions(Configuration conf) throws IOException; - - void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder, - List<Object> cipherOptions); - - Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy, - SaslClient saslClient) throws IOException; - - Object getCipherSuite(Object cipherOption); - - byte[] getInKey(Object cipherOption); - - byte[] getInIv(Object cipherOption); - - byte[] getOutKey(Object cipherOption); - - byte[] getOutIv(Object cipherOption); - } - - private static final CipherHelper CIPHER_HELPER; - - private static final class CryptoCodec { - - private static final Method CREATE_CODEC; - - private static final Method CREATE_ENCRYPTOR; - - private static final Method CREATE_DECRYPTOR; - - private static final Method INIT_ENCRYPTOR; - - private static final Method INIT_DECRYPTOR; - - 
private static final Method ENCRYPT; - - private static final Method DECRYPT; - - static { - Class<?> cryptoCodecClass = null; - try { - cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec"); - } catch (ClassNotFoundException e) { - LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e); - } - if (cryptoCodecClass != null) { - Method getInstanceMethod = null; - for (Method method : cryptoCodecClass.getMethods()) { - if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) { - getInstanceMethod = method; - break; - } - } - CREATE_CODEC = getInstanceMethod; - try { - CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor"); - CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor"); - - Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor"); - INIT_ENCRYPTOR = encryptorClass.getMethod("init"); - ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class); - - Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor"); - INIT_DECRYPTOR = decryptorClass.getMethod("init"); - DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class); - } catch (NoSuchMethodException | ClassNotFoundException e) { - throw new Error(e); - } - } else { - LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-"); - CREATE_CODEC = null; - CREATE_ENCRYPTOR = null; - CREATE_DECRYPTOR = null; - INIT_ENCRYPTOR = null; - INIT_DECRYPTOR = null; - ENCRYPT = null; - DECRYPT = null; - } - } - - private final Object encryptor; - - private final Object decryptor; - - public CryptoCodec(Configuration conf, Object cipherOption) { - Object codec; - try { - codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption)); - encryptor = CREATE_ENCRYPTOR.invoke(codec); - byte[] encKey = CIPHER_HELPER.getInKey(cipherOption); - byte[] encIv = CIPHER_HELPER.getInIv(cipherOption); - INIT_ENCRYPTOR.invoke(encryptor, encKey, 
Arrays.copyOf(encIv, encIv.length)); - - decryptor = CREATE_DECRYPTOR.invoke(codec); - byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption); - byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption); - INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { - try { - ENCRYPT.invoke(encryptor, inBuffer, outBuffer); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { - try { - DECRYPT.invoke(decryptor, inBuffer, outBuffer); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - } - - private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass) - throws NoSuchFieldException, NoSuchMethodException { - final Field saslPropsResolverField = - saslDataTransferClientClass.getDeclaredField("saslPropsResolver"); - saslPropsResolverField.setAccessible(true); - final Field trustedChannelResolverField = - saslDataTransferClientClass.getDeclaredField("trustedChannelResolver"); - trustedChannelResolverField.setAccessible(true); - final Field fallbackToSimpleAuthField = - saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth"); - fallbackToSimpleAuthField.setAccessible(true); - final Method getSaslDataTransferClientMethod = - DFSClient.class.getMethod("getSaslDataTransferClient"); - final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey"); - return new SaslAdaptor() { - - @Override - public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { - try { - return (TrustedChannelResolver) trustedChannelResolverField - .get(getSaslDataTransferClientMethod.invoke(client)); - } catch (IllegalAccessException | InvocationTargetException 
e) { - throw new RuntimeException(e); - } - } - - @Override - public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { - try { - return (SaslPropertiesResolver) saslPropsResolverField - .get(getSaslDataTransferClientMethod.invoke(client)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { - try { - return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod - .invoke(client)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public DataEncryptionKey createDataEncryptionKey(DFSClient client) { - try { - return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static SaslAdaptor createSaslAdaptor25() { - try { - final Field trustedChannelResolverField = - DFSClient.class.getDeclaredField("trustedChannelResolver"); - trustedChannelResolverField.setAccessible(true); - final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey"); - return new SaslAdaptor() { - - @Override - public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { - try { - return (TrustedChannelResolver) trustedChannelResolverField.get(client); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } - - @Override - public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { - return null; - } - - @Override - public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { - return null; - } - - @Override - public DataEncryptionKey createDataEncryptionKey(DFSClient client) { - try { - return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new 
RuntimeException(e); - } - } - }; - } catch (NoSuchFieldException | NoSuchMethodException e) { - throw new Error(e); - } - - } - - private static SaslAdaptor createSaslAdaptor() { - Class<?> saslDataTransferClientClass = null; - try { - saslDataTransferClientClass = - Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"); - } catch (ClassNotFoundException e) { - LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-"); - } - try { - return saslDataTransferClientClass != null ? createSaslAdaptor27(saslDataTransferClientClass) - : createSaslAdaptor25(); - } catch (NoSuchFieldException | NoSuchMethodException e) { - throw new Error(e); - } - } - - private static CipherHelper createCipherHelper25() { - return new CipherHelper() { - - @Override - public byte[] getOutKey(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getOutIv(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getInKey(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public byte[] getInIv(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public Object getCipherSuite(Object cipherOption) { - throw new UnsupportedOperationException(); - } - - @Override - public List<Object> getCipherOptions(Configuration conf) { - return null; - } - - @Override - public Object getCipherOption(DataTransferEncryptorMessageProto proto, - boolean isNegotiatedQopPrivacy, SaslClient saslClient) { - return null; - } - - @Override - public void addCipherOptions(Builder builder, List<Object> cipherOptions) { - throw new UnsupportedOperationException(); - } - }; - } - - private static CipherHelper createCipherHelper27(Class<?> cipherOptionClass) - throws ClassNotFoundException, NoSuchMethodException { - @SuppressWarnings("rawtypes") - Class<? 
extends Enum> cipherSuiteClass = - Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class); - @SuppressWarnings("unchecked") - final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING"); - final Constructor<?> cipherOptionConstructor = - cipherOptionClass.getConstructor(cipherSuiteClass); - final Constructor<?> cipherOptionWithKeyAndIvConstructor = - cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class, - byte[].class, byte[].class); - - final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite"); - final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey"); - final Method getInIvMethod = cipherOptionClass.getMethod("getInIv"); - final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey"); - final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv"); - - final Method convertCipherOptionsMethod = - PBHelper.class.getMethod("convertCipherOptions", List.class); - final Method convertCipherOptionProtosMethod = - PBHelper.class.getMethod("convertCipherOptionProtos", List.class); - final Method addAllCipherOptionMethod = - DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption", - Iterable.class); - final Method getCipherOptionListMethod = - DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList"); - return new CipherHelper() { - - @Override - public byte[] getOutKey(Object cipherOption) { - try { - return (byte[]) getOutKeyMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] getOutIv(Object cipherOption) { - try { - return (byte[]) getOutIvMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] getInKey(Object cipherOption) { - try { - return (byte[]) getInKeyMethod.invoke(cipherOption); 
- } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public byte[] getInIv(Object cipherOption) { - try { - return (byte[]) getInIvMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public Object getCipherSuite(Object cipherOption) { - try { - return getCipherSuiteMethod.invoke(cipherOption); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @Override - public List<Object> getCipherOptions(Configuration conf) throws IOException { - // Negotiate cipher suites if configured. Currently, the only supported - // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple - // values for future expansion. - String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); - if (cipherSuites == null || cipherSuites.isEmpty()) { - return null; - } - if (!cipherSuites.equals(AES_CTR_NOPADDING)) { - throw new IOException(String.format("Invalid cipher suite, %s=%s", - DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); - } - Object option; - try { - option = cipherOptionConstructor.newInstance(aesCipherSuite); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - List<Object> cipherOptions = Lists.newArrayListWithCapacity(1); - cipherOptions.add(option); - return cipherOptions; - } - - private Object unwrap(Object option, SaslClient saslClient) throws IOException { - byte[] inKey = getInKey(option); - if (inKey != null) { - inKey = saslClient.unwrap(inKey, 0, inKey.length); - } - byte[] outKey = getOutKey(option); - if (outKey != null) { - outKey = saslClient.unwrap(outKey, 0, outKey.length); - } - try { - return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey, - getInIv(option), outKey, getOutIv(option)); - } 
catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - @SuppressWarnings("unchecked") - @Override - public Object getCipherOption(DataTransferEncryptorMessageProto proto, - boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { - List<Object> cipherOptions; - try { - cipherOptions = - (List<Object>) convertCipherOptionProtosMethod.invoke(null, - getCipherOptionListMethod.invoke(proto)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - if (cipherOptions == null || cipherOptions.isEmpty()) { - return null; - } - Object cipherOption = cipherOptions.get(0); - return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; - } - - @Override - public void addCipherOptions(Builder builder, List<Object> cipherOptions) { - try { - addAllCipherOptionMethod.invoke(builder, - convertCipherOptionsMethod.invoke(null, cipherOptions)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - - private static CipherHelper createCipherHelper() { - Class<?> cipherOptionClass; - try { - cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption"); - } catch (ClassNotFoundException e) { - LOG.warn("No CipherOption class found, should be hadoop 2.5-"); - return createCipherHelper25(); - } - try { - return createCipherHelper27(cipherOptionClass); - } catch (NoSuchMethodException | ClassNotFoundException e) { - throw new Error(e); - } - } - - static { - SASL_ADAPTOR = createSaslAdaptor(); - CIPHER_HELPER = createCipherHelper(); - } - - /** - * Sets user name and password when asked by the client-side SASL object. - */ - private static final class SaslClientCallbackHandler implements CallbackHandler { - - private final char[] password; - private final String userName; - - /** - * Creates a new SaslClientCallbackHandler. 
- * @param userName SASL user name - * @Param password SASL password - */ - public SaslClientCallbackHandler(String userName, char[] password) { - this.password = password; - this.userName = userName; - } - - @Override - public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { - NameCallback nc = null; - PasswordCallback pc = null; - RealmCallback rc = null; - for (Callback callback : callbacks) { - if (callback instanceof RealmChoiceCallback) { - continue; - } else if (callback instanceof NameCallback) { - nc = (NameCallback) callback; - } else if (callback instanceof PasswordCallback) { - pc = (PasswordCallback) callback; - } else if (callback instanceof RealmCallback) { - rc = (RealmCallback) callback; - } else { - throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); - } - } - if (nc != null) { - nc.setName(userName); - } - if (pc != null) { - pc.setPassword(password); - } - if (rc != null) { - rc.setText(rc.getDefaultText()); - } - } - } - - private static final class SaslNegotiateHandler extends ChannelDuplexHandler { - - private final Configuration conf; - - private final Map<String, String> saslProps; - - private final SaslClient saslClient; - - private final int timeoutMs; - - private final Promise<Void> promise; - - private int step = 0; - - public SaslNegotiateHandler(Configuration conf, String username, char[] password, - Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException { - this.conf = conf; - this.saslProps = saslProps; - this.saslClient = - Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME, - saslProps, new SaslClientCallbackHandler(username, password)); - this.timeoutMs = timeoutMs; - this.promise = promise; - } - - private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { - sendSaslMessage(ctx, payload, null); - } - - private void sendSaslMessage(ChannelHandlerContext ctx, 
byte[] payload, List<Object> options) - throws IOException { - DataTransferEncryptorMessageProto.Builder builder = - DataTransferEncryptorMessageProto.newBuilder(); - builder.setStatus(DataTransferEncryptorStatus.SUCCESS); - if (payload != null) { - builder.setPayload(ByteString.copyFrom(payload)); - } - if (options != null) { - CIPHER_HELPER.addCipherOptions(builder, options); - } - DataTransferEncryptorMessageProto proto = builder.build(); - int size = proto.getSerializedSize(); - size += CodedOutputStream.computeRawVarint32Size(size); - ByteBuf buf = ctx.alloc().buffer(size); - proto.writeDelimitedTo(new ByteBufOutputStream(buf)); - ctx.write(buf); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); - sendSaslMessage(ctx, new byte[0]); - ctx.flush(); - step++; - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - saslClient.dispose(); - } - - private void check(DataTransferEncryptorMessageProto proto) throws IOException { - if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { - throw new InvalidEncryptionKeyException(proto.getMessage()); - } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { - throw new IOException(proto.getMessage()); - } - } - - private String getNegotiatedQop() { - return (String) saslClient.getNegotiatedProperty(Sasl.QOP); - } - - private boolean isNegotiatedQopPrivacy() { - String qop = getNegotiatedQop(); - return qop != null && "auth-conf".equalsIgnoreCase(qop); - } - - private boolean requestedQopContainsPrivacy() { - Set<String> requestedQop = - ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); - return requestedQop.contains("auth-conf"); - } - - private void checkSaslComplete() throws IOException { - if (!saslClient.isComplete()) { - throw new IOException("Failed to complete SASL handshake"); - } - Set<String> requestedQop = - 
ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); - String negotiatedQop = getNegotiatedQop(); - LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " - + negotiatedQop); - if (!requestedQop.contains(negotiatedQop)) { - throw new IOException(String.format("SASL handshake completed, but " - + "channel does not have acceptable quality of protection, " - + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); - } - } - - private boolean useWrap() { - String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); - return qop != null && !"auth".equalsIgnoreCase(qop); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { - if (msg instanceof DataTransferEncryptorMessageProto) { - DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; - check(proto); - byte[] challenge = proto.getPayload().toByteArray(); - byte[] response = saslClient.evaluateChallenge(challenge); - switch (step) { - case 1: { - List<Object> cipherOptions = null; - if (requestedQopContainsPrivacy()) { - cipherOptions = CIPHER_HELPER.getCipherOptions(conf); - } - sendSaslMessage(ctx, response, cipherOptions); - ctx.flush(); - step++; - break; - } - case 2: { - assert response == null; - checkSaslComplete(); - Object cipherOption = - CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); - ChannelPipeline p = ctx.pipeline(); - while (p.first() != null) { - p.removeFirst(); - } - if (cipherOption != null) { - CryptoCodec codec = new CryptoCodec(conf, cipherOption); - p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); - } else { - if (useWrap()) { - p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder( - Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient)); - } - } - promise.trySuccess(null); - break; - } - default: - throw new IllegalArgumentException("Unrecognized negotiation step: " + step); - } - } 
else { - ctx.fireChannelRead(msg); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - promise.tryFailure(cause); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { - promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); - } else { - super.userEventTriggered(ctx, evt); - } - } - } - - private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { - - private final SaslClient saslClient; - - public SaslUnwrapHandler(SaslClient saslClient) { - this.saslClient = saslClient; - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - saslClient.dispose(); - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - msg.skipBytes(4); - byte[] b = new byte[msg.readableBytes()]; - msg.readBytes(b); - ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); - } - } - - private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { - - private final SaslClient saslClient; - - private CompositeByteBuf cBuf; - - public SaslWrapHandler(SaslClient saslClient) { - this.saslClient = saslClient; - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); - } - - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) - throws Exception { - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - cBuf.addComponent(buf); - cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); - } else { - ctx.write(msg); - } - } - - @Override - public void flush(ChannelHandlerContext ctx) throws Exception { - if (cBuf.isReadable()) { - byte[] b = new 
byte[cBuf.readableBytes()]; - cBuf.readBytes(b); - cBuf.discardReadComponents(); - byte[] wrapped = saslClient.wrap(b, 0, b.length); - ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); - buf.writeInt(wrapped.length); - buf.writeBytes(wrapped); - ctx.write(buf); - } - ctx.flush(); - } - - @Override - public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - cBuf.release(); - cBuf = null; - } - } - - private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { - - private final CryptoCodec codec; - - public DecryptHandler(CryptoCodec codec) { - this.codec = codec; - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - ByteBuf inBuf; - boolean release = false; - if (msg.nioBufferCount() == 1) { - inBuf = msg; - } else { - inBuf = ctx.alloc().directBuffer(msg.readableBytes()); - msg.readBytes(inBuf); - release = true; - } - ByteBuffer inBuffer = inBuf.nioBuffer(); - ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); - ByteBuffer outBuffer = outBuf.nioBuffer(); - codec.decrypt(inBuffer, outBuffer); - outBuf.writerIndex(inBuf.readableBytes()); - if (release) { - inBuf.release(); - } - ctx.fireChannelRead(outBuf); - } - } - - private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { - - private final CryptoCodec codec; - - public EncryptHandler(CryptoCodec codec) { - super(false); - this.codec = codec; - } - - @Override - protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) - throws Exception { - if (preferDirect) { - return ctx.alloc().directBuffer(msg.readableBytes()); - } else { - return ctx.alloc().buffer(msg.readableBytes()); - } - } - - @Override - protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { - ByteBuf inBuf; - boolean release = false; - if (msg.nioBufferCount() == 1) { - inBuf = msg; - } else { - inBuf = 
ctx.alloc().directBuffer(msg.readableBytes()); - msg.readBytes(inBuf); - release = true; - } - ByteBuffer inBuffer = inBuf.nioBuffer(); - ByteBuffer outBuffer = out.nioBuffer(); - codec.encrypt(inBuffer, outBuffer); - out.writerIndex(inBuf.readableBytes()); - if (release) { - inBuf.release(); - } - } - } - - private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { - return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER - + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); - } - - private static char[] encryptionKeyToPassword(byte[] encryptionKey) { - return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); - } - - private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { - return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8); - } - - private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { - return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8) - .toCharArray(); - } - - private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { - Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); - saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); - saslProps.put(Sasl.SERVER_AUTH, "true"); - saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); - return saslProps; - } - - private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, - String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) { - try { - channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), - new ProtobufVarint32FrameDecoder(), - new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), - new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, 
saslPromise)); - } catch (SaslException e) { - saslPromise.tryFailure(e); - } - } - - static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, - int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, - Promise<Void> saslPromise) { - SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client); - TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client); - AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client); - InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); - if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { - saslPromise.trySuccess(null); - return; - } - DataEncryptionKey encryptionKey; - try { - encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client); - } catch (Exception e) { - saslPromise.tryFailure(e); - return; - } - if (encryptionKey != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " - + dnInfo); - } - doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), - encryptionKeyToPassword(encryptionKey.encryptionKey), - createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise); - } else if (!UserGroupInformation.isSecurityEnabled()) { - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr - + ", datanodeId = " + dnInfo); - } - saslPromise.trySuccess(null); - } else if (dnInfo.getXferPort() < 1024) { - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client skipping handshake in secured configuration with " - + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); - } - saslPromise.trySuccess(null); - } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client skipping handshake in 
secured configuration with " - + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); - } - saslPromise.trySuccess(null); - } else if (saslPropsResolver != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = " - + dnInfo); - } - doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), - buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); - } else { - // It's a secured cluster using non-privileged ports, but no SASL. The only way this can - // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare - // edge case. - if (LOG.isDebugEnabled()) { - LOG.debug("SASL client skipping handshake in secured configuration with no SASL " - + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); - } - saslPromise.trySuccess(null); - } - } - -} http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java new file mode 100644 index 0000000..58b5301 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputFlushHandler.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * A {@link CompletionHandler} that records the outcome of a single asynchronous operation and lets
 * another thread block in {@link #get()} until that outcome is available.
 * <p>
 * All state is guarded by the instance monitor. Call {@link #reset()} before reusing the handler
 * for another operation.
 */
public final class FanOutOneBlockAsyncDFSOutputFlushHandler
    implements CompletionHandler<Long, Void> {

  /** Value passed to {@link #completed(Long, Void)}; only meaningful when {@code error} is null. */
  private long size;

  /** Failure passed to {@link #failed(Throwable, Void)}, or null if none was reported. */
  private Throwable error;

  /** True once either callback has fired since construction or the last {@link #reset()}. */
  private boolean finished;

  /**
   * Records the successful result and wakes up every thread waiting in {@link #get()}.
   * @param result the value to be returned by {@link #get()}
   * @param attachment unused
   */
  @Override
  public synchronized void completed(Long result, Void attachment) {
    size = result.longValue();
    finished = true;
    notifyAll();
  }

  /**
   * Records the failure and wakes up every thread waiting in {@link #get()}.
   * @param exc the failure to be wrapped and thrown by {@link #get()}
   * @param attachment unused
   */
  @Override
  public synchronized void failed(Throwable exc, Void attachment) {
    error = exc;
    finished = true;
    notifyAll();
  }

  /**
   * Blocks until one of the callbacks has been invoked.
   * @return the value passed to {@link #completed(Long, Void)}
   * @throws InterruptedException if the calling thread is interrupted while waiting
   * @throws ExecutionException wrapping the throwable passed to {@link #failed(Throwable, Void)}
   */
  public synchronized long get() throws InterruptedException, ExecutionException {
    // Loop to guard against spurious wake-ups.
    while (!finished) {
      wait();
    }
    if (error != null) {
      throw new ExecutionException(error);
    }
    return size;
  }

  /**
   * Clears the recorded outcome so the handler can be reused.
   * <p>
   * Synchronized like every other accessor: the three fields are only ever read and written under
   * the instance monitor, so the cleared state is guaranteed to be visible to the callbacks and
   * to {@link #get()}, and a reset can never interleave with a half-applied callback.
   */
  public synchronized void reset() {
    size = 0L;
    error = null;
    finished = false;
  }
}
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.datanode.DataNode; 
+import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.Daemon; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MiscTests.class, MediumTests.class }) +public class TestFanOutOneBlockAsyncDFSOutput { + + private static final Log LOG = LogFactory.getLog(TestFanOutOneBlockAsyncDFSOutput.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static DistributedFileSystem FS; + + private static EventLoopGroup EVENT_LOOP_GROUP; + + private static int READ_TIMEOUT_MS = 2000; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); + TEST_UTIL.startMiniDFSCluster(3); + FS = TEST_UTIL.getDFSCluster().getFileSystem(); + EVENT_LOOP_GROUP = new NioEventLoopGroup(); + } + + @AfterClass + public static void tearDown() throws IOException, InterruptedException { + if (EVENT_LOOP_GROUP != null) { + EVENT_LOOP_GROUP.shutdownGracefully().sync(); + } + TEST_UTIL.shutdownMiniDFSCluster(); + } + + private void ensureAllDatanodeAlive() throws InterruptedException { + // FanOutOneBlockAsyncDFSOutputHelper.createOutput is fail-fast, so we need to make sure that we + // can create a FanOutOneBlockAsyncDFSOutput after a datanode restarting, otherwise some tests + // will fail. 
+ for (;;) { + try { + FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, + new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(), + EVENT_LOOP_GROUP.next()); + out.close(); + break; + } catch (IOException e) { + Thread.sleep(100); + } + } + } + + static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f, + final FanOutOneBlockAsyncDFSOutput out) + throws IOException, InterruptedException, ExecutionException { + final byte[] b = new byte[10]; + ThreadLocalRandom.current().nextBytes(b); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); + eventLoop.execute(new Runnable() { + + @Override + public void run() { + out.write(b, 0, b.length); + out.flush(null, handler, false); + } + }); + assertEquals(b.length, handler.get()); + out.close(); + assertEquals(b.length, dfs.getFileStatus(f).getLen()); + byte[] actual = new byte[b.length]; + try (FSDataInputStream in = dfs.open(f)) { + in.readFully(actual); + } + assertArrayEquals(b, actual); + } + + @Test + public void test() throws IOException, InterruptedException, ExecutionException { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); + writeAndVerify(eventLoop, FS, f, out); + } + + @Test + public void testRecover() throws IOException, InterruptedException, ExecutionException { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); + final byte[] b = new byte[10]; + ThreadLocalRandom.current().nextBytes(b); + final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new 
FanOutOneBlockAsyncDFSOutputFlushHandler(); + eventLoop.execute(new Runnable() { + + @Override + public void run() { + out.write(b, 0, b.length); + out.flush(null, handler, false); + } + }); + handler.get(); + // restart one datanode which causes one connection broken + TEST_UTIL.getDFSCluster().restartDataNode(0); + try { + handler.reset(); + eventLoop.execute(new Runnable() { + + @Override + public void run() { + out.write(b, 0, b.length); + out.flush(null, handler, false); + } + }); + try { + handler.get(); + fail("flush should fail"); + } catch (ExecutionException e) { + // we restarted one datanode so the flush should fail + LOG.info("expected exception caught", e); + } + out.recoverAndClose(null); + assertEquals(b.length, FS.getFileStatus(f).getLen()); + byte[] actual = new byte[b.length]; + try (FSDataInputStream in = FS.open(f)) { + in.readFully(actual); + } + assertArrayEquals(b, actual); + } finally { + ensureAllDatanodeAlive(); + } + } + + @Test + public void testHeartbeat() throws IOException, InterruptedException, ExecutionException { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); + Thread.sleep(READ_TIMEOUT_MS * 2); + // the connection to datanode should still alive. + writeAndVerify(eventLoop, FS, f, out); + } + + /** + * This is important for fencing when recover from RS crash. 
+ */ + @Test + public void testCreateParentFailed() throws IOException { + Path f = new Path("/" + name.getMethodName() + "/test"); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + try { + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop); + fail("should fail with parent does not exist"); + } catch (RemoteException e) { + LOG.info("expected exception caught", e); + assertTrue(e.unwrapRemoteException() instanceof FileNotFoundException); + } + } + + @Test + public void testConnectToDatanodeFailed() + throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException, + InvocationTargetException, InterruptedException, NoSuchFieldException { + Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer"); + xceiverServerDaemonField.setAccessible(true); + Class<?> xceiverServerClass = Class + .forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer"); + Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers"); + numPeersMethod.setAccessible(true); + // make one datanode broken + TEST_UTIL.getDFSCluster().getDataNodes().get(0).shutdownDatanode(true); + try { + Path f = new Path("/test"); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + try { + FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, + FS.getDefaultBlockSize(), eventLoop); + fail("should fail with connection error"); + } catch (IOException e) { + LOG.info("expected exception caught", e); + } + for (DataNode dn : TEST_UTIL.getDFSCluster().getDataNodes()) { + Daemon daemon = (Daemon) xceiverServerDaemonField.get(dn); + assertEquals(0, numPeersMethod.invoke(daemon.getRunnable())); + } + } finally { + TEST_UTIL.getDFSCluster().restartDataNode(0); + ensureAllDatanodeAlive(); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java new file mode 100644 index 0000000..04cb0ef --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.asyncfs; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MiscTests.class, SmallTests.class }) +public class TestLocalAsyncOutput { + + private static EventLoopGroup GROUP = new NioEventLoopGroup(); + + private static final HBaseCommonTestingUtility TEST_UTIL = new HBaseCommonTestingUtility(); + + @AfterClass + public static void tearDownAfterClass() throws IOException { + TEST_UTIL.cleanupTestDir(); + GROUP.shutdownGracefully(); + } + + @Test + public void test() throws IOException, InterruptedException, ExecutionException { + Path f = new Path(TEST_UTIL.getDataTestDir(), "test"); + FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration()); + AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true, + fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP.next()); + byte[] b = new byte[10]; + ThreadLocalRandom.current().nextBytes(b); + FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); + out.write(b); + out.flush(null, handler, true); + assertEquals(b.length, handler.get()); + out.close(); + assertEquals(b.length, 
fs.getFileStatus(f).getLen()); + byte[] actual = new byte[b.length]; + try (FSDataInputStream in = fs.open(f)) { + in.readFully(actual); + } + assertArrayEquals(b, actual); + } +}
