Author: taton Date: Wed Oct 3 14:46:35 2007 New Revision: 581734 URL: http://svn.apache.org/viewvc?rev=581734&view=rev Log: HADOOP-1822. Allow the specialization and configuration of socket factories. Provide a StandardSocketFactory, and a SocksSocketFactory to allow the use of SOCKS proxies. (taton).
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/hadoop-default.xml lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581734&r1=581733&r2=581734&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 3 14:46:35 2007 @@ -77,6 +77,10 @@ HADOOP-1963. Add a FileSystem implementation for the Kosmos Filesystem (KFS). (Sriram Rao via cutting) + HADOOP-1822. Allow the specialization and configuration of socket + factories. Provide a StandardSocketFactory, and a SocksSocketFactory to + allow the use of SOCKS proxies. (taton). + OPTIMIZATIONS HADOOP-1910. 
Reduce the number of RPCs that DistributedFileSystem.create() Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=581734&r1=581733&r2=581734&view=diff ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Oct 3 14:46:35 2007 @@ -954,4 +954,39 @@ </description> </property> +<!-- Proxy Configuration --> + +<property> + <name>hadoop.rpc.socket.factory.class.default</name> + <value>org.apache.hadoop.net.StandardSocketFactory</value> + <description> Default SocketFactory to use. This parameter is expected to be + formatted as "package.FactoryClassName". + </description> +</property> + +<property> + <name>hadoop.rpc.socket.factory.class.ClientProtocol</name> + <value></value> + <description> SocketFactory to use to connect to a DFS. If null or empty, use + hadoop.rpc.socket.factory.class.default. This socket factory is also used by + DFSClient to create sockets to DataNodes. + </description> +</property> + +<property> + <name>hadoop.rpc.socket.factory.class.JobSubmissionProtocol</name> + <value></value> + <description> SocketFactory to use to connect to a Map/Reduce master + (JobTracker). If null or empty, then use hadoop.rpc.socket.factory.class.default. + </description> +</property> + +<property> + <name>hadoop.socks.server</name> + <value></value> + <description> Address (host:port) of the SOCKS server to be used by the + SocksSocketFactory.
+ </description> +</property> + </configuration> Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=581734&r1=581733&r2=581734&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Oct 3 14:46:35 2007 @@ -23,6 +23,7 @@ import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.fs.*; import org.apache.hadoop.ipc.*; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.conf.*; import org.apache.hadoop.dfs.DistributedFileSystem.DiskStatus; import org.apache.hadoop.util.*; @@ -36,6 +37,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.ConcurrentHashMap; +import javax.net.SocketFactory; + /******************************************************** * DFSClient can connect to a Hadoop Filesystem and * perform basic file tasks. 
It uses the ClientProtocol @@ -60,6 +63,7 @@ private long defaultBlockSize; private short defaultReplication; private LocalDirAllocator dirAllocator; + private SocketFactory socketFactory; /** * A map from name -> DFSOutputStream of files that are currently being @@ -142,7 +146,8 @@ return (ClientProtocol) RetryProxy.create(ClientProtocol.class, RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, nameNodeAddr, conf), + ClientProtocol.versionID, nameNodeAddr, conf, + NetUtils.getSocketFactory(conf, ClientProtocol.class)), methodNameToPolicyMap); } @@ -152,6 +157,7 @@ public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf) throws IOException { this.conf = conf; + this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); this.namenode = createNamenode(nameNodeAddr, conf); String taskId = conf.get("mapred.task.id"); if (taskId != null) { @@ -984,7 +990,7 @@ InetSocketAddress targetAddr = retval.addr; try { - s = new Socket(); + s = socketFactory.createSocket(); s.connect(targetAddr, READ_TIMEOUT); s.setSoTimeout(READ_TIMEOUT); Block blk = targetBlock.getBlock(); @@ -1161,7 +1167,7 @@ InetSocketAddress targetAddr = retval.addr; try { - dn = new Socket(); + dn = socketFactory.createSocket(); dn.connect(targetAddr, READ_TIMEOUT); dn.setSoTimeout(READ_TIMEOUT); @@ -1490,7 +1496,7 @@ // InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName()); try { - s = new Socket(); + s = socketFactory.createSocket(); s.connect(target, READ_TIMEOUT); s.setSoTimeout(replication * READ_TIMEOUT); } catch (IOException ie) { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java?rev=581734&r1=581733&r2=581734&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java (original) +++ 
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java Wed Oct 3 14:46:35 2007 @@ -24,6 +24,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.mapred.StatusHttpServer; +import org.apache.hadoop.net.NetUtils; import java.io.*; import java.net.*; @@ -93,8 +94,10 @@ nameNodeAddr = DataNode.createSocketAddr( conf.get("fs.default.name", "local")); this.conf = conf; - this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class, - ClientProtocol.versionID, nameNodeAddr, conf); + this.namenode = + (ClientProtocol) RPC.getProxy(ClientProtocol.class, + ClientProtocol.versionID, nameNodeAddr, conf, NetUtils + .getSocketFactory(conf, ClientProtocol.class)); // // initialize the webserver for uploading files. Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=581734&r1=581733&r2=581734&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Oct 3 14:46:35 2007 @@ -31,19 +31,20 @@ import java.io.BufferedOutputStream; import java.io.FilterInputStream; import java.io.FilterOutputStream; -import java.io.OutputStream; import java.util.Hashtable; import java.util.Iterator; +import javax.net.SocketFactory; + import org.apache.commons.logging.*; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.dfs.FSConstants; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -71,6 +72,7 @@ private int maxIdleTime; //connections will be culled if 
it was idle for //maxIdleTime msecs private int maxRetries; //the max. no. of retries for socket connections + private SocketFactory socketFactory; // how to create sockets /** A call waiting for a value. */ private class Call { @@ -146,7 +148,7 @@ short failures = 0; while (true) { try { - this.socket = new Socket(); + this.socket = socketFactory.createSocket(); this.socket.connect(address, FSConstants.READ_TIMEOUT); break; } catch (IOException ie) { //SocketTimeoutException is also caught @@ -426,19 +428,29 @@ /** Construct an IPC client whose values are of the given {@link Writable} * class. */ - public Client(Class valueClass, Configuration conf) { + public Client(Class valueClass, Configuration conf, + SocketFactory factory) { this.valueClass = valueClass; this.timeout = conf.getInt("ipc.client.timeout", 10000); this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000); this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10); this.conf = conf; - + this.socketFactory = factory; Thread t = new ConnectionCuller(); t.setDaemon(true); t.setName(valueClass.getName() + " Connection Culler"); LOG.debug(valueClass.getName() + "Connection culler maxidletime= " + maxIdleTime + "ms"); t.start(); + } + + /** + * Construct an IPC client with the default SocketFactory + * @param valueClass + * @param conf + */ + public Client(Class<?> valueClass, Configuration conf) { + this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); } /** Stop all threads related to this client.
No further calls may be made Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=581734&r1=581733&r2=581734&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed Oct 3 14:46:35 2007 @@ -28,10 +28,15 @@ import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.io.*; +import java.util.Map; +import java.util.HashMap; + +import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.io.*; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.conf.*; /** A simple RPC mechanism. @@ -126,37 +131,51 @@ } - private static Client CLIENT; + private static Map<SocketFactory, Client> CLIENTS = + new HashMap<SocketFactory, Client>(); - private static synchronized Client getClient(Configuration conf) { + private static synchronized Client getClient(Configuration conf, + SocketFactory factory) { // Construct & cache client. The configuration is only used for timeout, // and Clients have connection pools. So we can either (a) lose some // connection pooling and leak sockets, or (b) use the same timeout for all // configurations. Since the IPC is usually intended globally, not // per-job, we choose (a). - if (CLIENT == null) { - CLIENT = new Client(ObjectWritable.class, conf); + Client client = CLIENTS.get(factory); + if (client == null) { + client = new Client(ObjectWritable.class, conf, factory); + CLIENTS.put(factory, client); } - return CLIENT; + return client; + } + + /** + * Construct & cache client with the default SocketFactory. 
+ * @param conf + * @return + */ + private static Client getClient(Configuration conf) { + return getClient(conf, SocketFactory.getDefault()); } /** * Stop all RPC client connections */ public static synchronized void stopClient(){ - if (CLIENT != null) { - CLIENT.stop(); - CLIENT = null; - } + for (Client client : CLIENTS.values()) + client.stop(); + CLIENTS.clear(); } private static class Invoker implements InvocationHandler { private InetSocketAddress address; private Client client; - public Invoker(InetSocketAddress address, Configuration conf) { + public Invoker(InetSocketAddress address, Configuration conf, + SocketFactory factory) { + this.address = address; - this.client = getClient(conf); + this.client = getClient(conf, factory); } public Object invoke(Object proxy, Method method, Object[] args) @@ -239,12 +258,14 @@ } /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public static VersionedProtocol getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, Configuration conf) throws IOException { - VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( - protocol.getClassLoader(), - new Class[] { protocol }, - new Invoker(addr, conf)); + public static VersionedProtocol getProxy(Class<?> protocol, + long clientVersion, InetSocketAddress addr, Configuration conf, + SocketFactory factory) throws IOException { + + VersionedProtocol proxy = + (VersionedProtocol) Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[] { protocol }, + new Invoker(addr, conf, factory)); long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion); if (serverVersion == clientVersion) { @@ -253,6 +274,24 @@ throw new VersionMismatch(protocol.getName(), clientVersion, serverVersion); } + } + + /** + * Construct a client-side proxy object with the default SocketFactory + * + * @param protocol + * @param clientVersion + * @param addr + * @param conf + * 
@return a proxy instance + * @throws IOException + */ + public static VersionedProtocol getProxy(Class<?> protocol, + long clientVersion, InetSocketAddress addr, Configuration conf) + throws IOException { + + return getProxy(protocol, clientVersion, addr, conf, NetUtils + .getDefaultSocketFactory(conf)); } /** Expert: Make multiple, parallel calls to a set of servers. */ Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=581734&r1=581733&r2=581734&view=diff ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Oct 3 14:46:35 2007 @@ -55,6 +55,7 @@ import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.mapred.TaskInProgress; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -267,9 +268,10 @@ private JobSubmissionProtocol createProxy(InetSocketAddress addr, Configuration conf ) throws IOException { - JobSubmissionProtocol raw = (JobSubmissionProtocol) - RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, addr, conf); + JobSubmissionProtocol raw = + (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, addr, conf, NetUtils + .getSocketFactory(conf, JobSubmissionProtocol.class)); RetryPolicy backoffPolicy = RetryPolicies.retryUpToMaximumCountWithProportionalSleep (5, 10, java.util.concurrent.TimeUnit.SECONDS); Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java?rev=581734&view=auto 
============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java Wed Oct 3 14:46:35 2007 @@ -0,0 +1,78 @@ +package org.apache.hadoop.net; + +import javax.net.SocketFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.VersionedProtocol; +import org.apache.hadoop.util.ReflectionUtils; + +public class NetUtils { + + /** + * Get the socket factory for the given class according to its + * configuration parameter + * <tt>hadoop.rpc.socket.factory.class.&lt;ClassName&gt;</tt>, where ClassName is the simple (unqualified) name of the class. When no + * such parameter exists then fall back on the default socket factory as + * configured by <tt>hadoop.rpc.socket.factory.class.default</tt>. If + * this default socket factory is not configured, then fall back on the JVM + * default socket factory. + * + * @param conf the configuration + * @param clazz the class (usually a {@link VersionedProtocol}) + * @return a socket factory + */ + public static SocketFactory getSocketFactory(Configuration conf, + Class<?> clazz) { + + SocketFactory factory = null; + + String propValue = + conf.get("hadoop.rpc.socket.factory.class." + clazz.getSimpleName()); + if ((propValue != null) && (propValue.length() > 0)) + factory = getSocketFactoryFromProperty(conf, propValue); + + if (factory == null) + factory = getDefaultSocketFactory(conf); + + return factory; + } + + /** + * Get the default socket factory as specified by the configuration + * parameter <tt>hadoop.rpc.socket.factory.class.default</tt> + * + * @param conf the configuration + * @return the default socket factory as specified in the configuration or + * the JVM default socket factory if the configuration does not + * contain a default socket factory property.
+ */ + public static SocketFactory getDefaultSocketFactory(Configuration conf) { + + String propValue = conf.get("hadoop.rpc.socket.factory.class.default"); + if ((propValue == null) || (propValue.length() == 0)) + return SocketFactory.getDefault(); + + return getSocketFactoryFromProperty(conf, propValue); + } + + /** + * Instantiate the socket factory whose class name is given by propValue, + * loading the class through the given configuration. If the class cannot + * be found, a RuntimeException is raised. + * + * @param propValue the property which is the class name of the + * SocketFactory to instantiate; assumed non null and non empty. + * @return a socket factory as defined in the property value. + */ + public static SocketFactory getSocketFactoryFromProperty( + Configuration conf, String propValue) { + + try { + Class<?> theClass = conf.getClassByName(propValue); + return (SocketFactory) ReflectionUtils.newInstance(theClass, conf); + + } catch (ClassNotFoundException cnfe) { + throw new RuntimeException("Socket Factory class not found: " + cnfe); + } + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java?rev=581734&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java Wed Oct 3 14:46:35 2007 @@ -0,0 +1,144 @@ +package org.apache.hadoop.net; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.Socket; +import java.net.UnknownHostException; + +import javax.net.SocketFactory; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +/** + * Specialized SocketFactory to create sockets with
a SOCKS proxy + */ +public class SocksSocketFactory extends SocketFactory implements + Configurable { + + private Configuration conf; + + private Proxy proxy; + + /** + * Default empty constructor (for use with the reflection API). + */ + public SocksSocketFactory() { + this.proxy = Proxy.NO_PROXY; + } + + /** + * Constructor with a supplied Proxy + * + * @param proxy the proxy to use to create sockets + */ + public SocksSocketFactory(Proxy proxy) { + this.proxy = proxy; + } + + /* @inheritDoc */ + @Override + public Socket createSocket() throws IOException { + + return new Socket(proxy); + } + + /* @inheritDoc */ + @Override + public Socket createSocket(InetAddress addr, int port) throws IOException { + + Socket socket = createSocket(); + socket.connect(new InetSocketAddress(addr, port)); + return socket; + } + + /* @inheritDoc */ + @Override + public Socket createSocket(InetAddress addr, int port, + InetAddress localHostAddr, int localPort) throws IOException { + + Socket socket = createSocket(); + socket.bind(new InetSocketAddress(localHostAddr, localPort)); + socket.connect(new InetSocketAddress(addr, port)); + return socket; + } + + /* @inheritDoc */ + @Override + public Socket createSocket(String host, int port) throws IOException, + UnknownHostException { + + Socket socket = createSocket(); + socket.connect(new InetSocketAddress(host, port)); + return socket; + } + + /* @inheritDoc */ + @Override + public Socket createSocket(String host, int port, + InetAddress localHostAddr, int localPort) throws IOException, + UnknownHostException { + + Socket socket = createSocket(); + socket.bind(new InetSocketAddress(localHostAddr, localPort)); + socket.connect(new InetSocketAddress(host, port)); + return socket; + } + + /* @inheritDoc */ + @Override + public int hashCode() { + return proxy.hashCode(); + } + + /* @inheritDoc */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof 
SocksSocketFactory)) + return false; + final SocksSocketFactory other = (SocksSocketFactory) obj; + if (proxy == null) { + if (other.proxy != null) + return false; + } else if (!proxy.equals(other.proxy)) + return false; + return true; + } + + /* @inheritDoc */ + public Configuration getConf() { + return this.conf; + } + + /* @inheritDoc */ + public void setConf(Configuration conf) { + this.conf = conf; + String proxyStr = conf.get("hadoop.socks.server"); + if ((proxyStr != null) && (proxyStr.length() > 0)) { + setProxy(proxyStr); + } + } + + /** + * Set the proxy of this socket factory as described in the string + * parameter + * + * @param proxyStr the proxy address using the format "host:port" + */ + private void setProxy(String proxyStr) { + String[] strs = proxyStr.split(":", 2); + if (strs.length != 2) + throw new RuntimeException("Bad SOCKS proxy parameter: " + proxyStr); + String host = strs[0]; + int port = Integer.parseInt(strs[1]); + this.proxy = + new Proxy(Proxy.Type.SOCKS, InetSocketAddress.createUnresolved(host, + port)); + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java?rev=581734&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java Wed Oct 3 14:46:35 2007 @@ -0,0 +1,89 @@ +package org.apache.hadoop.net; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.UnknownHostException; + +import javax.net.SocketFactory; + +/** + * Specialized SocketFactory to create plain sockets (no proxy) + */ +public class StandardSocketFactory extends SocketFactory { + + /** + * Default empty constructor (for use with the
reflection API). + */ + public StandardSocketFactory() { + } + + /* @inheritDoc */ + @Override + public Socket createSocket() throws IOException { + return new Socket(); + } + + /* @inheritDoc */ + @Override + public Socket createSocket(InetAddress addr, int port) throws IOException { + + Socket socket = createSocket(); + socket.connect(new InetSocketAddress(addr, port)); + return socket; + } + + /* @inheritDoc */ + @Override + public Socket createSocket(InetAddress addr, int port, + InetAddress localHostAddr, int localPort) throws IOException { + + Socket socket = createSocket(); + socket.bind(new InetSocketAddress(localHostAddr, localPort)); + socket.connect(new InetSocketAddress(addr, port)); + return socket; + } + + /* @inheritDoc */ + @Override + public Socket createSocket(String host, int port) throws IOException, + UnknownHostException { + + Socket socket = createSocket(); + socket.connect(new InetSocketAddress(host, port)); + return socket; + } + + /* @inheritDoc */ + @Override + public Socket createSocket(String host, int port, + InetAddress localHostAddr, int localPort) throws IOException, + UnknownHostException { + + Socket socket = createSocket(); + socket.bind(new InetSocketAddress(localHostAddr, localPort)); + socket.connect(new InetSocketAddress(host, port)); + return socket; + } + + /* @inheritDoc */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof StandardSocketFactory)) + return false; + return true; + } + + /* @inheritDoc */ + @Override + public int hashCode() { + // Dummy hash code (to make find bugs happy) + return 47; + } + +} Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java?rev=581734&view=auto ============================================================================== --- 
lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Wed Oct 3 14:46:35 2007 @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.dfs.DistributedFileSystem; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.net.StandardSocketFactory; + +/** + * This class checks that RPCs can use specialized socket factories. 
+ */ +public class TestSocketFactory extends TestCase { + + /** + * Check that we can reach a NameNode or a JobTracker using a specific + * socket factory + */ + public void testSocketFactory() throws IOException { + // Create a standard mini-cluster + Configuration sconf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null); + final int nameNodePort = cluster.getNameNodePort(); + + // Get a reference to its DFS directly + FileSystem fs = cluster.getFileSystem(); + assertTrue(fs instanceof DistributedFileSystem); + DistributedFileSystem directDfs = (DistributedFileSystem) fs; + + // Get another reference via network using a specific socket factory + Configuration cconf = new Configuration(); + cconf.set("fs.default.name", String.format("hdfs://localhost:%s/", + nameNodePort + 10)); + cconf.set("hadoop.rpc.socket.factory.class.default", + "org.apache.hadoop.ipc.DummySocketFactory"); + cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol", + "org.apache.hadoop.ipc.DummySocketFactory"); + cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol", + "org.apache.hadoop.ipc.DummySocketFactory"); + + fs = FileSystem.get(cconf); + assertTrue(fs instanceof DistributedFileSystem); + DistributedFileSystem dfs = (DistributedFileSystem) fs; + + JobClient client = null; + + try { + // This will test RPC to the NameNode only. + // could we test Client-DataNode connections? 
+ Path filePath = new Path("/dir"); + + assertFalse(directDfs.exists(filePath)); + assertFalse(dfs.exists(filePath)); + + directDfs.mkdirs(filePath); + assertTrue(directDfs.exists(filePath)); + assertTrue(dfs.exists(filePath)); + + // This will test RPC to a JobTracker + MiniMRCluster mr = new MiniMRCluster(1, fs.getUri().toString(), 1); + final int jobTrackerPort = mr.getJobTrackerPort(); + + JobConf jconf = new JobConf(cconf); + jconf.set("mapred.job.tracker", String.format("localhost:%d", + jobTrackerPort + 10)); + client = new JobClient(jconf); + + JobStatus[] jobs = client.jobsToComplete(); + assertTrue(jobs.length == 0); + + } finally { + try { + if (client != null) + client.close(); + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + try { + if (dfs != null) + dfs.close(); + + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + try { + if (directDfs != null) + directDfs.close(); + + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + try { + if (cluster != null) + cluster.shutdown(); + + } catch (Exception ignored) { + // nothing we can do + ignored.printStackTrace(); + } + } + } +} + +/** + * Dummy socket factory which shifts TCP ports by subtracting 10 when + * establishing a connection + */ +class DummySocketFactory extends StandardSocketFactory { + /** + * Default empty constructor (for use with the reflection API).
+ */ + public DummySocketFactory() { + } + + /* @inheritDoc */ + @Override + public Socket createSocket() throws IOException { + return new Socket() { + @Override + public void connect(SocketAddress addr, int timeout) + throws IOException { + + assert (addr instanceof InetSocketAddress); + InetSocketAddress iaddr = (InetSocketAddress) addr; + SocketAddress newAddr = null; + if (iaddr.isUnresolved()) + newAddr = + new InetSocketAddress(iaddr.getHostName(), + iaddr.getPort() - 10); + else + newAddr = + new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10); + System.out.printf("Test socket: rerouting %s to %s\n", iaddr, + newAddr); + super.connect(newAddr, timeout); + } + }; + } + + /* @inheritDoc */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof DummySocketFactory)) + return false; + return true; + } + + /* @inheritDoc */ + @Override + public int hashCode() { + // Dummy hash code (to make find bugs happy) + return 53; + } +}