Author: edwardyoon
Date: Tue Jun 14 10:05:08 2011
New Revision: 1135453

URL: http://svn.apache.org/viewvc?rev=1135453&view=rev
Log:
BSPPeer's ZK object should be constructed with listed quorum servers

Added: incubator/hama/trunk/src/test/org/apache/hama/zookeeper/ incubator/hama/trunk/src/test/org/apache/hama/zookeeper/TestZKTools.java Modified: incubator/hama/trunk/CHANGES.txt incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java incubator/hama/trunk/src/test/org/apache/hama/bsp/TestMessages.java Modified: incubator/hama/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1135453&r1=1135452&r2=1135453&view=diff ============================================================================== --- incubator/hama/trunk/CHANGES.txt (original) +++ incubator/hama/trunk/CHANGES.txt Tue Jun 14 10:05:08 2011 @@ -11,6 +11,7 @@ Release 0.3 - Unreleased BUG FIXES + HAMA-396: BSPPeer's ZK object should be constructed with listed quorum servers (edwardyoon) HAMA-391: Refactor Exception.printStackTrace() to LOG.error(Exception) (Thomas Jungblut via edwardyoon) HAMA-393: Add environment variable 'HAMA_HEAPSIZE' to hama-env.sh (edwardyoon) Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1135453&r1=1135452&r2=1135453&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Jun 14 10:05:08 2011 @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Server; import org.apache.hama.Constants; +import org.apache.hama.zookeeper.QuorumPeer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -47,7 +48,7 @@ import org.apache.zookeeper.data.Stat; * This class represents a BSP peer. 
*/ public class BSPPeer implements Watcher, BSPPeerInterface { - + public static final Log LOG = LogFactory.getLog(BSPPeer.class); private Configuration conf; @@ -58,7 +59,7 @@ public class BSPPeer implements Watcher, private volatile Integer mutex = 0; private final String bspRoot; - private final String zookeeperAddr; + private final String quorumServers; private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress, BSPPeerInterface>(); private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>(); @@ -81,10 +82,9 @@ public class BSPPeer implements Watcher, .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT); bspRoot = conf.get(Constants.ZOOKEEPER_ROOT, Constants.DEFAULT_ZOOKEEPER_ROOT); - zookeeperAddr = conf.get(Constants.ZOOKEEPER_QUORUM) - + ":" - + conf.getInt(Constants.ZOOKEPER_CLIENT_PORT, - Constants.DEFAULT_ZOOKEPER_CLIENT_PORT); + quorumServers = QuorumPeer.getZKQuorumServersString(conf); + LOG.debug("Quorum " + quorumServers); + // TODO: may require to dynamic reflect the underlying // network e.g. ip address, port. 
peerAddress = new InetSocketAddress(bindAddress, bindPort); @@ -95,8 +95,8 @@ public class BSPPeer implements Watcher, public void reinitialize() { try { LOG.debug("reinitialize(): " + getPeerName()); - server = RPC.getServer(this, peerAddress.getHostName(), - peerAddress.getPort(), conf); + server = RPC.getServer(this, peerAddress.getHostName(), peerAddress + .getPort(), conf); server.start(); LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:" + peerAddress.getPort()); @@ -105,7 +105,7 @@ public class BSPPeer implements Watcher, } try { - zk = new ZooKeeper(zookeeperAddr, 3000, this); + zk = new ZooKeeper(quorumServers, 3000, this); } catch (IOException e) { LOG.error("Exception during reinitialization!", e); } @@ -115,7 +115,7 @@ public class BSPPeer implements Watcher, try { s = zk.exists(Constants.DEFAULT_ZOOKEEPER_ROOT, false); } catch (Exception e) { - LOG.error(s,e); + LOG.error(s, e); } if (s == null) { @@ -337,8 +337,8 @@ public class BSPPeer implements Watcher, private InetSocketAddress getAddress(String peerName) { String[] peerAddrParts = peerName.split(":"); - return new InetSocketAddress(peerAddrParts[0], - Integer.parseInt(peerAddrParts[1])); + return new InetSocketAddress(peerAddrParts[0], Integer + .parseInt(peerAddrParts[1])); } @Override Modified: incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java?rev=1135453&r1=1135452&r2=1135453&view=diff ============================================================================== --- incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java (original) +++ incubator/hama/trunk/src/java/org/apache/hama/zookeeper/QuorumPeer.java Tue Jun 14 10:05:08 2011 @@ -120,6 +120,7 @@ public class QuorumPeer implements Const for (Entry<Object, Object> entry : properties.entrySet()) { String key = entry.getKey().toString().trim(); String value = 
entry.getValue().toString().trim(); + if (key.startsWith("server.")) { int dot = key.indexOf('.'); long id = Long.parseLong(key.substring(dot + 1)); @@ -279,4 +280,75 @@ public class QuorumPeer implements Const } return properties; } + + /** + * Return the ZK Quorum servers string given zk properties returned by + * makeZKProps + * @param properties + * @return Quorum servers String + */ + public static String getZKQuorumServersString(Properties properties) { + String clientPort = null; + List<String> servers = new ArrayList<String>(); + + // The clientPort option may come after the server.X hosts, so we need to + // grab everything and then create the final host:port comma separated list. + boolean anyValid = false; + for (Entry<Object,Object> property : properties.entrySet()) { + String key = property.getKey().toString().trim(); + String value = property.getValue().toString().trim(); + if (key.equals("clientPort")) { + clientPort = value; + } + else if (key.startsWith("server.")) { + String host = value.substring(0, value.indexOf(':')); + servers.add(host); + try { + //noinspection ResultOfMethodCallIgnored + InetAddress.getByName(host); + anyValid = true; + } catch (UnknownHostException e) { + LOG.warn(StringUtils.stringifyException(e)); + } + } + } + + if (!anyValid) { + LOG.error("no valid quorum servers found in " + Constants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (clientPort == null) { + LOG.error("no clientPort found in " + Constants.ZOOKEEPER_CONFIG_NAME); + return null; + } + + if (servers.isEmpty()) { + LOG.fatal("No server.X lines found in conf/zoo.cfg. 
Hama must have a " + + "ZooKeeper cluster configured for its operation."); + return null; + } + + StringBuilder hostPortBuilder = new StringBuilder(); + for (int i = 0; i < servers.size(); ++i) { + String host = servers.get(i); + if (i > 0) { + hostPortBuilder.append(','); + } + hostPortBuilder.append(host); + hostPortBuilder.append(':'); + hostPortBuilder.append(clientPort); + } + + return hostPortBuilder.toString(); + } + + /** + * Return the ZK Quorum servers string given the specified configuration. + * @param conf + * @return Quorum servers + */ + public static String getZKQuorumServersString(Configuration conf) { + return getZKQuorumServersString(makeZKProps(conf)); + } } Modified: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestMessages.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestMessages.java?rev=1135453&r1=1135452&r2=1135453&view=diff ============================================================================== --- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestMessages.java (original) +++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestMessages.java Tue Jun 14 10:05:08 2011 @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hama.bsp; import junit.framework.TestCase; Added: incubator/hama/trunk/src/test/org/apache/hama/zookeeper/TestZKTools.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/zookeeper/TestZKTools.java?rev=1135453&view=auto ============================================================================== --- incubator/hama/trunk/src/test/org/apache/hama/zookeeper/TestZKTools.java (added) +++ incubator/hama/trunk/src/test/org/apache/hama/zookeeper/TestZKTools.java Tue Jun 14 10:05:08 2011 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.zookeeper; + +import org.apache.hama.Constants; +import org.apache.hama.HamaConfiguration; + +import junit.framework.TestCase; + +public class TestZKTools extends TestCase { + + public void testZKProps() { + HamaConfiguration conf = new HamaConfiguration(); + conf.set(Constants.ZOOKEEPER_QUORUM, "test.com:123"); + conf.set(Constants.ZOOKEPER_CLIENT_PORT, "2222"); + + assertEquals("test.com:2222", QuorumPeer.getZKQuorumServersString(conf)); + } +}