Repository: hbase Updated Branches: refs/heads/master 2e813f106 -> 11467ef11
HBASE-19394 Support multi-homing env for the publication of RS status with multicast (hbase.status.published) (Toshihiro Suzuki) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/11467ef1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/11467ef1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/11467ef1 Branch: refs/heads/master Commit: 11467ef11126c77b3692479730752ee38dff20a6 Parents: 2e813f1 Author: tedyu <[email protected]> Authored: Tue Dec 12 07:38:15 2017 -0800 Committer: tedyu <[email protected]> Committed: Tue Dec 12 07:38:15 2017 -0800 ---------------------------------------------------------------------- .../hbase/client/ClusterStatusListener.java | 37 ++++--- .../org/apache/hadoop/hbase/HConstants.java | 22 +++- .../hbase/master/ClusterStatusPublisher.java | 100 +++++++++++-------- 3 files changed, 97 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/11467ef1/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java index 12e9a60..7f20436 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterStatusListener.java @@ -20,16 +20,6 @@ package org.apache.hadoop.hbase.client; -import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap; -import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption; -import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler; -import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel; import java.io.Closeable; import java.io.IOException; @@ -48,13 +38,23 @@ import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap; +import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBufInputStream; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; /** * A class that receives the cluster status, and provide it as a set of service to the client. @@ -104,7 +104,7 @@ class ClusterStatusListener implements Closeable { * Called to connect. * * @param conf Configuration to use. - * @throws IOException + * @throws IOException if failing to connect */ void connect(Configuration conf) throws IOException; } @@ -197,6 +197,7 @@ class ClusterStatusListener implements Closeable { HConstants.DEFAULT_STATUS_MULTICAST_BIND_ADDRESS); int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT); + String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME); InetAddress ina; try { @@ -219,7 +220,13 @@ class ClusterStatusListener implements Closeable { throw ExceptionUtil.asInterrupt(e); } - NetworkInterface ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress()); + NetworkInterface ni; + if (niName != null) { + ni = NetworkInterface.getByName(niName); + } else { + ni = NetworkInterface.getByInetAddress(Addressing.getIpAddress()); + } + channel.joinGroup(ina, ni, null, channel.newPromise()); } http://git-wip-us.apache.org/repos/asf/hbase/blob/11467ef1/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 14ce089..0adce07 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -28,8 +28,8 @@ import java.util.UUID; import java.util.regex.Pattern; import org.apache.commons.lang3.ArrayUtils; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; /** * HConstants holds a bunch of HBase-related constants @@ -550,7 +550,8 @@ public final class HConstants { /** * Timestamp to use when we want to refer to the latest cell. * - * On client side, this is the timestamp set by default when no timestamp is specified, to refer to the latest. + * On client side, this is the timestamp set by default when no timestamp is specified, + * to refer to the latest. * On server side, this acts as a notation. * (1) For a cell of Put, which has this notation, * its timestamp will be replaced with server's current time. @@ -559,7 +560,8 @@ public final class HConstants { * a. When the count of cell it gets is less than the count of cell to delete, * the timestamp of Delete cell will be replaced with server's current time. * b. When the count of cell it gets is equal to the count of cell to delete, - * the timestamp of Delete cell will be replaced with the latest timestamp of cell it gets. + * the timestamp of Delete cell will be replaced with the latest timestamp of cell it + * gets. * (c. It is invalid and an exception will be thrown, * if the count of cell it gets is greater than the count of cell to delete, * as the max version of Get is set to the count of cell to delete.) @@ -576,7 +578,7 @@ public final class HConstants { * Special! Used in fake Cells only. Should never be the timestamp on an actual Cell returned to * a client. * @deprecated Should not be public since hbase-1.3.0. For internal use only. Move internal to - * Scanners flagged as special timestamp value never to be returned as timestamp on a Cell. + * Scanners flagged as special timestamp value never to be returned as timestamp on a Cell. */ @Deprecated public static final long OLDEST_TIMESTAMP = Long.MIN_VALUE; @@ -1157,6 +1159,18 @@ public final class HConstants { public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.address.port"; public static final int DEFAULT_STATUS_MULTICAST_PORT = 16100; + /** + * The network interface name to use for the multicast messages. + */ + public static final String STATUS_MULTICAST_NI_NAME = "hbase.status.multicast.ni.name"; + + /** + * The address to use for binding the local socket for sending multicast. Defaults to 0.0.0.0. + */ + public static final String STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS = + "hbase.status.multicast.publisher.bind.address.ip"; + public static final String DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS = "0.0.0.0"; + public static final long NO_NONCE = 0; /** Default cipher for encryption */ http://git-wip-us.apache.org/repos/asf/hbase/blob/11467ef1/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java index 63cc96e..cbf4b1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ClusterStatusPublisher.java @@ -21,22 +21,6 @@ package org.apache.hadoop.hbase.master; -import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap; -import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory; -import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled; -import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; -import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption; -import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily; -import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel; -import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder; -import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil; - import java.io.Closeable; import java.io.IOException; import java.net.Inet6Address; @@ -58,9 +42,6 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -68,6 +49,25 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap; +import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory; +import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled; +import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext; +import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily; +import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel; +import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder; +import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; /** @@ -252,6 +252,9 @@ public class ClusterStatusPublisher extends ScheduledChore { HConstants.DEFAULT_STATUS_MULTICAST_ADDRESS); int port = conf.getInt(HConstants.STATUS_MULTICAST_PORT, HConstants.DEFAULT_STATUS_MULTICAST_PORT); + String bindAddress = conf.get(HConstants.STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS, + HConstants.DEFAULT_STATUS_MULTICAST_PUBLISHER_BIND_ADDRESS); + String niName = conf.get(HConstants.STATUS_MULTICAST_NI_NAME); final InetAddress ina; try { @@ -264,24 +267,34 @@ public class ClusterStatusPublisher extends ScheduledChore { final InetSocketAddress isa = new InetSocketAddress(mcAddress, port); InternetProtocolFamily family; - InetAddress localAddress; - if (ina instanceof Inet6Address) { - localAddress = Addressing.getIp6Address(); - family = InternetProtocolFamily.IPv6; - }else{ - localAddress = Addressing.getIp4Address(); - family = InternetProtocolFamily.IPv4; + NetworkInterface ni; + if (niName != null) { + if (ina instanceof Inet6Address) { + family = InternetProtocolFamily.IPv6; + } else { + family = InternetProtocolFamily.IPv4; + } + ni = NetworkInterface.getByName(niName); + } else { + InetAddress localAddress; + if (ina instanceof Inet6Address) { + localAddress = Addressing.getIp6Address(); + family = InternetProtocolFamily.IPv6; + } else { + localAddress = Addressing.getIp4Address(); + family = InternetProtocolFamily.IPv4; + } + ni = NetworkInterface.getByInetAddress(localAddress); } - NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress); Bootstrap b = new Bootstrap(); b.group(group) - .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family)) - .option(ChannelOption.SO_REUSEADDR, true) - .handler(new ClusterStatusEncoder(isa)); + .channelFactory(new HBaseDatagramChannelFactory<Channel>(NioDatagramChannel.class, family)) + .option(ChannelOption.SO_REUSEADDR, true) + .handler(new ClusterStatusEncoder(isa)); try { - channel = (DatagramChannel) b.bind(new InetSocketAddress(0)).sync().channel(); + channel = (DatagramChannel) b.bind(bindAddress, 0).sync().channel(); channel.joinGroup(ina, ni, null, channel.newPromise()).sync(); channel.connect(isa).sync(); } catch (InterruptedException e) { @@ -290,33 +303,34 @@ public class ClusterStatusPublisher extends ScheduledChore { } } - private static final class HBaseDatagramChannelFactory<T extends Channel> implements ChannelFactory<T> { + private static final class HBaseDatagramChannelFactory<T extends Channel> + implements ChannelFactory<T> { private final Class<? extends T> clazz; private InternetProtocolFamily family; HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) { - this.clazz = clazz; - this.family = family; + this.clazz = clazz; + this.family = family; } @Override public T newChannel() { - try { - return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(), - new Class[] { InternetProtocolFamily.class }, new Object[] { family }); + try { + return ReflectionUtils.instantiateWithCustomCtor(clazz.getName(), + new Class[] { InternetProtocolFamily.class }, new Object[] { family }); - } catch (Throwable t) { - throw new ChannelException("Unable to create Channel from class " + clazz, t); - } + } catch (Throwable t) { + throw new ChannelException("Unable to create Channel from class " + clazz, t); + } } @Override public String toString() { - return StringUtil.simpleClassName(clazz) + ".class"; + return StringUtil.simpleClassName(clazz) + ".class"; } - } + } - private static class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> { + private static final class ClusterStatusEncoder extends MessageToMessageEncoder<ClusterStatus> { final private InetSocketAddress isa; private ClusterStatusEncoder(InetSocketAddress isa) {
