http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java new file mode 100644 index 0000000..870262e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -0,0 +1,1033 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io.asyncfs; + +import static io.netty.handler.timeout.IdleState.READER_IDLE; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; +import 
javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +/** + * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. + */ +@InterfaceAudience.Private +public final class FanOutOneBlockAsyncDFSOutputSaslHelper { + + private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputSaslHelper.class); + + private FanOutOneBlockAsyncDFSOutputSaslHelper() { + } + + private static final String SERVER_NAME = "0"; + private static final String PROTOCOL = "hdfs"; + private static final String MECHANISM = "DIGEST-MD5"; + private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; + private static final String NAME_DELIMITER = " "; + private static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = + "dfs.encrypt.data.transfer.cipher.suites"; + private static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding"; + + private interface SaslAdaptor { + + SaslPropertiesResolver getSaslPropsResolver(DFSClient client); + + TrustedChannelResolver getTrustedChannelResolver(DFSClient client); + + AtomicBoolean getFallbackToSimpleAuth(DFSClient client); + + DataEncryptionKey createDataEncryptionKey(DFSClient client); + } + + private static final SaslAdaptor SASL_ADAPTOR; + + private interface CipherHelper { + + List<Object> getCipherOptions(Configuration conf) throws IOException; + + void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder, + List<Object> cipherOptions); + + Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy, + SaslClient saslClient) throws IOException; + + Object getCipherSuite(Object cipherOption); + + byte[] getInKey(Object cipherOption); + + byte[] getInIv(Object cipherOption); + + byte[] getOutKey(Object cipherOption); + + byte[] getOutIv(Object cipherOption); + } + + private static final CipherHelper CIPHER_HELPER; + + private static final class CryptoCodec { + + private static final Method CREATE_CODEC; + + private static final Method CREATE_ENCRYPTOR; + + private static final Method CREATE_DECRYPTOR; + + private static final Method INIT_ENCRYPTOR; + + private static final Method INIT_DECRYPTOR; + + private static final Method ENCRYPT; + + private static final Method DECRYPT; + +
static { + Class<?> cryptoCodecClass = null; + try { + cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec"); + } catch (ClassNotFoundException e) { + LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e); + } + if (cryptoCodecClass != null) { + Method getInstanceMethod = null; + for (Method method : cryptoCodecClass.getMethods()) { + if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) { + getInstanceMethod = method; + break; + } + } + CREATE_CODEC = getInstanceMethod; + try { + CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor"); + CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor"); + + Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor"); + INIT_ENCRYPTOR = encryptorClass.getMethod("init"); + ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class); + + Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor"); + INIT_DECRYPTOR = decryptorClass.getMethod("init"); + DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class); + } catch (NoSuchMethodException | ClassNotFoundException e) { + throw new Error(e); + } + } else { + LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-"); + CREATE_CODEC = null; + CREATE_ENCRYPTOR = null; + CREATE_DECRYPTOR = null; + INIT_ENCRYPTOR = null; + INIT_DECRYPTOR = null; + ENCRYPT = null; + DECRYPT = null; + } + } + + private final Object encryptor; + + private final Object decryptor; + + public CryptoCodec(Configuration conf, Object cipherOption) { + Object codec; + try { + codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption)); + encryptor = CREATE_ENCRYPTOR.invoke(codec); + byte[] encKey = CIPHER_HELPER.getInKey(cipherOption); + byte[] encIv = CIPHER_HELPER.getInIv(cipherOption); + INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length)); + + decryptor = CREATE_DECRYPTOR.invoke(codec); + byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption); + byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption); + INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { + try { + ENCRYPT.invoke(encryptor, inBuffer, outBuffer); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { + try { + DECRYPT.invoke(decryptor, inBuffer, outBuffer); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + } + + private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass) + throws NoSuchFieldException, NoSuchMethodException { + final Field saslPropsResolverField = + saslDataTransferClientClass.getDeclaredField("saslPropsResolver"); + saslPropsResolverField.setAccessible(true); + final Field trustedChannelResolverField = + saslDataTransferClientClass.getDeclaredField("trustedChannelResolver"); + trustedChannelResolverField.setAccessible(true); + final Field fallbackToSimpleAuthField = + saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth"); + fallbackToSimpleAuthField.setAccessible(true); + final Method getSaslDataTransferClientMethod = + DFSClient.class.getMethod("getSaslDataTransferClient"); + final Method newDataEncryptionKeyMethod = 
DFSClient.class.getMethod("newDataEncryptionKey"); + return new SaslAdaptor() { + + @Override + public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { + try { + return (TrustedChannelResolver) trustedChannelResolverField + .get(getSaslDataTransferClientMethod.invoke(client)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { + try { + return (SaslPropertiesResolver) saslPropsResolverField + .get(getSaslDataTransferClientMethod.invoke(client)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { + try { + return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod + .invoke(client)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public DataEncryptionKey createDataEncryptionKey(DFSClient client) { + try { + return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; + } + + private static SaslAdaptor createSaslAdaptor25() { + try { + final Field trustedChannelResolverField = + DFSClient.class.getDeclaredField("trustedChannelResolver"); + trustedChannelResolverField.setAccessible(true); + final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey"); + return new SaslAdaptor() { + + @Override + public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { + try { + return (TrustedChannelResolver) trustedChannelResolverField.get(client); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { + return null; + } + + @Override + public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { + return null; + } + + @Override + public DataEncryptionKey createDataEncryptionKey(DFSClient client) { + try { + return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; + } catch (NoSuchFieldException | NoSuchMethodException e) { + throw new Error(e); + } + + } + + private static SaslAdaptor createSaslAdaptor() { + Class<?> saslDataTransferClientClass = null; + try { + saslDataTransferClientClass = + Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"); + } catch (ClassNotFoundException e) { + LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-"); + } + try { + return saslDataTransferClientClass != null ? 
createSaslAdaptor27(saslDataTransferClientClass) + : createSaslAdaptor25(); + } catch (NoSuchFieldException | NoSuchMethodException e) { + throw new Error(e); + } + } + + private static CipherHelper createCipherHelper25() { + return new CipherHelper() { + + @Override + public byte[] getOutKey(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getOutIv(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getInKey(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getInIv(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public Object getCipherSuite(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public List<Object> getCipherOptions(Configuration conf) { + return null; + } + + @Override + public Object getCipherOption(DataTransferEncryptorMessageProto proto, + boolean isNegotiatedQopPrivacy, SaslClient saslClient) { + return null; + } + + @Override + public void addCipherOptions(Builder builder, List<Object> cipherOptions) { + throw new UnsupportedOperationException(); + } + }; + } + + private static CipherHelper createCipherHelper27(Class<?> cipherOptionClass) + throws ClassNotFoundException, NoSuchMethodException { + @SuppressWarnings("rawtypes") + Class<? extends Enum> cipherSuiteClass = + Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class); + @SuppressWarnings("unchecked") + final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING"); + final Constructor<?> cipherOptionConstructor = + cipherOptionClass.getConstructor(cipherSuiteClass); + final Constructor<?> cipherOptionWithKeyAndIvConstructor = + cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class, + byte[].class, byte[].class); + + final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite"); + final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey"); + final Method getInIvMethod = cipherOptionClass.getMethod("getInIv"); + final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey"); + final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv"); + + final Method convertCipherOptionsMethod = + PBHelper.class.getMethod("convertCipherOptions", List.class); + final Method convertCipherOptionProtosMethod = + PBHelper.class.getMethod("convertCipherOptionProtos", List.class); + final Method addAllCipherOptionMethod = + DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption", + Iterable.class); + final Method getCipherOptionListMethod = + DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList"); + return new CipherHelper() { + + @Override + public byte[] getOutKey(Object cipherOption) { + try { + return (byte[]) getOutKeyMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] getOutIv(Object cipherOption) { + try { + return (byte[]) getOutIvMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] getInKey(Object cipherOption) { + try { + return (byte[]) getInKeyMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] 
getInIv(Object cipherOption) { + try { + return (byte[]) getInIvMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object getCipherSuite(Object cipherOption) { + try { + return getCipherSuiteMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public List<Object> getCipherOptions(Configuration conf) throws IOException { + // Negotiate cipher suites if configured. Currently, the only supported + // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple + // values for future expansion. + String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); + if (cipherSuites == null || cipherSuites.isEmpty()) { + return null; + } + if (!cipherSuites.equals(AES_CTR_NOPADDING)) { + throw new IOException(String.format("Invalid cipher suite, %s=%s", + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); + } + Object option; + try { + option = cipherOptionConstructor.newInstance(aesCipherSuite); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + List<Object> cipherOptions = Lists.newArrayListWithCapacity(1); + cipherOptions.add(option); + return cipherOptions; + } + + private Object unwrap(Object option, SaslClient saslClient) throws IOException { + byte[] inKey = getInKey(option); + if (inKey != null) { + inKey = saslClient.unwrap(inKey, 0, inKey.length); + } + byte[] outKey = getOutKey(option); + if (outKey != null) { + outKey = saslClient.unwrap(outKey, 0, outKey.length); + } + try { + return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey, + getInIv(option), outKey, getOutIv(option)); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + @Override + public Object getCipherOption(DataTransferEncryptorMessageProto proto, + boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { + List<Object> cipherOptions; + try { + cipherOptions = + (List<Object>) convertCipherOptionProtosMethod.invoke(null, + getCipherOptionListMethod.invoke(proto)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + if (cipherOptions == null || cipherOptions.isEmpty()) { + return null; + } + Object cipherOption = cipherOptions.get(0); + return isNegotiatedQopPrivacy ? 
unwrap(cipherOption, saslClient) : cipherOption; + } + + @Override + public void addCipherOptions(Builder builder, List<Object> cipherOptions) { + try { + addAllCipherOptionMethod.invoke(builder, + convertCipherOptionsMethod.invoke(null, cipherOptions)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; + } + + private static CipherHelper createCipherHelper() { + Class<?> cipherOptionClass; + try { + cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption"); + } catch (ClassNotFoundException e) { + LOG.warn("No CipherOption class found, should be hadoop 2.5-"); + return createCipherHelper25(); + } + try { + return createCipherHelper27(cipherOptionClass); + } catch (NoSuchMethodException | ClassNotFoundException e) { + throw new Error(e); + } + } + + static { + SASL_ADAPTOR = createSaslAdaptor(); + CIPHER_HELPER = createCipherHelper(); + } + + /** + * Sets user name and password when asked by the client-side SASL object. + */ + private static final class SaslClientCallbackHandler implements CallbackHandler { + + private final char[] password; + private final String userName; + + /** + * Creates a new SaslClientCallbackHandler. + * @param userName SASL user name + * @Param password SASL password + */ + public SaslClientCallbackHandler(String userName, char[] password) { + this.password = password; + this.userName = userName; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); + } + } + if (nc != null) { + nc.setName(userName); + } + if (pc != null) { + pc.setPassword(password); + } + if (rc != null) { + rc.setText(rc.getDefaultText()); + } + } + } + + private static final class SaslNegotiateHandler extends ChannelDuplexHandler { + + private final Configuration conf; + + private final Map<String, String> saslProps; + + private final SaslClient saslClient; + + private final int timeoutMs; + + private final Promise<Void> promise; + + private int step = 0; + + public SaslNegotiateHandler(Configuration conf, String username, char[] password, + Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException { + this.conf = conf; + this.saslProps = saslProps; + this.saslClient = + Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME, + saslProps, new SaslClientCallbackHandler(username, password)); + this.timeoutMs = timeoutMs; + this.promise = promise; + } + + private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { + sendSaslMessage(ctx, payload, null); + } + + private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, List<Object> options) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (options != null) { + 
CIPHER_HELPER.addCipherOptions(builder, options); + } + DataTransferEncryptorMessageProto proto = builder.build(); + int size = proto.getSerializedSize(); + size += CodedOutputStream.computeRawVarint32Size(size); + ByteBuf buf = ctx.alloc().buffer(size); + proto.writeDelimitedTo(new ByteBufOutputStream(buf)); + ctx.write(buf); + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); + sendSaslMessage(ctx, new byte[0]); + ctx.flush(); + step++; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslClient.dispose(); + } + + private void check(DataTransferEncryptorMessageProto proto) throws IOException { + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } + } + + private String getNegotiatedQop() { + return (String) saslClient.getNegotiatedProperty(Sasl.QOP); + } + + private boolean isNegotiatedQopPrivacy() { + String qop = getNegotiatedQop(); + return qop != null && "auth-conf".equalsIgnoreCase(qop); + } + + private boolean requestedQopContainsPrivacy() { + Set<String> requestedQop = + ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + return requestedQop.contains("auth-conf"); + } + + private void checkSaslComplete() throws IOException { + if (!saslClient.isComplete()) { + throw new IOException("Failed to complete SASL handshake"); + } + Set<String> requestedQop = + ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + String negotiatedQop = getNegotiatedQop(); + LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + + negotiatedQop); + if (!requestedQop.contains(negotiatedQop)) { + throw new IOException(String.format("SASL handshake completed, but " + + "channel does not have acceptable quality of protection, " + + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); + } + } + + private boolean useWrap() { + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + return qop != null && !"auth".equalsIgnoreCase(qop); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { + if (msg instanceof DataTransferEncryptorMessageProto) { + DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; + check(proto); + byte[] challenge = proto.getPayload().toByteArray(); + byte[] response = saslClient.evaluateChallenge(challenge); + switch (step) { + case 1: { + List<Object> cipherOptions = null; + if (requestedQopContainsPrivacy()) { + cipherOptions = CIPHER_HELPER.getCipherOptions(conf); + } + sendSaslMessage(ctx, response, cipherOptions); + ctx.flush(); + step++; + break; + } + case 2: { + assert response == null; + checkSaslComplete(); + Object cipherOption = + CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); + ChannelPipeline p = ctx.pipeline(); + while (p.first() != null) { + p.removeFirst(); + } + if (cipherOption != null) { + CryptoCodec codec = new CryptoCodec(conf, cipherOption); + p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); + } else { + if (useWrap()) { + p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder( + Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient)); + } + } + 
promise.trySuccess(null); + break; + } + default: + throw new IllegalArgumentException("Unrecognized negotiation step: " + step); + } + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + promise.tryFailure(cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { + promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); + } else { + super.userEventTriggered(ctx, evt); + } + } + } + + private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private final SaslClient saslClient; + + public SaslUnwrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslClient.dispose(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + msg.skipBytes(4); + byte[] b = new byte[msg.readableBytes()]; + msg.readBytes(b); + ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); + } + } + + private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { + + private final SaslClient saslClient; + + private CompositeByteBuf cBuf; + + public SaslWrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + cBuf.addComponent(buf); + cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); + } else { + ctx.write(msg); + } + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + if (cBuf.isReadable()) { + byte[] b = new byte[cBuf.readableBytes()]; + cBuf.readBytes(b); + cBuf.discardReadComponents(); + byte[] wrapped = saslClient.wrap(b, 0, b.length); + ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); + buf.writeInt(wrapped.length); + buf.writeBytes(wrapped); + ctx.write(buf); + } + ctx.flush(); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + cBuf.release(); + cBuf = null; + } + } + + private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private final CryptoCodec codec; + + public DecryptHandler(CryptoCodec codec) { + this.codec = codec; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + ByteBuf inBuf; + boolean release = false; + if (msg.nioBufferCount() == 1) { + inBuf = msg; + } else { + inBuf = ctx.alloc().directBuffer(msg.readableBytes()); + msg.readBytes(inBuf); + release = true; + } + ByteBuffer inBuffer = inBuf.nioBuffer(); + ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); + ByteBuffer outBuffer = outBuf.nioBuffer(); + codec.decrypt(inBuffer, outBuffer); + outBuf.writerIndex(inBuf.readableBytes()); + if (release) { + inBuf.release(); + } + ctx.fireChannelRead(outBuf); + } + } + + private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { + + private final CryptoCodec codec; + + public 
EncryptHandler(CryptoCodec codec) { + super(false); + this.codec = codec; + } + + @Override + protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) + throws Exception { + if (preferDirect) { + return ctx.alloc().directBuffer(msg.readableBytes()); + } else { + return ctx.alloc().buffer(msg.readableBytes()); + } + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { + ByteBuf inBuf; + boolean release = false; + if (msg.nioBufferCount() == 1) { + inBuf = msg; + } else { + inBuf = ctx.alloc().directBuffer(msg.readableBytes()); + msg.readBytes(inBuf); + release = true; + } + ByteBuffer inBuffer = inBuf.nioBuffer(); + ByteBuffer outBuffer = out.nioBuffer(); + codec.encrypt(inBuffer, outBuffer); + out.writerIndex(inBuf.readableBytes()); + if (release) { + inBuf.release(); + } + } + } + + private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { + return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER + + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); + } + + private static char[] encryptionKeyToPassword(byte[] encryptionKey) { + return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); + } + + private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { + return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8); + } + + private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { + return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8) + .toCharArray(); + } + + private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { + Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); + saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); + saslProps.put(Sasl.SERVER_AUTH, "true"); + saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); + return saslProps; + } + + private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, + String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) { + try { + channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), + new ProtobufVarint32FrameDecoder(), + new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), + new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise)); + } catch (SaslException e) { + saslPromise.tryFailure(e); + } + } + + static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, + int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, + Promise<Void> saslPromise) { + SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client); + TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client); + AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client); + InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); + if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { + saslPromise.trySuccess(null); + return; + } + DataEncryptionKey encryptionKey; + try { + encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client); + } catch (Exception e) { + saslPromise.tryFailure(e); + return; + } + if (encryptionKey != null) { + if 
(LOG.isDebugEnabled()) { + LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + + dnInfo); + } + doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), + encryptionKeyToPassword(encryptionKey.encryptionKey), + createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise); + } else if (!UserGroupInformation.isSecurityEnabled()) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr + + ", datanodeId = " + dnInfo); + } + saslPromise.trySuccess(null); + } else if (dnInfo.getXferPort() < 1024) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client skipping handshake in secured configuration with " + + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); + } + saslPromise.trySuccess(null); + } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client skipping handshake in secured configuration with " + + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); + } + saslPromise.trySuccess(null); + } else if (saslPropsResolver != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = " + + dnInfo); + } + doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), + buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); + } else { + // It's a secured cluster using non-privileged ports, but no SASL. The only way this can + // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare + // edge case. + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client skipping handshake in secured configuration with no SASL " + + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); + } + saslPromise.trySuccess(null); + } + } + +}
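For context, a minimal sketch of how connection-setup code is expected to drive the helper above, assuming conf, channel, dnInfo, timeoutMs, client and accessToken are already in scope and that netty's Future/FutureListener types are imported; the surrounding names and the listener body are illustrative only, not part of this patch:

    // Hypothetical caller-side usage: hand trySaslNegotiate a netty Promise and
    // react to its completion before starting the block write.
    Promise<Void> saslPromise = channel.eventLoop().newPromise();
    FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate(conf, channel, dnInfo, timeoutMs,
        client, accessToken, saslPromise);
    saslPromise.addListener(new FutureListener<Void>() {
      @Override
      public void operationComplete(Future<Void> future) throws Exception {
        if (future.isSuccess()) {
          // SASL negotiation finished (or was legitimately skipped); continue pipeline setup.
        } else {
          // Negotiation failed or timed out; close the channel and surface the error.
          channel.close();
        }
      }
    });

The promise is completed with trySuccess(null) on every skip/success path and with tryFailure(...) on errors, so a single listener covers all outcomes.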
http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index b80f2c9..d5bccf0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; +import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import io.netty.channel.EventLoop; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; @@ -37,8 +40,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -48,8 +49,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; +import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; @@ -209,7 +210,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { .newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Close-WAL-Writer-%d").build()); - private volatile FanOutOneBlockAsyncDFSOutput hdfsOut; + private volatile AsyncFSOutput fsOut; private final Deque<FSWALEntry> waitingAppendEntries = new ArrayDeque<FSWALEntry>(); @@ -663,7 +664,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { final AsyncWriter oldWriter = this.writer; this.writer = nextWriter; if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) { - this.hdfsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); + this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); } this.fileLengthAtLastSync = 0L; boolean scheduleTask; @@ -721,7 +722,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { @Override DatanodeInfo[] getPipeline() { - FanOutOneBlockAsyncDFSOutput output = this.hdfsOut; + AsyncFSOutput output = this.fsOut; return output != null ? 
output.getPipeline() : new DatanodeInfo[0]; } http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index 894f3dd..886b172 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import com.google.common.base.Throwables; +import com.google.common.primitives.Ints; + +import io.netty.channel.EventLoop; + import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; @@ -29,18 +34,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; +import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hdfs.DistributedFileSystem; - -import com.google.common.base.Throwables; -import com.google.common.primitives.Ints; - -import io.netty.channel.EventLoop; /** * AsyncWriter for protobuf-based WAL. 
@@ -97,7 +96,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements private final EventLoop eventLoop; - private FanOutOneBlockAsyncDFSOutput output; + private AsyncFSOutput output; private ByteArrayOutputStream buf; @@ -149,16 +148,15 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements this.output = null; } - public FanOutOneBlockAsyncDFSOutput getOutput() { + public AsyncFSOutput getOutput() { return this.output; } @Override protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException { - this.output = - FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, path, - overwritable, false, replication, blockSize, eventLoop); + this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, + blockSize, eventLoop); this.buf = new ByteArrayOutputStream(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/1eac103e/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java deleted file mode 100644 index bdbf865..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java +++ /dev/null @@ -1,533 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.util; - -import static io.netty.handler.timeout.IdleState.READER_IDLE; -import static io.netty.handler.timeout.IdleState.WRITER_IDLE; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; -import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoop; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.channels.CompletionHandler; -import java.util.ArrayDeque; -import java.util.Collection; -import java.util.Collections; -import java.util.Deque; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Supplier; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose; -import org.apache.hadoop.hdfs.DFSClient; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.util.DataChecksum; - -/** - * An asynchronous HDFS output stream implementation which fans out data to datanode and only - * supports writing file with only one block. - * <p> - * Use the createOutput method in {@link FanOutOneBlockAsyncDFSOutputHelper} to create. The mainly - * usage of this class is implementing WAL, so we only expose a little HDFS configurations in the - * method. And we place it here under util package because we want to make it independent of WAL - * implementation thus easier to move it to HDFS project finally. - * <p> - * Note that, all connections to datanode will run in the same {@link EventLoop} which means we only - * need one thread here. But be careful, we do some blocking operations in {@link #close()} and - * {@link #recoverAndClose(CancelableProgressable)} methods, so do not call them inside - * {@link EventLoop}. 
And for {@link #write(byte[])} {@link #write(byte[], int, int)}, - * {@link #buffered()} and {@link #flush(Object, CompletionHandler, boolean)}, if you call them - * outside {@link EventLoop}, there will be an extra context-switch. - * <p> - * Advantages compare to DFSOutputStream: - * <ol> - * <li>The fan out mechanism. This will reduce the latency.</li> - * <li>The asynchronous WAL could also run in the same EventLoop, we could just call write and flush - * inside the EventLoop thread, so generally we only have one thread to do all the things.</li> - * <li>Fail-fast when connection to datanode error. The WAL implementation could open new writer - * ASAP.</li> - * <li>We could benefit from netty's ByteBuf management mechanism.</li> - * </ol> - */ [email protected] -public class FanOutOneBlockAsyncDFSOutput implements Closeable { - - private final Configuration conf; - - private final FSUtils fsUtils; - - private final DistributedFileSystem dfs; - - private final DFSClient client; - - private final ClientProtocol namenode; - - private final String clientName; - - private final String src; - - private final long fileId; - - private final LocatedBlock locatedBlock; - - private final EventLoop eventLoop; - - private final List<Channel> datanodeList; - - private final DataChecksum summer; - - private final ByteBufAllocator alloc; - - private static final class Callback { - - public final Promise<Void> promise; - - public final long ackedLength; - - public final Set<Channel> unfinishedReplicas; - - public Callback(Promise<Void> promise, long ackedLength, Collection<Channel> replicas) { - this.promise = promise; - this.ackedLength = ackedLength; - if (replicas.isEmpty()) { - this.unfinishedReplicas = Collections.emptySet(); - } else { - this.unfinishedReplicas = Collections - .newSetFromMap(new IdentityHashMap<Channel, Boolean>(replicas.size())); - this.unfinishedReplicas.addAll(replicas); - } - } - } - - private final Deque<Callback> waitingAckQueue = new ArrayDeque<>(); - - // this could be different from acked block length because a packet can not start at the middle of - // a chunk. - private long nextPacketOffsetInBlock = 0L; - - private long nextPacketSeqno = 0L; - - private ByteBuf buf; - - private enum State { - STREAMING, CLOSING, BROKEN, CLOSED - } - - private State state; - - private void completed(Channel channel) { - if (waitingAckQueue.isEmpty()) { - return; - } - for (Callback c : waitingAckQueue) { - if (c.unfinishedReplicas.remove(channel)) { - if (c.unfinishedReplicas.isEmpty()) { - c.promise.trySuccess(null); - // since we will remove the Callback entry from waitingAckQueue if its unfinishedReplicas - // is empty, so this could only happen at the head of waitingAckQueue, so we just call - // removeFirst here. - waitingAckQueue.removeFirst(); - // also wake up flush requests which have the same length. - for (Callback cb; (cb = waitingAckQueue.peekFirst()) != null;) { - if (cb.ackedLength == c.ackedLength) { - cb.promise.trySuccess(null); - waitingAckQueue.removeFirst(); - } else { - break; - } - } - } - return; - } - } - } - - private void failed(Channel channel, Supplier<Throwable> errorSupplier) { - if (state == State.BROKEN || state == State.CLOSED) { - return; - } - if (state == State.CLOSING) { - Callback c = waitingAckQueue.peekFirst(); - if (c == null || !c.unfinishedReplicas.contains(channel)) { - // nothing, the endBlock request has already finished. - return; - } - } - // disable further write, and fail all pending ack. 
- state = State.BROKEN; - Throwable error = errorSupplier.get(); - for (Callback c : waitingAckQueue) { - c.promise.tryFailure(error); - } - waitingAckQueue.clear(); - for (Channel ch : datanodeList) { - ch.close(); - } - } - - private void setupReceiver(final int timeoutMs) { - SimpleChannelInboundHandler<PipelineAckProto> ackHandler = new SimpleChannelInboundHandler<PipelineAckProto>() { - - @Override - public boolean isSharable() { - return true; - } - - @Override - protected void channelRead0(final ChannelHandlerContext ctx, PipelineAckProto ack) - throws Exception { - final Status reply = getStatus(ack); - if (reply != Status.SUCCESS) { - failed(ctx.channel(), new Supplier<Throwable>() { - - @Override - public Throwable get() { - return new IOException("Bad response " + reply + " for block " - + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()); - } - }); - return; - } - if (PipelineAck.isRestartOOBStatus(reply)) { - failed(ctx.channel(), new Supplier<Throwable>() { - - @Override - public Throwable get() { - return new IOException("Restart response " + reply + " for block " - + locatedBlock.getBlock() + " from datanode " + ctx.channel().remoteAddress()); - } - }); - return; - } - if (ack.getSeqno() == HEART_BEAT_SEQNO) { - return; - } - completed(ctx.channel()); - } - - @Override - public void channelInactive(final ChannelHandlerContext ctx) throws Exception { - failed(ctx.channel(), new Supplier<Throwable>() { - - @Override - public Throwable get() { - return new IOException("Connection to " + ctx.channel().remoteAddress() + " closed"); - } - }); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) - throws Exception { - failed(ctx.channel(), new Supplier<Throwable>() { - - @Override - public Throwable get() { - return cause; - } - }); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent) { - IdleStateEvent e = (IdleStateEvent) evt; - if (e.state() == READER_IDLE) { - failed(ctx.channel(), new Supplier<Throwable>() { - - @Override - public Throwable get() { - return new IOException("Timeout(" + timeoutMs + "ms) waiting for response"); - } - }); - } else if (e.state() == WRITER_IDLE) { - PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); - int len = heartbeat.getSerializedSize(); - ByteBuf buf = alloc.buffer(len); - heartbeat.putInBuffer(buf.nioBuffer(0, len)); - buf.writerIndex(len); - ctx.channel().writeAndFlush(buf); - } - return; - } - super.userEventTriggered(ctx, evt); - } - - }; - for (Channel ch : datanodeList) { - ch.pipeline().addLast( - new IdleStateHandler(timeoutMs, timeoutMs / 2, 0, TimeUnit.MILLISECONDS), - new ProtobufVarint32FrameDecoder(), - new ProtobufDecoder(PipelineAckProto.getDefaultInstance()), ackHandler); - ch.config().setAutoRead(true); - } - } - - FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, DistributedFileSystem dfs, - DFSClient client, ClientProtocol namenode, String clientName, String src, long fileId, - LocatedBlock locatedBlock, EventLoop eventLoop, List<Channel> datanodeList, - DataChecksum summer, ByteBufAllocator alloc) { - this.conf = conf; - this.fsUtils = fsUtils; - this.dfs = dfs; - this.client = client; - this.namenode = namenode; - this.fileId = fileId; - this.clientName = clientName; - this.src = src; - this.locatedBlock = locatedBlock; - this.eventLoop = eventLoop; - this.datanodeList = datanodeList; - this.summer = summer; - 
this.alloc = alloc; - this.buf = alloc.directBuffer(); - this.state = State.STREAMING; - setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT)); - } - - /** - * Just call write(b, 0, b.length). - * @see #write(byte[], int, int) - */ - public void write(byte[] b) { - write(b, 0, b.length); - } - - /** - * Copy the data into the buffer. Note that you need to call - * {@link #flush(Object, CompletionHandler, boolean)} to flush the buffer manually. - */ - public void write(final byte[] b, final int off, final int len) { - if (eventLoop.inEventLoop()) { - buf.ensureWritable(len).writeBytes(b, off, len); - } else { - eventLoop.submit(new Runnable() { - - @Override - public void run() { - buf.ensureWritable(len).writeBytes(b, off, len); - } - }).syncUninterruptibly(); - } - } - - /** - * Return the current size of buffered data. - */ - public int buffered() { - if (eventLoop.inEventLoop()) { - return buf.readableBytes(); - } else { - return eventLoop.submit(new Callable<Integer>() { - - @Override - public Integer call() throws Exception { - return buf.readableBytes(); - } - }).syncUninterruptibly().getNow().intValue(); - } - } - - public DatanodeInfo[] getPipeline() { - return locatedBlock.getLocations(); - } - - private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler, - boolean syncBlock) { - if (state != State.STREAMING) { - handler.failed(new IOException("stream already broken"), attachment); - return; - } - int dataLen = buf.readableBytes(); - final long ackedLength = nextPacketOffsetInBlock + dataLen; - if (ackedLength == locatedBlock.getBlock().getNumBytes()) { - // no new data, just return - handler.completed(locatedBlock.getBlock().getNumBytes(), attachment); - return; - } - Promise<Void> promise = eventLoop.newPromise(); - promise.addListener(new FutureListener<Void>() { - - @Override - public void operationComplete(Future<Void> future) throws Exception { - if (future.isSuccess()) { - locatedBlock.getBlock().setNumBytes(ackedLength); - handler.completed(ackedLength, attachment); - } else { - handler.failed(future.cause(), attachment); - } - } - }); - Callback c = waitingAckQueue.peekLast(); - if (c != null && ackedLength == c.ackedLength) { - // just append it to the tail of waiting ack queue,, do not issue new hflush request. - waitingAckQueue - .addLast(new Callback(promise, ackedLength, Collections.<Channel> emptyList())); - return; - } - int chunkLen = summer.getBytesPerChecksum(); - int trailingPartialChunkLen = dataLen % chunkLen; - int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 
1 : 0); - int checksumLen = numChecks * summer.getChecksumSize(); - ByteBuf checksumBuf = alloc.directBuffer(checksumLen); - summer.calculateChunkedSums(buf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen)); - checksumBuf.writerIndex(checksumLen); - PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock, - nextPacketSeqno, false, dataLen, syncBlock); - int headerLen = header.getSerializedSize(); - ByteBuf headerBuf = alloc.buffer(headerLen); - header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); - headerBuf.writerIndex(headerLen); - - waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList)); - for (Channel ch : datanodeList) { - ch.write(headerBuf.duplicate().retain()); - ch.write(checksumBuf.duplicate().retain()); - ch.writeAndFlush(buf.duplicate().retain()); - } - checksumBuf.release(); - headerBuf.release(); - ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen); - if (trailingPartialChunkLen != 0) { - buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen); - } - buf.release(); - this.buf = newBuf; - nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen; - nextPacketSeqno++; - } - - /** - * Flush the buffer out to datanodes. - * @param attachment will be passed to handler when completed. - * @param handler will set the acked length as result when completed. - * @param syncBlock will call hsync if true, otherwise hflush. - */ - public <A> void flush(final A attachment, final CompletionHandler<Long, ? super A> handler, - final boolean syncBlock) { - if (eventLoop.inEventLoop()) { - flush0(attachment, handler, syncBlock); - } else { - eventLoop.execute(new Runnable() { - - @Override - public void run() { - flush0(attachment, handler, syncBlock); - } - }); - } - } - - private void endBlock(Promise<Void> promise, long size) { - if (state != State.STREAMING) { - promise.tryFailure(new IOException("stream already broken")); - return; - } - if (!waitingAckQueue.isEmpty()) { - promise.tryFailure(new IllegalStateException("should call flush first before calling close")); - return; - } - state = State.CLOSING; - PacketHeader header = new PacketHeader(4, size, nextPacketSeqno, true, 0, false); - buf.release(); - buf = null; - int headerLen = header.getSerializedSize(); - ByteBuf headerBuf = alloc.buffer(headerLen); - header.putInBuffer(headerBuf.nioBuffer(0, headerLen)); - headerBuf.writerIndex(headerLen); - waitingAckQueue.add(new Callback(promise, size, datanodeList)); - for (Channel ch : datanodeList) { - ch.writeAndFlush(headerBuf.duplicate().retain()); - } - headerBuf.release(); - } - - /** - * The close method when error occurred. Now we just call recoverFileLease. - */ - public void recoverAndClose(CancelableProgressable reporter) throws IOException { - assert !eventLoop.inEventLoop(); - for (Channel ch : datanodeList) { - ch.closeFuture().awaitUninterruptibly(); - } - endFileLease(client, src, fileId); - fsUtils.recoverFileLease(dfs, new Path(src), conf, - reporter == null ? new CancelOnClose(client) : reporter); - } - - /** - * End the current block and complete file at namenode. You should call - * {@link #recoverAndClose(CancelableProgressable)} if this method throws an exception. 
- */ - @Override - public void close() throws IOException { - assert !eventLoop.inEventLoop(); - final Promise<Void> promise = eventLoop.newPromise(); - eventLoop.execute(new Runnable() { - - @Override - public void run() { - endBlock(promise, nextPacketOffsetInBlock + buf.readableBytes()); - } - }); - promise.addListener(new FutureListener<Void>() { - - @Override - public void operationComplete(Future<Void> future) throws Exception { - for (Channel ch : datanodeList) { - ch.close(); - } - } - }).syncUninterruptibly(); - for (Channel ch : datanodeList) { - ch.closeFuture().awaitUninterruptibly(); - } - completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId); - } -}
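To recap the contract the removed class documented (and that its AsyncFSOutput successor is expected to keep): write() only copies into the internal buffer, flush() reports the acked length through a java.nio.channels.CompletionHandler, and close()/recoverAndClose() block and must not run on the EventLoop. A minimal usage sketch, assuming out was obtained from FanOutOneBlockAsyncDFSOutputHelper.createOutput(...) and data is a byte[]; handler bodies are illustrative only:

    // Hypothetical caller, following the class javadoc above.
    out.write(data);                                   // buffers only, no I/O yet
    out.flush(null, new CompletionHandler<Long, Void>() {
      @Override
      public void completed(Long ackedLength, Void attachment) {
        // every datanode has acked the block up to ackedLength bytes
      }
      @Override
      public void failed(Throwable exc, Void attachment) {
        // stream is broken; recoverAndClose(...) and roll to a new writer
      }
    }, false);                                         // false = hflush, true = hsync
    out.close();                                       // blocking, never call inside the EventLoop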
