Repository: hbase Updated Branches: refs/heads/master a1cc2c4bf -> 515c499f9
HBASE-16110 AsyncFS WAL doesn't work with Hadoop 2.8+ Signed-off-by: Sean Busbey <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/515c499f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/515c499f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/515c499f Branch: refs/heads/master Commit: 515c499f951b864035c5772906b2c0750d9a608f Parents: a1cc2c4 Author: zhangduo <[email protected]> Authored: Tue Jul 12 11:15:08 2016 +0800 Committer: Sean Busbey <[email protected]> Committed: Mon Jul 18 06:54:20 2016 -0500 ---------------------------------------------------------------------- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 4 +- .../FanOutOneBlockAsyncDFSOutputHelper.java | 565 ++++++++++++------- .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 213 +++---- 3 files changed, 470 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 8dd7f5e..9aab924 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.asyncfs; import static io.netty.handler.timeout.IdleState.READER_IDLE; import static io.netty.handler.timeout.IdleState.WRITER_IDLE; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.READ_TIMEOUT; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus; @@ -71,7 +72,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.util.DataChecksum; /** @@ -339,7 +339,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.alloc = alloc; this.buf = alloc.directBuffer(); this.state = State.STREAMING; - setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT)); + setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java index 2e88ff2..51c48ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java @@ -99,15 +99,15 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProt import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -128,8 +128,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { // copied from DFSPacket since it is package private. public static final long HEART_BEAT_SEQNO = -1L; - // helper class for creating DataChecksum object. - private static final Method CREATE_CHECKSUM; + // Timeouts for communicating with DataNode for streaming writes/reads + public static final int READ_TIMEOUT = 60 * 1000; + public static final int READ_TIMEOUT_EXTENSION = 5 * 1000; + public static final int WRITE_TIMEOUT = 8 * 60 * 1000; // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may @@ -161,6 +163,17 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final FileCreater FILE_CREATER; + // helper class for calling add block method on namenode. There is a addBlockFlags parameter for + // hadoop 2.8 or later. See createBlockAdder for more details. + private interface BlockAdder { + + LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName, + ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes) + throws IOException; + } + + private static final BlockAdder BLOCK_ADDER; + // helper class for add or remove lease from DFSClient. Hadoop 2.4 use src as the Map's key, and // hadoop 2.5 or after use inodeId. See createLeaseManager for more details. private interface LeaseManager { @@ -181,156 +194,182 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR; - private static DFSClientAdaptor createDFSClientAdaptor() { - try { - final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); - isClientRunningMethod.setAccessible(true); - return new DFSClientAdaptor() { + // helper class for convert protos. + private interface PBHelper { - @Override - public boolean isClientRunning(DFSClient client) { - try { - return (Boolean) isClientRunningMethod.invoke(client); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } + ExtendedBlockProto convert(final ExtendedBlock b); + + TokenProto convert(Token<?> tok); + } + + private static final PBHelper PB_HELPER; + + // helper class for creating data checksum. + private interface ChecksumCreater { + DataChecksum createChecksum(Object conf); + } + + private static final ChecksumCreater CHECKSUM_CREATER; + + private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException { + final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); + isClientRunningMethod.setAccessible(true); + return new DFSClientAdaptor() { + + @Override + public boolean isClientRunning(DFSClient client) { + try { + return (Boolean) isClientRunningMethod.invoke(client); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); } - }; - } catch (NoSuchMethodException e) { - throw new Error(e); - } + } + }; } - private static LeaseManager createLeaseManager() { - try { - final Method beginFileLeaseMethod = - DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class); - beginFileLeaseMethod.setAccessible(true); - final Method endFileLeaseMethod = - DFSClient.class.getDeclaredMethod("endFileLease", long.class); - endFileLeaseMethod.setAccessible(true); - return new LeaseManager() { + private static LeaseManager createLeaseManager25() throws NoSuchMethodException { + final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease", + long.class, DFSOutputStream.class); + beginFileLeaseMethod.setAccessible(true); + final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class); + endFileLeaseMethod.setAccessible(true); + return new LeaseManager() { - @Override - public void begin(DFSClient client, String src, long inodeId) { - try { - beginFileLeaseMethod.invoke(client, inodeId, null); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } + @Override + public void begin(DFSClient client, String src, long inodeId) { + try { + beginFileLeaseMethod.invoke(client, inodeId, null); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); } + } - @Override - public void end(DFSClient client, String src, long inodeId) { - try { - endFileLeaseMethod.invoke(client, inodeId); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } + @Override + public void end(DFSClient client, String src, long inodeId) { + try { + endFileLeaseMethod.invoke(client, inodeId); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); } - }; - } catch (NoSuchMethodException e) { - LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e); - } - try { - final Method beginFileLeaseMethod = - DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class); - beginFileLeaseMethod.setAccessible(true); - final Method endFileLeaseMethod = - DFSClient.class.getDeclaredMethod("endFileLease", String.class); - endFileLeaseMethod.setAccessible(true); - return new LeaseManager() { + } + }; + } - @Override - public void begin(DFSClient client, String src, long inodeId) { - try { - beginFileLeaseMethod.invoke(client, src, null); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } + private static LeaseManager createLeaseManager24() throws NoSuchMethodException { + final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease", + String.class, DFSOutputStream.class); + beginFileLeaseMethod.setAccessible(true); + final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", + String.class); + endFileLeaseMethod.setAccessible(true); + return new LeaseManager() { + + @Override + public void begin(DFSClient client, String src, long inodeId) { + try { + beginFileLeaseMethod.invoke(client, src, null); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); } + } - @Override - public void end(DFSClient client, String src, long inodeId) { - try { - endFileLeaseMethod.invoke(client, src); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } + @Override + public void end(DFSClient client, String src, long inodeId) { + try { + endFileLeaseMethod.invoke(client, src); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); } - }; + } + }; + } + + private static LeaseManager createLeaseManager() throws NoSuchMethodException { + try { + return createLeaseManager25(); } catch (NoSuchMethodException e) { - throw new Error(e); + LOG.debug("No inodeId related lease methods found, should be hadoop 2.4-", e); } + return createLeaseManager24(); } - private static PipelineAckStatusGetter createPipelineAckStatusGetter() { + private static PipelineAckStatusGetter createPipelineAckStatusGetter27() + throws NoSuchMethodException { + final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList"); + @SuppressWarnings("rawtypes") + Class<? extends Enum> ecnClass; try { - final Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList"); - @SuppressWarnings("rawtypes") - Class<? extends Enum> ecnClass; - try { - ecnClass = - Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN") - .asSubclass(Enum.class); - } catch (ClassNotFoundException e) { - throw new Error(e); - } - @SuppressWarnings("unchecked") - final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED"); - final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class); - final Method combineHeaderMethod = - PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class); - final Method getStatusFromHeaderMethod = - PipelineAck.class.getMethod("getStatusFromHeader", int.class); - return new PipelineAckStatusGetter() { + ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN") + .asSubclass(Enum.class); + } catch (ClassNotFoundException e) { + final String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " + + "update your WAL Provider to not make use of the 'asyncfs' provider. See " + + "HBASE-16110 for more information."; + LOG.error(msg, e); + throw new Error(msg, e); + } + @SuppressWarnings("unchecked") + final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED"); + final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class); + final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass, + Status.class); + final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader", + int.class); + return new PipelineAckStatusGetter() { - @Override - public Status get(PipelineAckProto ack) { - try { - @SuppressWarnings("unchecked") - List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack); - Integer headerFlag; - if (flagList.isEmpty()) { - Status reply = (Status) getReplyMethod.invoke(ack, 0); - headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply); - } else { - headerFlag = flagList.get(0); - } - return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); + @Override + public Status get(PipelineAckProto ack) { + try { + @SuppressWarnings("unchecked") + List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack); + Integer headerFlag; + if (flagList.isEmpty()) { + Status reply = (Status) getReplyMethod.invoke(ack, 0); + headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply); + } else { + headerFlag = flagList.get(0); } + return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); } - }; - } catch (NoSuchMethodException e) { - LOG.warn("Can not get expected methods, should be hadoop 2.6-", e); - } - try { - final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class); - return new PipelineAckStatusGetter() { + } + }; + } - @Override - public Status get(PipelineAckProto ack) { - try { - return (Status) getStatusMethod.invoke(ack, 0); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } + private static PipelineAckStatusGetter createPipelineAckStatusGetter26() + throws NoSuchMethodException { + final Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class); + return new PipelineAckStatusGetter() { + + @Override + public Status get(PipelineAckProto ack) { + try { + return (Status) getStatusMethod.invoke(ack, 0); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); } - }; + } + }; + } + + private static PipelineAckStatusGetter createPipelineAckStatusGetter() + throws NoSuchMethodException { + try { + return createPipelineAckStatusGetter27(); } catch (NoSuchMethodException e) { - throw new Error(e); + LOG.debug("Can not get expected methods, should be hadoop 2.6-", e); } + return createPipelineAckStatusGetter26(); } private static StorageTypeSetter createStorageTypeSetter() { final Method setStorageTypeMethod; try { - setStorageTypeMethod = - OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class); + setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType", + StorageTypeProto.class); } catch (NoSuchMethodException e) { - LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e); + LOG.debug("noSetStorageType method found, should be hadoop 2.5-", e); return new StorageTypeSetter() { @Override @@ -359,7 +398,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { }; } - private static FileCreater createFileCreater() { + private static FileCreater createFileCreater() throws ClassNotFoundException, + NoSuchMethodException, IllegalAccessException, InvocationTargetException { for (Method method : ClientProtocol.class.getMethods()) { if (method.getName().equals("create")) { final Method createMethod = method; @@ -372,8 +412,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize) throws IOException { try { - return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, - flag, createParent, replication, blockSize); + return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag, + createParent, replication, blockSize); } catch (IllegalAccessException e) { throw new RuntimeException(e); } catch (InvocationTargetException e) { @@ -383,36 +423,159 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } }; } else { - try { - Class<?> cryptoProtocolVersionClass = - Class.forName("org.apache.hadoop.crypto.CryptoProtocolVersion"); - Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported"); - final Object supported = supportedMethod.invoke(null); - return new FileCreater() { + Class<?> cryptoProtocolVersionClass = Class + .forName("org.apache.hadoop.crypto.CryptoProtocolVersion"); + Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported"); + final Object supported = supportedMethod.invoke(null); + return new FileCreater() { - @Override - public HdfsFileStatus create(ClientProtocol namenode, String src, - FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, - boolean createParent, short replication, long blockSize) throws IOException { - try { - return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, - flag, createParent, replication, blockSize, supported); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - Throwables.propagateIfPossible(e.getTargetException(), IOException.class); - throw new RuntimeException(e); - } + @Override + public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked, + String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, + short replication, long blockSize) throws IOException { + try { + return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag, + createParent, replication, blockSize, supported); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + Throwables.propagateIfPossible(e.getTargetException(), IOException.class); + throw new RuntimeException(e); + } + } + }; + } + } + } + throw new NoSuchMethodException("Can not find create method in ClientProtocol"); + } + + private static BlockAdder createBlockAdder() throws NoSuchMethodException { + for (Method method : ClientProtocol.class.getMethods()) { + if (method.getName().equals("addBlock")) { + final Method addBlockMethod = method; + Class<?>[] paramTypes = addBlockMethod.getParameterTypes(); + if (paramTypes[paramTypes.length - 1] == String[].class) { + return new BlockAdder() { + + @Override + public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName, + ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, + String[] favoredNodes) throws IOException { + try { + return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous, + excludeNodes, fileId, favoredNodes); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + Throwables.propagateIfPossible(e.getTargetException(), IOException.class); + throw new RuntimeException(e); + } + } + }; + } else { + return new BlockAdder() { + + @Override + public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName, + ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, + String[] favoredNodes) throws IOException { + try { + return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous, + excludeNodes, fileId, favoredNodes, null); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + Throwables.propagateIfPossible(e.getTargetException(), IOException.class); + throw new RuntimeException(e); } - }; - } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException - | InvocationTargetException e) { - throw new Error(e); + } + }; + } + } + } + throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol"); + } + + private static PBHelper createPBHelper() throws NoSuchMethodException { + Class<?> helperClass; + try { + helperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient"); + } catch (ClassNotFoundException e) { + LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e); + helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class; + } + final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class); + final Method convertTokenMethod = helperClass.getMethod("convert", Token.class); + return new PBHelper() { + + @Override + public ExtendedBlockProto convert(ExtendedBlock b) { + try { + return (ExtendedBlockProto) convertEBMethod.invoke(null, b); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public TokenProto convert(Token<?> tok) { + try { + return (TokenProto) convertTokenMethod.invoke(null, tok); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; + } + + private static ChecksumCreater createChecksumCreater28(Class<?> confClass) + throws NoSuchMethodException { + for (Method method : confClass.getMethods()) { + if (method.getName().equals("createChecksum")) { + final Method createChecksumMethod = method; + return new ChecksumCreater() { + + @Override + public DataChecksum createChecksum(Object conf) { + try { + return (DataChecksum) createChecksumMethod.invoke(conf, (Object) null); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } } + }; + } + } + throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf"); + } + + private static ChecksumCreater createChecksumCreater27(Class<?> confClass) + throws NoSuchMethodException { + final Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum"); + createChecksumMethod.setAccessible(true); + return new ChecksumCreater() { + + @Override + public DataChecksum createChecksum(Object conf) { + try { + return (DataChecksum) createChecksumMethod.invoke(conf); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); } } + }; + } + + private static ChecksumCreater createChecksumCreater() + throws NoSuchMethodException, ClassNotFoundException { + try { + return createChecksumCreater28( + Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf")); + } catch (ClassNotFoundException e) { + LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e); } - throw new Error("No create method found for " + ClientProtocol.class.getName()); + return createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf")); } // cancel the processing if DFSClient is already closed. @@ -432,17 +595,21 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { static { try { - CREATE_CHECKSUM = DFSClient.Conf.class.getDeclaredMethod("createChecksum"); - CREATE_CHECKSUM.setAccessible(true); - } catch (NoSuchMethodException e) { - throw new Error(e); + PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter(); + STORAGE_TYPE_SETTER = createStorageTypeSetter(); + FILE_CREATER = createFileCreater(); + BLOCK_ADDER = createBlockAdder(); + LEASE_MANAGER = createLeaseManager(); + DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); + PB_HELPER = createPBHelper(); + CHECKSUM_CREATER = createChecksumCreater(); + } catch (Exception e) { + final String msg = "Couldn't properly initialize access to HDFS internals. Please " + + "update your WAL Provider to not make use of the 'asyncfs' provider. See " + + "HBASE-16110 for more information."; + LOG.error(msg, e); + throw new Error(msg, e); } - - PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter(); - STORAGE_TYPE_SETTER = createStorageTypeSetter(); - FILE_CREATER = createFileCreater(); - LEASE_MANAGER = createLeaseManager(); - DFS_CLIENT_ADAPTOR = createDFSClientAdaptor(); } static void beginFileLease(DFSClient client, String src, long inodeId) { @@ -454,11 +621,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } static DataChecksum createChecksum(DFSClient client) { - try { - return (DataChecksum) CREATE_CHECKSUM.invoke(client.getConf()); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } + return CHECKSUM_CREATER.createChecksum(client.getConf()); } static Status getStatus(PipelineAckProto ack) { @@ -530,8 +693,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build(); int protoLen = proto.getSerializedSize(); - ByteBuf buffer = - channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); + ByteBuf buffer = channel.alloc() + .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); buffer.writeByte(Op.WRITE_BLOCK.code); proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); @@ -540,8 +703,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { private static void initialize(Configuration conf, final Channel channel, final DatanodeInfo dnInfo, final Enum<?> storageType, - final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, - DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) { + final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client, + Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) { Promise<Void> saslPromise = channel.eventLoop().newPromise(); trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise); saslPromise.addListener(new FutureListener<Void>() { @@ -560,32 +723,26 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { } private static List<Future<Channel>> connectToDataNodes(final Configuration conf, - final DFSClient client, String clientName, final LocatedBlock locatedBlock, - long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer, - EventLoop eventLoop) { + final DFSClient client, String clientName, final LocatedBlock locatedBlock, long maxBytesRcvd, + long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoop eventLoop) { Enum<?>[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); - boolean connectToDnViaHostname = - conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); - final int timeoutMs = - conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT); + boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, + DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT); ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); blockCopy.setNumBytes(locatedBlock.getBlockSize()); - ClientOperationHeaderProto header = - ClientOperationHeaderProto - .newBuilder() - .setBaseHeader( - BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy)) - .setToken(PBHelper.convert(locatedBlock.getBlockToken()))) - .setClientName(clientName).build(); + ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() + .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy)) + .setToken(PB_HELPER.convert(locatedBlock.getBlockToken()))) + .setClientName(clientName).build(); ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); - final OpWriteBlockProto.Builder writeBlockProtoBuilder = - OpWriteBlockProto.newBuilder().setHeader(header) - .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) - .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) - .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) - .setRequestedChecksum(checksumProto) - .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); + final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() + .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) + .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) + .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) + .setRequestedChecksum(checksumProto) + .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length); for (int i = 0; i < datanodeInfos.length; i++) { final DatanodeInfo dnInfo = datanodeInfos[i]; @@ -642,14 +799,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { ClientProtocol namenode = client.getNamenode(); HdfsFileStatus stat; try { - stat = - FILE_CREATER.create( - namenode, - src, - FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), - clientName, - new EnumSetWritable<CreateFlag>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet - .of(CREATE)), createParent, replication, blockSize); + stat = FILE_CREATER.create(namenode, src, + FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, + new EnumSetWritable<CreateFlag>( + overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), + createParent, replication, blockSize); } catch (Exception e) { if (e instanceof RemoteException) { throw (RemoteException) e; @@ -663,12 +817,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { List<Future<Channel>> futureList = null; try { DataChecksum summer = createChecksum(client); - locatedBlock = - namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null); + locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null, null, + stat.getFileId(), null); List<Channel> datanodeList = new ArrayList<>(); - futureList = - connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE, - summer, eventLoop); + futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, + PIPELINE_SETUP_CREATE, summer, eventLoop); for (Future<Channel> future : futureList) { // fail the creation if there are connection failures since we are fail-fast. The upper // layer should retry itself if needed. @@ -712,8 +865,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper { return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { @Override - public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, - UnresolvedLinkException { + public FanOutOneBlockAsyncDFSOutput doCall(Path p) + throws IOException, UnresolvedLinkException { return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, blockSize, eventLoop); } http://git-wip-us.apache.org/repos/asf/hbase/blob/515c499f/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java index 33e8841..0546253 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -86,7 +86,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; -import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.security.SaslPropertiesResolver; @@ -112,8 +111,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { private static final String NAME_DELIMITER = " "; @VisibleForTesting - static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = - "dfs.encrypt.data.transfer.cipher.suites"; + static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = "dfs.encrypt.data.transfer.cipher.suites"; @VisibleForTesting static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding"; @@ -185,7 +183,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { try { cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec"); } catch (ClassNotFoundException e) { - LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e); + LOG.debug("No CryptoCodec class found, should be hadoop 2.5-", e); } if (cryptoCodecClass != null) { Method getInstanceMethod = null; @@ -195,8 +193,12 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { break; } } - CREATE_CODEC = getInstanceMethod; try { + if (getInstanceMethod == null) { + throw new NoSuchMethodException( + "Can not find suitable getInstance method in CryptoCodec"); + } + CREATE_CODEC = getInstanceMethod; CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor"); CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor"); @@ -207,11 +209,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor"); INIT_DECRYPTOR = decryptorClass.getMethod("init", byte[].class, byte[].class); DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class); - } catch (NoSuchMethodException | ClassNotFoundException e) { - throw new Error(e); + } catch (Exception e) { + final String msg = "Couldn't properly initialize access to HDFS internals. Please " + + "update your WAL Provider to not make use of the 'asyncfs' provider. See " + + "HBASE-16110 for more information."; + LOG.error(msg, e); + throw new Error(msg, e); } } else { - LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-"); CREATE_CODEC = null; CREATE_ENCRYPTOR = null; CREATE_DECRYPTOR = null; @@ -329,62 +334,53 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static SaslAdaptor createSaslAdaptor25() { - try { - final Field trustedChannelResolverField = DFSClient.class - .getDeclaredField("trustedChannelResolver"); - trustedChannelResolverField.setAccessible(true); - final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey"); - return new SaslAdaptor() { - - @Override - public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { - try { - return (TrustedChannelResolver) trustedChannelResolverField.get(client); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } + private static SaslAdaptor createSaslAdaptor25() + throws NoSuchFieldException, NoSuchMethodException { + final Field trustedChannelResolverField = DFSClient.class + .getDeclaredField("trustedChannelResolver"); + trustedChannelResolverField.setAccessible(true); + final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey"); + return new SaslAdaptor() { - @Override - public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { - return null; + @Override + public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { + try { + return (TrustedChannelResolver) trustedChannelResolverField.get(client); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); } + } - @Override - public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { - return null; - } + @Override + public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { + return null; + } - @Override - public DataEncryptionKey createDataEncryptionKey(DFSClient client) { - try { - return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } catch (NoSuchFieldException | NoSuchMethodException e) { - throw new Error(e); - } + @Override + public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { + return null; + } + @Override + public DataEncryptionKey createDataEncryptionKey(DFSClient client) { + try { + return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; } - private static SaslAdaptor createSaslAdaptor() { - Class<?> saslDataTransferClientClass = null; + private static SaslAdaptor createSaslAdaptor() + throws NoSuchFieldException, NoSuchMethodException { try { - saslDataTransferClientClass = Class - .forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"); + return createSaslAdaptor27( + Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient")); } catch (ClassNotFoundException e) { - LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-"); - } - try { - return saslDataTransferClientClass != null ? createSaslAdaptor27(saslDataTransferClientClass) - : createSaslAdaptor25(); - } catch (NoSuchFieldException | NoSuchMethodException e) { - throw new Error(e); + LOG.debug("No SaslDataTransferClient class found, should be hadoop 2.5-", e); } + return createSaslAdaptor25(); } private static CipherOptionHelper createCipherHelper25() { @@ -451,9 +447,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey"); final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv"); - final Method convertCipherOptionsMethod = PBHelper.class.getMethod("convertCipherOptions", + Class<?> pbHelperClass; + try { + pbHelperClass = Class.forName("org.apache.hadoop.hdfs.protocolPB.PBHelperClient"); + } catch (ClassNotFoundException e) { + LOG.debug("No PBHelperClient class found, should be hadoop 2.7-", e); + pbHelperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class; + } + final Method convertCipherOptionsMethod = pbHelperClass.getMethod("convertCipherOptions", List.class); - final Method convertCipherOptionProtosMethod = PBHelper.class + final Method convertCipherOptionProtosMethod = pbHelperClass .getMethod("convertCipherOptionProtos", List.class); final Method addAllCipherOptionMethod = DataTransferEncryptorMessageProto.Builder.class .getMethod("addAllCipherOption", Iterable.class); @@ -577,19 +580,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static CipherOptionHelper createCipherHelper() { + private static CipherOptionHelper createCipherHelper() + throws ClassNotFoundException, NoSuchMethodException { Class<?> cipherOptionClass; try { cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption"); } catch (ClassNotFoundException e) { - LOG.warn("No CipherOption class found, should be hadoop 2.5-"); + LOG.debug("No CipherOption class found, should be hadoop 2.5-", e); return createCipherHelper25(); } - try { - return createCipherHelper27(cipherOptionClass); - } catch (NoSuchMethodException | ClassNotFoundException e) { - throw new Error(e); - } + return createCipherHelper27(cipherOptionClass); } private static TransparentCryptoHelper createTransparentCryptoHelper25() { @@ -646,25 +646,30 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { }; } - private static TransparentCryptoHelper createTransparentCryptoHelper() { + private static TransparentCryptoHelper createTransparentCryptoHelper() + throws NoSuchMethodException, ClassNotFoundException { Class<?> feInfoClass; try { feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo"); } catch (ClassNotFoundException e) { - LOG.warn("No FileEncryptionInfo class found, should be hadoop 2.5-"); + LOG.debug("No FileEncryptionInfo class found, should be hadoop 2.5-", e); return createTransparentCryptoHelper25(); } - try { - return createTransparentCryptoHelper27(feInfoClass); - } catch (NoSuchMethodException | ClassNotFoundException e) { - throw new Error(e); - } + return createTransparentCryptoHelper27(feInfoClass); } static { - SASL_ADAPTOR = createSaslAdaptor(); - CIPHER_OPTION_HELPER = createCipherHelper(); - TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); + try { + SASL_ADAPTOR = createSaslAdaptor(); + CIPHER_OPTION_HELPER = createCipherHelper(); + TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper(); + } catch (Exception e) { + final String msg = "Couldn't properly initialize access to HDFS internals. Please " + + "update your WAL Provider to not make use of the 'asyncfs' provider. See " + + "HBASE-16110 for more information."; + LOG.error(msg, e); + throw new Error(msg, e); + } } /** @@ -828,40 +833,40 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper { byte[] challenge = proto.getPayload().toByteArray(); byte[] response = saslClient.evaluateChallenge(challenge); switch (step) { - case 1: { - List<Object> cipherOptions = null; - if (requestedQopContainsPrivacy()) { - cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf); - } - sendSaslMessage(ctx, response, cipherOptions); - ctx.flush(); - step++; - break; + case 1: { + List<Object> cipherOptions = null; + if (requestedQopContainsPrivacy()) { + cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf); } - case 2: { - assert response == null; - checkSaslComplete(); - Object cipherOption = - CIPHER_OPTION_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); - ChannelPipeline p = ctx.pipeline(); - while (p.first() != null) { - p.removeFirst(); - } - if (cipherOption != null) { - CryptoCodec codec = new CryptoCodec(conf, cipherOption); - p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); - } else { - if (useWrap()) { - p.addLast(new SaslWrapHandler(saslClient), - new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), - new SaslUnwrapHandler(saslClient)); - } + sendSaslMessage(ctx, response, cipherOptions); + ctx.flush(); + step++; + break; + } + case 2: { + assert response == null; + checkSaslComplete(); + Object cipherOption = CIPHER_OPTION_HELPER.getCipherOption(proto, + isNegotiatedQopPrivacy(), saslClient); + ChannelPipeline p = ctx.pipeline(); + while (p.first() != null) { + p.removeFirst(); + } + if (cipherOption != null) { + CryptoCodec codec = new CryptoCodec(conf, cipherOption); + p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); + } else { + if (useWrap()) { + p.addLast(new SaslWrapHandler(saslClient), + new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4), + new SaslUnwrapHandler(saslClient)); } - promise.trySuccess(null); - break; } - default: - throw new IllegalArgumentException("Unrecognized negotiation step: " + step); + promise.trySuccess(null); + break; + } + default: + throw new IllegalArgumentException("Unrecognized negotiation step: " + step); } } else { ctx.fireChannelRead(msg);
