HBASE-14866 VerifyReplication and ReplicationAdmin should use full peer configuration for peer connection
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5dec5ad2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5dec5ad2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5dec5ad2 Branch: refs/heads/branch-1 Commit: 5dec5ad250322ba3dab8ff9800e82c039e4dce2e Parents: 967873b Author: Gary Helmling <[email protected]> Authored: Wed Dec 9 16:47:25 2015 -0800 Committer: Gary Helmling <[email protected]> Committed: Wed Dec 9 16:47:25 2015 -0800 ---------------------------------------------------------------------- .../client/replication/ReplicationAdmin.java | 14 +- .../replication/ReplicationPeersZKImpl.java | 7 +- .../replication/ReplicationStateZKBase.java | 3 +- .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 350 ------------- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 124 ----- .../hadoop/hbase/zookeeper/TestZKUtil.java | 11 - .../apache/hadoop/hbase/HBaseConfiguration.java | 83 +++- .../apache/hadoop/hbase/zookeeper/ZKConfig.java | 495 +++++++++++++++++++ .../hadoop/hbase/TestHBaseConfiguration.java | 10 +- .../hadoop/hbase/zookeeper/TestZKConfig.java | 126 +++++ .../hadoop/hbase/mapreduce/SyncTable.java | 15 +- .../hbase/mapreduce/TableMapReduceUtil.java | 35 +- .../hbase/mapreduce/TableOutputFormat.java | 22 +- .../replication/VerifyReplication.java | 25 +- .../hbase/util/ServerRegionReplicaUtil.java | 4 +- .../org/apache/hadoop/hbase/TestZooKeeper.java | 65 --- .../replication/TestReplicationAdmin.java | 36 +- .../replication/TestReplicationEndpoint.java | 10 +- .../replication/TestReplicationStateBasic.java | 4 +- .../replication/TestReplicationStateZKImpl.java | 5 +- .../TestRegionReplicaReplicationEndpoint.java | 8 +- .../hadoop/hbase/zookeeper/TestZKConfig.java | 56 --- 22 files changed, 818 insertions(+), 690 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 3a83d13..24a3dcb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl; import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -627,7 +626,8 @@ public class ReplicationAdmin implements Closeable { } } - private List<ReplicationPeer> listValidReplicationPeers() { + @VisibleForTesting + List<ReplicationPeer> listValidReplicationPeers() { Map<String, ReplicationPeerConfig> peers = listPeerConfigs(); if (peers == null || peers.size() <= 0) { return null; @@ -635,18 +635,16 @@ public class ReplicationAdmin implements Closeable { List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size()); for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) { String peerId = peerEntry.getKey(); - String clusterKey = peerEntry.getValue().getClusterKey(); - Configuration peerConf = new Configuration(this.connection.getConfiguration()); Stat s = null; try { - ZKUtil.applyClusterKeyToConf(peerConf, clusterKey); Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId); + Configuration peerConf = pair.getSecond(); ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst()); s = zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT), null); if (null == s) { - LOG.info(peerId + ' ' + clusterKey + " is invalid now."); + LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now."); continue; } validPeers.add(peer); @@ -664,10 +662,6 @@ public class ReplicationAdmin implements Closeable { LOG.warn("Failed to get valid replication peers due to InterruptedException."); LOG.debug("Failure details to get valid replication peers.", e); continue; - } catch (IOException e) { - LOG.warn("Failed to get valid replication peers due to IOException."); - LOG.debug("Failure details to get valid replication peers.", e); - continue; } } return validPeers; http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index a7d7dda..7099bfc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; @@ -318,11 +319,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re return null; } - Configuration otherConf = new Configuration(this.conf); + Configuration otherConf; try { - if (peerConfig.getClusterKey() != null && !peerConfig.getClusterKey().isEmpty()) { - ZKUtil.applyClusterKeyToConf(otherConf, peerConfig.getClusterKey()); - } + otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); } catch (IOException e) { LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e); return null; http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index 1691b3f..4fbac0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; @@ -69,7 +70,7 @@ public abstract class ReplicationStateZKBase { String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); - this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf); + this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf); this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName); this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName); this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName); http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java deleted file mode 100644 index 15752c2..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java +++ /dev/null @@ -1,350 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.zookeeper; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map.Entry; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; - -/** - * Utility methods for reading, and building the ZooKeeper configuration. - * - * The order and priority for reading the config are as follows: - * (1). zoo.cfg if ""hbase.config.read.zookeeper.config" is true - * (2). Property with "hbase.zookeeper.property." prefix from HBase XML - * (3). other zookeeper related properties in HBASE XML - */ [email protected] -public class ZKConfig { - private static final Log LOG = LogFactory.getLog(ZKConfig.class); - - private static final String VARIABLE_START = "${"; - private static final int VARIABLE_START_LENGTH = VARIABLE_START.length(); - private static final String VARIABLE_END = "}"; - private static final int VARIABLE_END_LENGTH = VARIABLE_END.length(); - - /** - * Make a Properties object holding ZooKeeper config. - * Parses the corresponding config options from the HBase XML configs - * and generates the appropriate ZooKeeper properties. - * @param conf Configuration to read from. - * @return Properties holding mappings representing ZooKeeper config file. - */ - public static Properties makeZKProps(Configuration conf) { - Properties zkProperties = makeZKPropsFromZooCfg(conf); - - if (zkProperties == null) { - // Otherwise, use the configuration options from HBase's XML files. - zkProperties = makeZKPropsFromHbaseConfig(conf); - } - return zkProperties; - } - - /** - * Parses the corresponding config options from the zoo.cfg file - * and make a Properties object holding the Zookeeper config. - * - * @param conf Configuration to read from. - * @return Properties holding mappings representing the ZooKeeper config file or null if - * the HBASE_CONFIG_READ_ZOOKEEPER_CONFIG is false or the file does not exist. - */ - private static Properties makeZKPropsFromZooCfg(Configuration conf) { - if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) { - LOG.warn( - "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME + - " file for ZK properties " + - "has been deprecated. Please instead place all ZK related HBase " + - "configuration under the hbase-site.xml, using prefixes " + - "of the form '" + HConstants.ZK_CFG_PROPERTY_PREFIX + "', and " + - "set property '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + - "' to false"); - // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read - // it and grab its configuration properties. - ClassLoader cl = HQuorumPeer.class.getClassLoader(); - final InputStream inputStream = - cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME); - if (inputStream != null) { - try { - return parseZooCfg(conf, inputStream); - } catch (IOException e) { - LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME + - ", loading from XML files", e); - } - } - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Skipped reading ZK properties file '" + HConstants.ZOOKEEPER_CONFIG_NAME + - "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + "' was not set to true"); - } - } - - return null; - } - - /** - * Make a Properties object holding ZooKeeper config. - * Parses the corresponding config options from the HBase XML configs - * and generates the appropriate ZooKeeper properties. - * - * @param conf Configuration to read from. - * @return Properties holding mappings representing ZooKeeper config file. - */ - private static Properties makeZKPropsFromHbaseConfig(Configuration conf) { - Properties zkProperties = new Properties(); - - // Directly map all of the hbase.zookeeper.property.KEY properties. - // Synchronize on conf so no loading of configs while we iterate - synchronized (conf) { - for (Entry<String, String> entry : conf) { - String key = entry.getKey(); - if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { - String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); - String value = entry.getValue(); - // If the value has variables substitutions, need to do a get. - if (value.contains(VARIABLE_START)) { - value = conf.get(key); - } - zkProperties.put(zkKey, value); - } - } - } - - // If clientPort is not set, assign the default. - if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) { - zkProperties.put(HConstants.CLIENT_PORT_STR, - HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); - } - - // Create the server.X properties. - int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888); - int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); - - final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, - HConstants.LOCALHOST); - String serverHost; - String address; - String key; - for (int i = 0; i < serverHosts.length; ++i) { - if (serverHosts[i].contains(":")) { - serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':')); - } else { - serverHost = serverHosts[i]; - } - address = serverHost + ":" + peerPort + ":" + leaderPort; - key = "server." + i; - zkProperties.put(key, address); - } - - return zkProperties; - } - - /** - * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in. - * This method is used for testing so we can pass our own InputStream. - * @param conf HBaseConfiguration to use for injecting variables. - * @param inputStream InputStream to read from. - * @return Properties parsed from config stream with variables substituted. - * @throws IOException if anything goes wrong parsing config - * @deprecated in 0.96 onwards. HBase will no longer rely on zoo.cfg - * availability. - */ - @Deprecated - public static Properties parseZooCfg(Configuration conf, - InputStream inputStream) throws IOException { - Properties properties = new Properties(); - try { - properties.load(inputStream); - } catch (IOException e) { - final String msg = "fail to read properties from " - + HConstants.ZOOKEEPER_CONFIG_NAME; - LOG.fatal(msg); - throw new IOException(msg, e); - } - for (Entry<Object, Object> entry : properties.entrySet()) { - String value = entry.getValue().toString().trim(); - String key = entry.getKey().toString().trim(); - StringBuilder newValue = new StringBuilder(); - int varStart = value.indexOf(VARIABLE_START); - int varEnd = 0; - while (varStart != -1) { - varEnd = value.indexOf(VARIABLE_END, varStart); - if (varEnd == -1) { - String msg = "variable at " + varStart + " has no end marker"; - LOG.fatal(msg); - throw new IOException(msg); - } - String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd); - - String substituteValue = System.getProperty(variable); - if (substituteValue == null) { - substituteValue = conf.get(variable); - } - if (substituteValue == null) { - String msg = "variable " + variable + " not set in system property " - + "or hbase configs"; - LOG.fatal(msg); - throw new IOException(msg); - } - - newValue.append(substituteValue); - - varEnd += VARIABLE_END_LENGTH; - varStart = value.indexOf(VARIABLE_START, varEnd); - } - // Special case for 'hbase.cluster.distributed' property being 'true' - if (key.startsWith("server.")) { - boolean mode = - conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); - if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) { - String msg = "The server in zoo.cfg cannot be set to localhost " + - "in a fully-distributed setup because it won't be reachable. " + - "See \"Getting Started\" for more information."; - LOG.fatal(msg); - throw new IOException(msg); - } - } - newValue.append(value.substring(varEnd)); - properties.setProperty(key, newValue.toString()); - } - return properties; - } - - /** - * Return the ZK Quorum servers string given zk properties returned by - * makeZKProps - * @param properties - * @return Quorum servers String - */ - private static String getZKQuorumServersString(Properties properties) { - String clientPort = null; - List<String> servers = new ArrayList<String>(); - - // The clientPort option may come after the server.X hosts, so we need to - // grab everything and then create the final host:port comma separated list. - boolean anyValid = false; - for (Entry<Object,Object> property : properties.entrySet()) { - String key = property.getKey().toString().trim(); - String value = property.getValue().toString().trim(); - if (key.equals("clientPort")) { - clientPort = value; - } - else if (key.startsWith("server.")) { - String host = value.substring(0, value.indexOf(':')); - servers.add(host); - anyValid = true; - } - } - - if (!anyValid) { - LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME); - return null; - } - - if (clientPort == null) { - LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME); - return null; - } - - if (servers.isEmpty()) { - LOG.fatal("No servers were found in provided ZooKeeper configuration. " + - "HBase must have a ZooKeeper cluster configured for its " + - "operation. Ensure that you've configured '" + - HConstants.ZOOKEEPER_QUORUM + "' properly."); - return null; - } - - StringBuilder hostPortBuilder = new StringBuilder(); - for (int i = 0; i < servers.size(); ++i) { - String host = servers.get(i); - if (i > 0) { - hostPortBuilder.append(','); - } - hostPortBuilder.append(host); - hostPortBuilder.append(':'); - hostPortBuilder.append(clientPort); - } - - return hostPortBuilder.toString(); - } - - /** - * Return the ZK Quorum servers string given the specified configuration - * - * @param conf - * @return Quorum servers String - */ - private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) { - String defaultClientPort = Integer.toString( - conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT)); - - // Build the ZK quorum server string with "server:clientport" list, separated by ',' - final String[] serverHosts = - conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); - return buildQuorumServerString(serverHosts, defaultClientPort); - } - - /** - * Build the ZK quorum server string with "server:clientport" list, separated by ',' - * - * @param serverHosts a list of servers for ZK quorum - * @param clientPort the default client port - * @return the string for a list of "server:port" separated by "," - */ - public static String buildQuorumServerString(String[] serverHosts, String clientPort) { - StringBuilder quorumStringBuilder = new StringBuilder(); - String serverHost; - for (int i = 0; i < serverHosts.length; ++i) { - if (serverHosts[i].contains(":")) { - serverHost = serverHosts[i]; // just use the port specified from the input - } else { - serverHost = serverHosts[i] + ":" + clientPort; - } - if (i > 0) { - quorumStringBuilder.append(','); - } - quorumStringBuilder.append(serverHost); - } - return quorumStringBuilder.toString(); - } - - /** - * Return the ZK Quorum servers string given the specified configuration. - * @param conf - * @return Quorum servers - */ - public static String getZKQuorumServersString(Configuration conf) { - // First try zoo.cfg; if not applicable, then try config XML. - Properties zkProperties = makeZKPropsFromZooCfg(conf); - - if (zkProperties != null) { - return getZKQuorumServersString(zkProperties); - } - - return getZKQuorumServersStringFromHbaseConfig(conf); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index ffbe2db..bf803be 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -76,7 +76,6 @@ import org.apache.zookeeper.proto.DeleteRequest; import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.ZooKeeperSaslServer; -import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.InvalidProtocolBufferException; /** @@ -96,25 +95,6 @@ public class ZKUtil { public static final char ZNODE_PATH_SEPARATOR = '/'; private static int zkDumpConnectionTimeOut; - // The Quorum for the ZK cluster can have one the following format (see examples below): - // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort) - // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server, - // in this case, the clientPort would be ignored) - // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use - // the clientPort; otherwise, it would use the specified port) - @VisibleForTesting - public static class ZKClusterKey { - public String quorumString; - public int clientPort; - public String znodeParent; - - ZKClusterKey(String quorumString, int clientPort, String znodeParent) { - this.quorumString = quorumString; - this.clientPort = clientPort; - this.znodeParent = znodeParent; - } - } - /** * Creates a new connection to ZooKeeper, pulling settings and ensemble config * from the specified configuration object using methods from {@link ZKConfig}. @@ -365,110 +345,6 @@ public class ZKUtil { return path.substring(path.lastIndexOf("/")+1); } - /** - * Get the key to the ZK ensemble for this configuration without - * adding a name at the end - * @param conf Configuration to use to build the key - * @return ensemble key without a name - */ - public static String getZooKeeperClusterKey(Configuration conf) { - return getZooKeeperClusterKey(conf, null); - } - - /** - * Get the key to the ZK ensemble for this configuration and append - * a name at the end - * @param conf Configuration to use to build the key - * @param name Name that should be appended at the end if not empty or null - * @return ensemble key with a name (if any) - */ - public static String getZooKeeperClusterKey(Configuration conf, String name) { - String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll( - "[\\t\\n\\x0B\\f\\r]", ""); - StringBuilder builder = new StringBuilder(ensemble); - builder.append(":"); - builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); - builder.append(":"); - builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - if (name != null && !name.isEmpty()) { - builder.append(","); - builder.append(name); - } - return builder.toString(); - } - - /** - * Apply the settings in the given key to the given configuration, this is - * used to communicate with distant clusters - * @param conf configuration object to configure - * @param key string that contains the 3 required configuratins - * @throws IOException - */ - public static void applyClusterKeyToConf(Configuration conf, String key) - throws IOException{ - ZKClusterKey zkClusterKey = transformClusterKey(key); - conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.quorumString); - conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.clientPort); - conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.znodeParent); - } - - /** - * Separate the given key into the three configurations it should contain: - * hbase.zookeeper.quorum, hbase.zookeeper.client.port - * and zookeeper.znode.parent - * @param key - * @return the three configuration in the described order - * @throws IOException - */ - public static ZKClusterKey transformClusterKey(String key) throws IOException { - String[] parts = key.split(":"); - - if (parts.length == 3) { - return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]); - } - - if (parts.length > 3) { - // The quorum could contain client port in server:clientport format, try to transform more. - String zNodeParent = parts [parts.length - 1]; - String clientPort = parts [parts.length - 2]; - - // The first part length is the total length minus the lengths of other parts and minus 2 ":" - int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2; - String quorumStringInput = key.substring(0, endQuorumIndex); - String[] serverHosts = quorumStringInput.split(","); - - // The common case is that every server has its own client port specified - this means - // that (total parts - the ZNodeParent part - the ClientPort part) is equal to - // (the number of "," + 1) - "+ 1" because the last server has no ",". - if ((parts.length - 2) == (serverHosts.length + 1)) { - return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent); - } - - // For the uncommon case that some servers has no port specified, we need to build the - // server:clientport list using default client port for servers without specified port. - return new ZKClusterKey( - ZKConfig.buildQuorumServerString(serverHosts, clientPort), - Integer.parseInt(clientPort), - zNodeParent); - } - - throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" + - HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":" - + HConstants.ZOOKEEPER_ZNODE_PARENT); - } - - /** - * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ',' - * @param quorumStringInput a string contains a list of servers for ZK quorum - * @param clientPort the default client port - * @return the string for a list of "server:port" separated by "," - */ - @VisibleForTesting - public static String standardizeQuorumServerString(String quorumStringInput, String clientPort) { - String[] serverHosts = quorumStringInput.split(","); - return ZKConfig.buildQuorumServerString(serverHosts, clientPort); - } - // // Existence checks and watches // http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java index 72de935..eb629f2 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java @@ -41,17 +41,6 @@ import org.junit.experimental.categories.Category; public class TestZKUtil { @Test - public void testGetZooKeeperClusterKey() { - Configuration conf = HBaseConfiguration.create(); - conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n"); - conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333"); - conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase"); - String clusterKey = ZKUtil.getZooKeeperClusterKey(conf, "test"); - Assert.assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n")); - Assert.assertEquals("localhost:3333:hbase,test", clusterKey); - } - - @Test public void testCreateACL() throws ZooKeeperConnectionException, IOException { Configuration conf = HBaseConfiguration.create(); conf.set(Superusers.SUPERUSER_CONF_KEY, "user1,@group1,user2,@group2,user3"); http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java index 505912e..94d4483 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java @@ -20,15 +20,16 @@ package org.apache.hadoop.hbase; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Map.Entry; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; /** * Adds HBase configuration files to a Configuration @@ -113,7 +114,7 @@ public class HBaseConfiguration extends Configuration { * @param srcConf the source configuration **/ public static void merge(Configuration destConf, Configuration srcConf) { - for (Entry<String, String> e : srcConf) { + for (Map.Entry<String, String> e : srcConf) { destConf.set(e.getKey(), e.getValue()); } } @@ -127,7 +128,7 @@ public class HBaseConfiguration extends Configuration { */ public static Configuration subset(Configuration srcConf, String prefix) { Configuration newConf = new Configuration(false); - for (Entry<String, String> entry : srcConf) { + for (Map.Entry<String, String> entry : srcConf) { if (entry.getKey().startsWith(prefix)) { String newKey = entry.getKey().substring(prefix.length()); // avoid entries that would produce an empty key @@ -140,6 +141,18 @@ public class HBaseConfiguration extends Configuration { } /** + * Sets all the entries in the provided {@code Map<String, String>} as properties in the + * given {@code Configuration}. Each property will have the specified prefix prepended, + * so that the configuration entries are keyed by {@code prefix + entry.getKey()}. + */ + public static void setWithPrefix(Configuration conf, String prefix, + Iterable<Map.Entry<String, String>> properties) { + for (Map.Entry<String, String> entry : properties) { + conf.set(prefix + entry.getKey(), entry.getValue()); + } + } + + /** * @return whether to show HBase Configuration in servlet */ public static boolean isShowConfInServlet() { @@ -233,7 +246,67 @@ public class HBaseConfiguration extends Configuration { return passwd; } - /** For debugging. Dump configurations to system output as xml format. + /** + * Generates a {@link Configuration} instance by applying the ZooKeeper cluster key + * to the base Configuration. Note that additional configuration properties may be needed + * for a remote cluster, so it is preferable to use + * {@link #createClusterConf(Configuration, String, String)}. + * + * @param baseConf the base configuration to use, containing prefixed override properties + * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none + * + * @return the merged configuration with override properties and cluster key applied + * + * @see #createClusterConf(Configuration, String, String) + */ + public static Configuration createClusterConf(Configuration baseConf, String clusterKey) + throws IOException { + return createClusterConf(baseConf, clusterKey, null); + } + + /** + * Generates a {@link Configuration} instance by applying property overrides prefixed by + * a cluster profile key to the base Configuration. Override properties are extracted by + * the {@link #subset(Configuration, String)} method, then the merged on top of the base + * Configuration and returned. + * + * @param baseConf the base configuration to use, containing prefixed override properties + * @param clusterKey the ZooKeeper quorum cluster key to apply, or {@code null} if none + * @param overridePrefix the property key prefix to match for override properties, + * or {@code null} if none + * @return the merged configuration with override properties and cluster key applied + */ + public static Configuration createClusterConf(Configuration baseConf, String clusterKey, + String overridePrefix) throws IOException { + Configuration clusterConf = HBaseConfiguration.create(baseConf); + if (clusterKey != null && !clusterKey.isEmpty()) { + applyClusterKeyToConf(clusterConf, clusterKey); + } + + if (overridePrefix != null && !overridePrefix.isEmpty()) { + Configuration clusterSubset = HBaseConfiguration.subset(clusterConf, overridePrefix); + HBaseConfiguration.merge(clusterConf, clusterSubset); + } + return clusterConf; + } + + /** + * Apply the settings in the given key to the given configuration, this is + * used to communicate with distant clusters + * @param conf configuration object to configure + * @param key string that contains the 3 required configuratins + * @throws IOException + */ + private static void applyClusterKeyToConf(Configuration conf, String key) + throws IOException{ + ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key); + conf.set(HConstants.ZOOKEEPER_QUORUM, zkClusterKey.getQuorumString()); + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClusterKey.getClientPort()); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkClusterKey.getZnodeParent()); + } + + /** + * For debugging. Dump configurations to system output as xml format. * Master and RS configurations can also be dumped using * http services. e.g. "curl http://master:16010/dump" */ http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java new file mode 100644 index 0000000..787b5cc --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java @@ -0,0 +1,495 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.Properties; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Utility methods for reading, and building the ZooKeeper configuration. + * + * The order and priority for reading the config are as follows: + * (1). zoo.cfg if ""hbase.config.read.zookeeper.config" is true + * (2). Property with "hbase.zookeeper.property." prefix from HBase XML + * (3). other zookeeper related properties in HBASE XML + */ [email protected] +public final class ZKConfig { + private static final Log LOG = LogFactory.getLog(ZKConfig.class); + + private static final String VARIABLE_START = "${"; + private static final int VARIABLE_START_LENGTH = VARIABLE_START.length(); + private static final String VARIABLE_END = "}"; + private static final int VARIABLE_END_LENGTH = VARIABLE_END.length(); + + private ZKConfig() { + } + + /** + * Make a Properties object holding ZooKeeper config. + * Parses the corresponding config options from the HBase XML configs + * and generates the appropriate ZooKeeper properties. + * @param conf Configuration to read from. + * @return Properties holding mappings representing ZooKeeper config file. + */ + public static Properties makeZKProps(Configuration conf) { + Properties zkProperties = makeZKPropsFromZooCfg(conf); + + if (zkProperties == null) { + // Otherwise, use the configuration options from HBase's XML files. + zkProperties = makeZKPropsFromHbaseConfig(conf); + } + return zkProperties; + } + + /** + * Parses the corresponding config options from the zoo.cfg file + * and make a Properties object holding the Zookeeper config. + * + * @param conf Configuration to read from. + * @return Properties holding mappings representing the ZooKeeper config file or null if + * the HBASE_CONFIG_READ_ZOOKEEPER_CONFIG is false or the file does not exist. + */ + private static Properties makeZKPropsFromZooCfg(Configuration conf) { + if (conf.getBoolean(HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG, false)) { + LOG.warn( + "Parsing ZooKeeper's " + HConstants.ZOOKEEPER_CONFIG_NAME + + " file for ZK properties " + + "has been deprecated. Please instead place all ZK related HBase " + + "configuration under the hbase-site.xml, using prefixes " + + "of the form '" + HConstants.ZK_CFG_PROPERTY_PREFIX + "', and " + + "set property '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + + "' to false"); + // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read + // it and grab its configuration properties. + ClassLoader cl = ZKConfig.class.getClassLoader(); + final InputStream inputStream = + cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME); + if (inputStream != null) { + try { + return parseZooCfg(conf, inputStream); + } catch (IOException e) { + LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME + + ", loading from XML files", e); + } + } + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Skipped reading ZK properties file '" + HConstants.ZOOKEEPER_CONFIG_NAME + + "' since '" + HConstants.HBASE_CONFIG_READ_ZOOKEEPER_CONFIG + "' was not set to true"); + } + } + + return null; + } + + /** + * Make a Properties object holding ZooKeeper config. + * Parses the corresponding config options from the HBase XML configs + * and generates the appropriate ZooKeeper properties. + * + * @param conf Configuration to read from. + * @return Properties holding mappings representing ZooKeeper config file. + */ + private static Properties makeZKPropsFromHbaseConfig(Configuration conf) { + Properties zkProperties = new Properties(); + + // Directly map all of the hbase.zookeeper.property.KEY properties. + // Synchronize on conf so no loading of configs while we iterate + synchronized (conf) { + for (Entry<String, String> entry : conf) { + String key = entry.getKey(); + if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { + String zkKey = key.substring(HConstants.ZK_CFG_PROPERTY_PREFIX_LEN); + String value = entry.getValue(); + // If the value has variables substitutions, need to do a get. + if (value.contains(VARIABLE_START)) { + value = conf.get(key); + } + zkProperties.put(zkKey, value); + } + } + } + + // If clientPort is not set, assign the default. + if (zkProperties.getProperty(HConstants.CLIENT_PORT_STR) == null) { + zkProperties.put(HConstants.CLIENT_PORT_STR, + HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT); + } + + // Create the server.X properties. + int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888); + int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888); + + final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM, + HConstants.LOCALHOST); + String serverHost; + String address; + String key; + for (int i = 0; i < serverHosts.length; ++i) { + if (serverHosts[i].contains(":")) { + serverHost = serverHosts[i].substring(0, serverHosts[i].indexOf(':')); + } else { + serverHost = serverHosts[i]; + } + address = serverHost + ":" + peerPort + ":" + leaderPort; + key = "server." + i; + zkProperties.put(key, address); + } + + return zkProperties; + } + + /** + * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in. + * This method is used for testing so we can pass our own InputStream. + * @param conf HBaseConfiguration to use for injecting variables. + * @param inputStream InputStream to read from. + * @return Properties parsed from config stream with variables substituted. + * @throws IOException if anything goes wrong parsing config + * @deprecated in 0.96 onwards. HBase will no longer rely on zoo.cfg + * availability. + */ + @Deprecated + public static Properties parseZooCfg(Configuration conf, + InputStream inputStream) throws IOException { + Properties properties = new Properties(); + try { + properties.load(inputStream); + } catch (IOException e) { + final String msg = "fail to read properties from " + + HConstants.ZOOKEEPER_CONFIG_NAME; + LOG.fatal(msg); + throw new IOException(msg, e); + } + for (Entry<Object, Object> entry : properties.entrySet()) { + String value = entry.getValue().toString().trim(); + String key = entry.getKey().toString().trim(); + StringBuilder newValue = new StringBuilder(); + int varStart = value.indexOf(VARIABLE_START); + int varEnd = 0; + while (varStart != -1) { + varEnd = value.indexOf(VARIABLE_END, varStart); + if (varEnd == -1) { + String msg = "variable at " + varStart + " has no end marker"; + LOG.fatal(msg); + throw new IOException(msg); + } + String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd); + + String substituteValue = System.getProperty(variable); + if (substituteValue == null) { + substituteValue = conf.get(variable); + } + if (substituteValue == null) { + String msg = "variable " + variable + " not set in system property " + + "or hbase configs"; + LOG.fatal(msg); + throw new IOException(msg); + } + + newValue.append(substituteValue); + + varEnd += VARIABLE_END_LENGTH; + varStart = value.indexOf(VARIABLE_START, varEnd); + } + // Special case for 'hbase.cluster.distributed' property being 'true' + if (key.startsWith("server.")) { + boolean mode = + conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, HConstants.DEFAULT_CLUSTER_DISTRIBUTED); + if (mode == HConstants.CLUSTER_IS_DISTRIBUTED && value.startsWith(HConstants.LOCALHOST)) { + String msg = "The server in zoo.cfg cannot be set to localhost " + + "in a fully-distributed setup because it won't be reachable. " + + "See \"Getting Started\" for more information."; + LOG.fatal(msg); + throw new IOException(msg); + } + } + newValue.append(value.substring(varEnd)); + properties.setProperty(key, newValue.toString()); + } + return properties; + } + + /** + * Return the ZK Quorum servers string given zk properties returned by + * makeZKProps + * @param properties + * @return Quorum servers String + */ + private static String getZKQuorumServersString(Properties properties) { + String clientPort = null; + List<String> servers = new ArrayList<String>(); + + // The clientPort option may come after the server.X hosts, so we need to + // grab everything and then create the final host:port comma separated list. + boolean anyValid = false; + for (Entry<Object,Object> property : properties.entrySet()) { + String key = property.getKey().toString().trim(); + String value = property.getValue().toString().trim(); + if (key.equals("clientPort")) { + clientPort = value; + } + else if (key.startsWith("server.")) { + String host = value.substring(0, value.indexOf(':')); + servers.add(host); + anyValid = true; + } + } + + if (!anyValid) { + LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (clientPort == null) { + LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (servers.isEmpty()) { + LOG.fatal("No servers were found in provided ZooKeeper configuration. " + + "HBase must have a ZooKeeper cluster configured for its " + + "operation. Ensure that you've configured '" + + HConstants.ZOOKEEPER_QUORUM + "' properly."); + return null; + } + + StringBuilder hostPortBuilder = new StringBuilder(); + for (int i = 0; i < servers.size(); ++i) { + String host = servers.get(i); + if (i > 0) { + hostPortBuilder.append(','); + } + hostPortBuilder.append(host); + hostPortBuilder.append(':'); + hostPortBuilder.append(clientPort); + } + + return hostPortBuilder.toString(); + } + + /** + * Return the ZK Quorum servers string given the specified configuration + * + * @param conf + * @return Quorum servers String + */ + private static String getZKQuorumServersStringFromHbaseConfig(Configuration conf) { + String defaultClientPort = Integer.toString( + conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT)); + + // Build the ZK quorum server string with "server:clientport" list, separated by ',' + final String[] serverHosts = + conf.getStrings(HConstants.ZOOKEEPER_QUORUM, HConstants.LOCALHOST); + return buildZKQuorumServerString(serverHosts, defaultClientPort); + } + + /** + * Return the ZK Quorum servers string given the specified configuration. + * @param conf + * @return Quorum servers + */ + public static String getZKQuorumServersString(Configuration conf) { + // First try zoo.cfg; if not applicable, then try config XML. + Properties zkProperties = makeZKPropsFromZooCfg(conf); + + if (zkProperties != null) { + return getZKQuorumServersString(zkProperties); + } + + return getZKQuorumServersStringFromHbaseConfig(conf); + } + + /** + * Build the ZK quorum server string with "server:clientport" list, separated by ',' + * + * @param serverHosts a list of servers for ZK quorum + * @param clientPort the default client port + * @return the string for a list of "server:port" separated by "," + */ + public static String buildZKQuorumServerString(String[] serverHosts, String clientPort) { + StringBuilder quorumStringBuilder = new StringBuilder(); + String serverHost; + for (int i = 0; i < serverHosts.length; ++i) { + if (serverHosts[i].contains(":")) { + serverHost = serverHosts[i]; // just use the port specified from the input + } else { + serverHost = serverHosts[i] + ":" + clientPort; + } + if (i > 0) { + quorumStringBuilder.append(','); + } + quorumStringBuilder.append(serverHost); + } + return quorumStringBuilder.toString(); + } + + /** + * Verifies that the given key matches the expected format for a ZooKeeper cluster key. + * The Quorum for the ZK cluster can have one the following formats (see examples below): + * + * <ol> + * <li>s1,s2,s3 (no client port in the list, the client port could be obtained from + * clientPort)</li> + * <li>s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server, + * in this case, the clientPort would be ignored)</li> + * <li>s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use + * the clientPort; otherwise, it would use the specified port)</li> + * </ol> + * + * @param key the cluster key to validate + * @throws IOException if the key could not be parsed + */ + public static void validateClusterKey(String key) throws IOException { + transformClusterKey(key); + } + + /** + * Separate the given key into the three configurations it should contain: + * hbase.zookeeper.quorum, hbase.zookeeper.client.port + * and zookeeper.znode.parent + * @param key + * @return the three configuration in the described order + * @throws IOException + */ + public static ZKClusterKey transformClusterKey(String key) throws IOException { + String[] parts = key.split(":"); + + if (parts.length == 3) { + return new ZKClusterKey(parts [0], Integer.parseInt(parts [1]), parts [2]); + } + + if (parts.length > 3) { + // The quorum could contain client port in server:clientport format, try to transform more. + String zNodeParent = parts [parts.length - 1]; + String clientPort = parts [parts.length - 2]; + + // The first part length is the total length minus the lengths of other parts and minus 2 ":" + int endQuorumIndex = key.length() - zNodeParent.length() - clientPort.length() - 2; + String quorumStringInput = key.substring(0, endQuorumIndex); + String[] serverHosts = quorumStringInput.split(","); + + // The common case is that every server has its own client port specified - this means + // that (total parts - the ZNodeParent part - the ClientPort part) is equal to + // (the number of "," + 1) - "+ 1" because the last server has no ",". + if ((parts.length - 2) == (serverHosts.length + 1)) { + return new ZKClusterKey(quorumStringInput, Integer.parseInt(clientPort), zNodeParent); + } + + // For the uncommon case that some servers has no port specified, we need to build the + // server:clientport list using default client port for servers without specified port. + return new ZKClusterKey( + buildZKQuorumServerString(serverHosts, clientPort), + Integer.parseInt(clientPort), + zNodeParent); + } + + throw new IOException("Cluster key passed " + key + " is invalid, the format should be:" + + HConstants.ZOOKEEPER_QUORUM + ":" + HConstants.ZOOKEEPER_CLIENT_PORT + ":" + + HConstants.ZOOKEEPER_ZNODE_PARENT); + } + + /** + * Get the key to the ZK ensemble for this configuration without + * adding a name at the end + * @param conf Configuration to use to build the key + * @return ensemble key without a name + */ + public static String getZooKeeperClusterKey(Configuration conf) { + return getZooKeeperClusterKey(conf, null); + } + + /** + * Get the key to the ZK ensemble for this configuration and append + * a name at the end + * @param conf Configuration to use to build the key + * @param name Name that should be appended at the end if not empty or null + * @return ensemble key with a name (if any) + */ + public static String getZooKeeperClusterKey(Configuration conf, String name) { + String ensemble = conf.get(HConstants.ZOOKEEPER_QUORUM).replaceAll( + "[\\t\\n\\x0B\\f\\r]", ""); + StringBuilder builder = new StringBuilder(ensemble); + builder.append(":"); + builder.append(conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); + builder.append(":"); + builder.append(conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + if (name != null && !name.isEmpty()) { + builder.append(","); + builder.append(name); + } + return builder.toString(); + } + + /** + * Standardize the ZK quorum string: make it a "server:clientport" list, separated by ',' + * @param quorumStringInput a string contains a list of servers for ZK quorum + * @param clientPort the default client port + * @return the string for a list of "server:port" separated by "," + */ + @VisibleForTesting + public static String standardizeZKQuorumServerString(String quorumStringInput, + String clientPort) { + String[] serverHosts = quorumStringInput.split(","); + return buildZKQuorumServerString(serverHosts, clientPort); + } + + // The Quorum for the ZK cluster can have one the following format (see examples below): + // (1). s1,s2,s3 (no client port in the list, the client port could be obtained from clientPort) + // (2). s1:p1,s2:p2,s3:p3 (with client port, which could be same or different for each server, + // in this case, the clientPort would be ignored) + // (3). s1:p1,s2,s3:p3 (mix of (1) and (2) - if port is not specified in a server, it would use + // the clientPort; otherwise, it would use the specified port) + @VisibleForTesting + public static class ZKClusterKey { + private String quorumString; + private int clientPort; + private String znodeParent; + + ZKClusterKey(String quorumString, int clientPort, String znodeParent) { + this.quorumString = quorumString; + this.clientPort = clientPort; + this.znodeParent = znodeParent; + } + + public String getQuorumString() { + return quorumString; + } + + public int getClientPort() { + return clientPort; + } + + public String getZnodeParent() { + return znodeParent; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java index bbddb60..f8b60fd 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestHBaseConfiguration.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; @@ -27,10 +28,12 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -72,8 +75,11 @@ public class TestHBaseConfiguration { String prefix = "hbase.mapred.output."; conf.set("hbase.security.authentication", "kerberos"); conf.set("hbase.regionserver.kerberos.principal", "hbasesource"); - conf.set(prefix + "hbase.regionserver.kerberos.principal", "hbasedest"); - conf.set(prefix, "shouldbemissing"); + HBaseConfiguration.setWithPrefix(conf, prefix, + ImmutableMap.of( + "hbase.regionserver.kerberos.principal", "hbasedest", + "", "shouldbemissing") + .entrySet()); Configuration subsetConf = HBaseConfiguration.subset(conf, prefix); assertNull(subsetConf.get(prefix + "hbase.regionserver.kerberos.principal")); http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java new file mode 100644 index 0000000..7879aea --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKConfig.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category({MiscTests.class, SmallTests.class}) +public class TestZKConfig { + + @Test + public void testZKConfigLoading() throws Exception { + Configuration conf = HBaseConfiguration.create(); + // Test that we read only from the config instance + // (i.e. via hbase-default.xml and hbase-site.xml) + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 2181); + Properties props = ZKConfig.makeZKProps(conf); + assertEquals("Property client port should have been default from the HBase config", + "2181", + props.getProperty("clientPort")); + } + + @Test + public void testGetZooKeeperClusterKey() { + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.ZOOKEEPER_QUORUM, "\tlocalhost\n"); + conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "3333"); + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "hbase"); + String clusterKey = ZKConfig.getZooKeeperClusterKey(conf, "test"); + assertTrue(!clusterKey.contains("\t") && !clusterKey.contains("\n")); + assertEquals("localhost:3333:hbase,test", clusterKey); + } + + @Test + public void testClusterKey() throws Exception { + testKey("server", 2181, "hbase"); + testKey("server1,server2,server3", 2181, "hbase"); + try { + ZKConfig.validateClusterKey("2181:hbase"); + } catch (IOException ex) { + // OK + } + } + + @Test + public void testClusterKeyWithMultiplePorts() throws Exception { + // server has different port than the default port + testKey("server1:2182", 2181, "hbase", true); + // multiple servers have their own port + testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true); + // one server has no specified port, should use default port + testKey("server1:2182,server2,server3:2184", 2181, "hbase", true); + // the last server has no specified port, should use default port + testKey("server1:2182,server2:2183,server3", 2181, "hbase", true); + // multiple servers have no specified port, should use default port for those servers + testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true); + // same server, different ports + testKey("server1:2182,server1:2183,server1", 2181, "hbase", true); + // mix of same server/different port and different server + testKey("server1:2182,server2:2183,server1", 2181, "hbase", true); + } + + private void testKey(String ensemble, int port, String znode) + throws IOException { + testKey(ensemble, port, znode, false); // not support multiple client ports + } + + private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport) + throws IOException { + Configuration conf = new Configuration(); + String key = ensemble+":"+port+":"+znode; + String ensemble2 = null; + ZKConfig.ZKClusterKey zkClusterKey = ZKConfig.transformClusterKey(key); + if (multiplePortSupport) { + ensemble2 = ZKConfig.standardizeZKQuorumServerString(ensemble, + Integer.toString(port)); + assertEquals(ensemble2, zkClusterKey.getQuorumString()); + } + else { + assertEquals(ensemble, zkClusterKey.getQuorumString()); + } + assertEquals(port, zkClusterKey.getClientPort()); + assertEquals(znode, zkClusterKey.getZnodeParent()); + + conf = HBaseConfiguration.createClusterConf(conf, key); + assertEquals(zkClusterKey.getQuorumString(), conf.get(HConstants.ZOOKEEPER_QUORUM)); + assertEquals(zkClusterKey.getClientPort(), conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1)); + assertEquals(zkClusterKey.getZnodeParent(), conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + + String reconstructedKey = ZKConfig.getZooKeeperClusterKey(conf); + if (multiplePortSupport) { + String key2 = ensemble2 + ":" + port + ":" + znode; + assertEquals(key2, reconstructedKey); + } + else { + assertEquals(key, reconstructedKey); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index 3495ca9..23fd10e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; @@ -174,8 +173,9 @@ public class SyncTable extends Configured implements Tool { Configuration conf = context.getConfiguration(); sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY)); - sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY); - targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY); + sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null); + targetConnection = openConnection(conf, TARGET_ZK_CLUSTER_CONF_KEY, + TableOutputFormat.OUTPUT_CONF_PREFIX); sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY); targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY); dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false); @@ -196,13 +196,12 @@ public class SyncTable extends Configured implements Tool { targetHasher = new HashTable.ResultHasher(); } - private static Connection openConnection(Configuration conf, String zkClusterConfKey) + private static Connection openConnection(Configuration conf, String zkClusterConfKey, + String configPrefix) throws IOException { - Configuration clusterConf = new Configuration(conf); String zkCluster = conf.get(zkClusterConfKey); - if (zkCluster != null) { - ZKUtil.applyClusterKeyToConf(clusterConf, zkCluster); - } + Configuration clusterConf = HBaseConfiguration.createClusterConf(conf, + zkCluster, configPrefix); return ConnectionFactory.createConnection(clusterConf); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java index fdd68ce..1614883 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java @@ -26,6 +26,7 @@ import java.util.*; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; +import com.google.protobuf.InvalidProtocolBufferException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -49,12 +50,11 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; -import com.google.protobuf.InvalidProtocolBufferException; /** * Utility for {@link TableMapper} and {@link TableReducer} @@ -475,12 +475,8 @@ public class TableMapReduceUtil { String quorumAddress = job.getConfiguration().get(TableOutputFormat.QUORUM_ADDRESS); User user = userProvider.getCurrent(); if (quorumAddress != null) { - Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); - ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); - // apply any "hbase.mapred.output." configuration overrides - Configuration outputOverrides = - HBaseConfiguration.subset(peerConf, TableOutputFormat.OUTPUT_CONF_PREFIX); - HBaseConfiguration.merge(peerConf, outputOverrides); + Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), + quorumAddress, TableOutputFormat.OUTPUT_CONF_PREFIX); Connection peerConn = ConnectionFactory.createConnection(peerConf); try { TokenUtil.addTokenForJob(peerConn, user, job); @@ -513,15 +509,30 @@ public class TableMapReduceUtil { * @param job The job that requires the permission. * @param quorumAddress string that contains the 3 required configuratins * @throws IOException When the authentication token cannot be obtained. + * @deprecated Since 1.2.0, use {@link #initCredentialsForCluster(Job, Configuration)} instead. */ + @Deprecated public static void initCredentialsForCluster(Job job, String quorumAddress) throws IOException { + Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), + quorumAddress); + initCredentialsForCluster(job, peerConf); + } + + /** + * Obtain an authentication token, for the specified cluster, on behalf of the current user + * and add it to the credentials for the given map reduce job. + * + * @param job The job that requires the permission. + * @param conf The configuration to use in connecting to the peer cluster + * @throws IOException When the authentication token cannot be obtained. + */ + public static void initCredentialsForCluster(Job job, Configuration conf) + throws IOException { UserProvider userProvider = UserProvider.instantiate(job.getConfiguration()); if (userProvider.isHBaseSecurityEnabled()) { try { - Configuration peerConf = HBaseConfiguration.create(job.getConfiguration()); - ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress); - Connection peerConn = ConnectionFactory.createConnection(peerConf); + Connection peerConn = ConnectionFactory.createConnection(conf); try { TokenUtil.addTokenForJob(peerConn, userProvider.getCurrent(), job); } finally { @@ -670,7 +681,7 @@ public class TableMapReduceUtil { // If passed a quorum/ensemble address, pass it on to TableOutputFormat. if (quorumAddress != null) { // Calling this will validate the format - ZKUtil.transformClusterKey(quorumAddress); + ZKConfig.validateClusterKey(quorumAddress); conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress); } if (serverClass != null && serverImpl != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index 190962e..5904f9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -191,22 +190,19 @@ implements Configurable { @Override public void setConf(Configuration otherConf) { - this.conf = HBaseConfiguration.create(otherConf); - - String tableName = this.conf.get(OUTPUT_TABLE); + String tableName = otherConf.get(OUTPUT_TABLE); if(tableName == null || tableName.length() <= 0) { throw new IllegalArgumentException("Must specify table name"); } - String address = this.conf.get(QUORUM_ADDRESS); - int zkClientPort = this.conf.getInt(QUORUM_PORT, 0); - String serverClass = this.conf.get(REGION_SERVER_CLASS); - String serverImpl = this.conf.get(REGION_SERVER_IMPL); + String address = otherConf.get(QUORUM_ADDRESS); + int zkClientPort = otherConf.getInt(QUORUM_PORT, 0); + String serverClass = otherConf.get(REGION_SERVER_CLASS); + String serverImpl = otherConf.get(REGION_SERVER_IMPL); try { - if (address != null) { - ZKUtil.applyClusterKeyToConf(this.conf, address); - } + this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX); + if (serverClass != null) { this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl); } @@ -217,9 +213,5 @@ implements Configurable { LOG.error(e); throw new RuntimeException(e); } - - // finally apply any remaining "hbase.mapred.output." configuration overrides - Configuration outputOverrides = HBaseConfiguration.subset(otherConf, OUTPUT_CONF_PREFIX); - HBaseConfiguration.merge(this.conf, outputOverrides); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 9bd2a6c..75dfe9e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.util.Tool; @@ -69,6 +68,7 @@ public class VerifyReplication extends Configured implements Tool { LogFactory.getLog(VerifyReplication.class); public final static String NAME = "verifyrep"; + private final static String PEER_CONFIG_PREFIX = NAME + ".peer."; static long startTime = 0; static long endTime = Long.MAX_VALUE; static int versions = -1; @@ -126,8 +126,8 @@ public class VerifyReplication extends Configured implements Tool { @Override public Void connect(HConnection conn) throws IOException { String zkClusterKey = conf.get(NAME + ".peerQuorumAddress"); - Configuration peerConf = HBaseConfiguration.create(conf); - ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey); + Configuration peerConf = HBaseConfiguration.createClusterConf(conf, + zkClusterKey, PEER_CONFIG_PREFIX); TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName")); replicatedTable = new HTable(peerConf, tableName); @@ -203,7 +203,8 @@ public class VerifyReplication extends Configured implements Tool { } } - private static String getPeerQuorumAddress(final Configuration conf) throws IOException { + private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig( + final Configuration conf) throws IOException { ZooKeeperWatcher localZKW = null; ReplicationPeerZKImpl peer = null; try { @@ -220,8 +221,8 @@ public class VerifyReplication extends Configured implements Tool { if (pair == null) { throw new IOException("Couldn't get peer conf!"); } - Configuration peerConf = rp.getPeerConf(peerId).getSecond(); - return ZKUtil.getZooKeeperClusterKey(peerConf); + + return pair; } catch (ReplicationException e) { throw new IOException( "An error occured while trying to connect to the remove peer cluster", e); @@ -260,9 +261,14 @@ public class VerifyReplication extends Configured implements Tool { conf.set(NAME+".families", families); } - String peerQuorumAddress = getPeerQuorumAddress(conf); + Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf); + ReplicationPeerConfig peerConfig = peerConfigPair.getFirst(); + String peerQuorumAddress = peerConfig.getClusterKey(); + LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " + + peerConfig.getConfiguration()); conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress); - LOG.info("Peer Quorum Address: " + peerQuorumAddress); + HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX, + peerConfig.getConfiguration().entrySet()); conf.setInt(NAME + ".versions", versions); LOG.info("Number of version: " + versions); @@ -285,8 +291,9 @@ public class VerifyReplication extends Configured implements Tool { TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); + Configuration peerClusterConf = peerConfigPair.getSecond(); // Obtain the auth token from peer cluster - TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress); + TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 5c61afb..2ba1b47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; /** * Similar to {@link RegionReplicaUtil} but for the server side @@ -148,7 +148,7 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { try { if (repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER) == null) { ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); - peerConfig.setClusterKey(ZKUtil.getZooKeeperClusterKey(conf)); + peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf)); peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()); repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java index a756652..3441aa6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java @@ -345,71 +345,6 @@ public class TestZooKeeper { assertNull(ZKUtil.getDataNoWatch(zkw, "/l1/l2", null)); } - @Test - public void testClusterKey() throws Exception { - testKey("server", 2181, "hbase"); - testKey("server1,server2,server3", 2181, "hbase"); - try { - ZKUtil.transformClusterKey("2181:hbase"); - } catch (IOException ex) { - // OK - } - } - - @Test - public void testClusterKeyWithMultiplePorts() throws Exception { - // server has different port than the default port - testKey("server1:2182", 2181, "hbase", true); - // multiple servers have their own port - testKey("server1:2182,server2:2183,server3:2184", 2181, "hbase", true); - // one server has no specified port, should use default port - testKey("server1:2182,server2,server3:2184", 2181, "hbase", true); - // the last server has no specified port, should use default port - testKey("server1:2182,server2:2183,server3", 2181, "hbase", true); - // multiple servers have no specified port, should use default port for those servers - testKey("server1:2182,server2,server3:2184,server4", 2181, "hbase", true); - // same server, different ports - testKey("server1:2182,server1:2183,server1", 2181, "hbase", true); - // mix of same server/different port and different server - testKey("server1:2182,server2:2183,server1", 2181, "hbase", true); - } - - private void testKey(String ensemble, int port, String znode) - throws IOException { - testKey(ensemble, port, znode, false); // not support multiple client ports - } - - private void testKey(String ensemble, int port, String znode, Boolean multiplePortSupport) - throws IOException { - Configuration conf = new Configuration(); - String key = ensemble+":"+port+":"+znode; - String ensemble2 = null; - ZKUtil.ZKClusterKey zkClusterKey = ZKUtil.transformClusterKey(key); - if (multiplePortSupport) { - ensemble2 = ZKUtil.standardizeQuorumServerString(ensemble, Integer.toString(port)); - assertEquals(ensemble2, zkClusterKey.quorumString); - } - else { - assertEquals(ensemble, zkClusterKey.quorumString); - } - assertEquals(port, zkClusterKey.clientPort); - assertEquals(znode, zkClusterKey.znodeParent); - - ZKUtil.applyClusterKeyToConf(conf, key); - assertEquals(zkClusterKey.quorumString, conf.get(HConstants.ZOOKEEPER_QUORUM)); - assertEquals(zkClusterKey.clientPort, conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, -1)); - assertEquals(zkClusterKey.znodeParent, conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - - String reconstructedKey = ZKUtil.getZooKeeperClusterKey(conf); - if (multiplePortSupport) { - String key2 = ensemble2 + ":" + port + ":" + znode; - assertEquals(key2, reconstructedKey); - } - else { - assertEquals(key, reconstructedKey); - } - } - /** * A test for HBASE-3238 * @throws IOException A connection attempt to zk failed http://git-wip-us.apache.org/repos/asf/hbase/blob/5dec5ad2/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index d5e0e31..119cee5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -24,9 +24,13 @@ import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeer; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -37,10 +41,12 @@ import org.junit.experimental.categories.Category; import com.google.common.collect.Lists; -import static org.junit.Assert.fail; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Unit testing of ReplicationAdmin @@ -135,7 +141,7 @@ public class TestReplicationAdmin { } repQueues.removeQueue(ID_ONE); assertEquals(0, repQueues.getAllQueues().size()); - + // add recovered queue for ID_ONE repQueues.addLog(ID_ONE + "-server2", "file1"); try { @@ -149,6 +155,28 @@ public class TestReplicationAdmin { } /** + * Tests that the peer configuration used by ReplicationAdmin contains all + * the peer's properties. + */ + @Test + public void testPeerConfig() throws Exception { + ReplicationPeerConfig config = new ReplicationPeerConfig(); + config.setClusterKey(KEY_ONE); + config.getConfiguration().put("key1", "value1"); + config.getConfiguration().put("key2", "value2"); + admin.addPeer(ID_ONE, config, null); + + List<ReplicationPeer> peers = admin.listValidReplicationPeers(); + assertEquals(1, peers.size()); + ReplicationPeer peerOne = peers.get(0); + assertNotNull(peerOne); + assertEquals("value1", peerOne.getConfiguration().get("key1")); + assertEquals("value2", peerOne.getConfiguration().get("key2")); + + admin.removePeer(ID_ONE); + } + + /** * basic checks that when we add a peer that it is enabled, and that we can disable * @throws Exception */
