// Origin: hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java
// URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java?rev=690096&view=auto
// Added in revision 690096, Thu Aug 28 21:31:57 2008.
package org.apache.hadoop.thriftfs;

import com.facebook.thrift.TException;
import com.facebook.thrift.TApplicationException;
import com.facebook.thrift.protocol.TBinaryProtocol;
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.server.TServer;
import com.facebook.thrift.server.TThreadPoolServer;
import com.facebook.thrift.transport.TServerSocket;
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransportFactory;

// Include Generated code
import org.apache.hadoop.thriftfs.api.*;
import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;

import java.io.*;
import java.util.*;
import java.net.*;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;

/**
 * ThriftHadoopFileSystem
 * A thrift wrapper around the Hadoop File System.
 *
 * The server listens on a TCP port and dispatches every Thrift call to a
 * single {@link HadoopThriftHandler}, which forwards it to a Hadoop
 * {@link FileSystem}.
 */
public class HadoopThriftServer extends ThriftHadoopFileSystem {

  static int serverPort = 0;                      // default port (0 = pick a free port)
  TServer server = null;

  // Listening socket handed to Thrift; kept so that close() can release it.
  private ServerSocket listenSocket = null;

  /**
   * Implementation of the Thrift service interface on top of a Hadoop
   * {@link FileSystem}. Open streams are kept in a table keyed by a numeric
   * id; that id is what clients receive inside a ThriftHandle.
   */
  public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
  {

    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");

    // Message used whenever a client presents a handle we cannot resolve.
    private static final String UNKNOWN_HANDLE = "Unknown thrift handle.";

    // HDFS glue
    Configuration conf;
    FileSystem fs;

    // structure that maps each Thrift object into an hadoop object
    private long nextId = new Random().nextLong();
    private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
    private Daemon inactivityThread = null;

    // Detect inactive session
    private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
    private static volatile long inactivityRecheckInterval = 60 * 1000;
    private static volatile boolean fsRunning = true;
    // Time (msec) of the most recent RPC. Written by the Thrift worker
    // threads and read by the InactivityMonitor, hence volatile.
    private static volatile long lastActivityTime;

    // allow outsider to change the hadoopthrift path
    public void setOption(String key, String val) {
    }

    /**
     * Current system time.
     * @return current time in msec.
     */
    static long now() {
      return System.currentTimeMillis();
    }

    /**
     * Marks "now" as the time of the most recent client activity.
     */
    private static void recordActivity() {
      lastActivityTime = now();
    }

    /**
     * getVersion
     *
     * @return current version of the interface.
     */
    public String getVersion() {
      return "0.1";
    }

    /**
     * shutdown
     *
     * cleanly closes everything and exit.
     *
     * @param status the exit status passed to {@link Runtime#exit(int)}
     */
    public void shutdown(int status) {
      LOG.info("HadoopThriftServer shutting down.");
      try {
        fs.close();
      } catch (IOException e) {
        LOG.warn("Unable to close file system", e);
      }
      Runtime.getRuntime().exit(status);
    }

    /**
     * Periodically checks to see if there is inactivity.
     * When no activity has been recorded for longer than the inactivity
     * period, the whole process is shut down with status -1.
     */
    class InactivityMonitor implements Runnable {
      public void run() {
        while (fsRunning) {
          try {
            if (now() > lastActivityTime + inactivityPeriod) {
              LOG.warn("HadoopThriftServer Inactivity period of " +
                       inactivityPeriod + " expired... Stopping Server.");
              shutdown(-1);
            }
          } catch (Exception e) {
            LOG.error(StringUtils.stringifyException(e));
          }
          try {
            Thread.sleep(inactivityRecheckInterval);
          } catch (InterruptedException ie) {
            // Somebody asked this thread to stop: keep the interrupt flag
            // set and leave instead of silently looping again.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }

    /**
     * HadoopThriftServer
     *
     * Constructor for the HadoopThriftServer glue with Thrift Class.
     * Opens the file system described by a default {@link Configuration};
     * the process exits with status -1 if that fails.
     *
     * @param name - the name of this handler
     */
    public HadoopThriftHandler(String name) {
      conf = new Configuration();
      recordActivity();
      try {
        // NOTE(review): the monitor thread is created but never started, so
        // the inactivity timeout does not currently fire. Starting it would
        // make idle servers exit; confirm the intended behavior before
        // adding inactivityThread.start().
        inactivityThread = new Daemon(new InactivityMonitor());
        fs = FileSystem.get(conf);
      } catch (IOException e) {
        LOG.warn("Unable to open hadoop file system...", e);
        Runtime.getRuntime().exit(-1);
      }
    }

    /**
     * printStackTrace
     *
     * Helper function to print an exception stack trace to the log and not stderr
     *
     * @param e the exception
     *
     */
    static private void printStackTrace(Exception e) {
      for(StackTraceElement s: e.getStackTrace()) {
        LOG.error(s);
      }
    }

    /**
     * Lookup a thrift object into a hadoop object
     *
     * @param id the handle id
     * @return the registered object, or null if the id is unknown
     */
    private synchronized Object lookup(long id) {
      return hadoopHash.get(Long.valueOf(id));
    }

    /**
     * Insert a thrift object into a hadoop object. Return its id.
     */
    private synchronized long insert(Object o) {
      nextId++;
      hadoopHash.put(Long.valueOf(nextId), o);
      return nextId;
    }

    /**
     * Delete a thrift object from the hadoop store.
     *
     * @param id the handle id
     * @return the object that was registered, or null if the id is unknown
     */
    private synchronized Object remove(long id) {
      return hadoopHash.remove(Long.valueOf(id));
    }

    /**
     * Resolves a handle that must refer to a stream opened for writing.
     *
     * @throws ThriftIOException if the handle is unknown or refers to
     *         something other than an output stream
     */
    private FSDataOutputStream outputStreamFor(ThriftHandle handle)
                                               throws ThriftIOException {
      Object obj = lookup(handle.id);
      if (!(obj instanceof FSDataOutputStream)) {
        throw new ThriftIOException(UNKNOWN_HANDLE);
      }
      return (FSDataOutputStream)obj;
    }

    /**
     * Resolves a handle that must refer to a stream opened for reading.
     *
     * @throws ThriftIOException if the handle is unknown or refers to
     *         something other than an input stream
     */
    private FSDataInputStream inputStreamFor(ThriftHandle handle)
                                             throws ThriftIOException {
      Object obj = lookup(handle.id);
      if (!(obj instanceof FSDataInputStream)) {
        throw new ThriftIOException(UNKNOWN_HANDLE);
      }
      return (FSDataInputStream)obj;
    }

    /**
     * Converts a Hadoop file status into its Thrift counterpart.
     */
    private static org.apache.hadoop.thriftfs.api.FileStatus toThriftStatus(
                                  org.apache.hadoop.fs.FileStatus stat) {
      return new org.apache.hadoop.thriftfs.api.FileStatus(
        stat.getPath().toString(),
        stat.getLen(),
        stat.isDir(),
        stat.getReplication(),
        stat.getBlockSize(),
        stat.getModificationTime(),
        stat.getPermission().toString(),
        stat.getOwner(),
        stat.getGroup());
    }

    /**
     * Implement the API exported by this thrift server
     */

    /** Set inactivity timeout period. The period is specified in seconds.
      * if there are no RPC calls to the HadoopThrift server for this much
      * time, then the server kills itself.
      * The recheck interval is lowered when it exceeds the new period.
      *
      * NOTE(review): the setting only takes effect once the inactivity
      * monitor thread is actually running; confirm that it is started.
      */
    public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
      inactivityPeriod = periodInSeconds * 1000; // in milli seconds
      if (inactivityRecheckInterval > inactivityPeriod ) {
        inactivityRecheckInterval = inactivityPeriod;
      }
    }


    /**
     * Create a file and open it for writing
     *
     * @param path the file to create
     * @return a handle identifying the open output stream
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public ThriftHandle create(Pathname path) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("create: " + path);
        FSDataOutputStream out = fs.create(new Path(path.pathname));
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Create a file and open it for writing, delete file if it exists
     *
     * @param path        the file to create
     * @param mode        permission bits handed to {@link FsPermission}
     * @param overwrite   passed through to the file system's create call
     * @param bufferSize  passed through to the file system's create call
     * @param replication passed through to the file system's create call
     * @param blockSize   passed through to the file system's create call
     * @return a handle identifying the open output stream
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public ThriftHandle createFile(Pathname path,
                                   short mode,
                                   boolean overwrite,
                                   int bufferSize,
                                   short replication,
                                   long blockSize) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("create: " + path +
                                     " permission: " + mode +
                                     " overwrite: " + overwrite +
                                     " bufferSize: " + bufferSize +
                                     " replication: " + replication +
                                     " blockSize: " + blockSize);
        FSDataOutputStream out = fs.create(new Path(path.pathname),
                                           new FsPermission(mode),
                                           overwrite,
                                           bufferSize,
                                           replication,
                                           blockSize,
                                           null); // progress
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Opens an existing file and returns a handle to read it
     *
     * @param path the file to open
     * @return a handle identifying the open input stream
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public ThriftHandle open(Pathname path) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("open: " + path);
        FSDataInputStream in = fs.open(new Path(path.pathname));
        long id = insert(in);
        ThriftHandle obj = new ThriftHandle(id);
        HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Opens an existing file to append to it.
     *
     * @param path the file to append to
     * @return a handle identifying the open output stream
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public ThriftHandle append(Pathname path) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("append: " + path);
        FSDataOutputStream out = fs.append(new Path(path.pathname));
        long id = insert(out);
        ThriftHandle obj = new ThriftHandle(id);
        HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
        return obj;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * write to a file
     *
     * @param tout handle of a stream opened for writing
     * @param data text to write; it is encoded as UTF-8
     * @return true when the data was handed to the stream
     * @throws ThriftIOException if the handle is not an open output stream
     *         or the write fails
     */
    public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("write: " + tout.id);
        FSDataOutputStream out = outputStreamFor(tout);
        byte[] tmp = data.getBytes("UTF-8");
        out.write(tmp, 0, tmp.length);
        HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
        return true;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * read from a file
     *
     * @param tout   handle of a stream opened for reading
     * @param offset byte position in the file to read from
     * @param length maximum number of bytes to read
     * @return the bytes read, decoded as UTF-8; the empty string when no
     *         bytes could be read (for example at end of file)
     * @throws ThriftIOException if the handle is not an open input stream,
     *         length is negative, or the read fails
     */
    public String read(ThriftHandle tout, long offset,
                       int length) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("read: " + tout.id +
                                     " offset: " + offset +
                                     " length: " + length);
        if (length < 0) {
          throw new ThriftIOException("Invalid read length: " + length);
        }
        FSDataInputStream in = inputStreamFor(tout);
        if (in.getPos() != offset) {
          in.seek(offset);
        }
        byte[] tmp = new byte[length];
        int numbytes = in.read(offset, tmp, 0, length);
        HadoopThriftHandler.LOG.debug("read done: " + tout.id);
        if (numbytes <= 0) {
          // The read reports end of file with a negative count; handing
          // that to the String constructor would throw.
          return "";
        }
        // NOTE(review): a chunk boundary may fall inside a multi-byte UTF-8
        // sequence, in which case the decoder substitutes a replacement
        // character.
        return new String(tmp, 0, numbytes, "UTF-8");
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Delete a file/directory
     *
     * @param path      the pathname to delete
     * @param recursive passed through to the file system's delete call
     * @return the result reported by the file system
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public boolean rm(Pathname path, boolean recursive)
                          throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("rm: " + path +
                                     " recursive: " + recursive);
        boolean ret = fs.delete(new Path(path.pathname), recursive);
        HadoopThriftHandler.LOG.debug("rm: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Move a file/directory
     *
     * @param path the current pathname
     * @param dest the new pathname
     * @return the result reported by the file system
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public boolean rename(Pathname path, Pathname dest)
                          throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("rename: " + path +
                                     " destination: " + dest);
        boolean ret = fs.rename(new Path(path.pathname),
                                new Path(dest.pathname));
        HadoopThriftHandler.LOG.debug("rename: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * close file
     *
     * The handle is removed from the table before the stream is closed, so
     * it cannot be used again even if closing fails.
     *
     * @param tout handle returned by create, createFile, open or append
     * @return true when the stream was closed
     * @throws ThriftIOException if the handle is unknown or closing fails
     */
    public boolean close(ThriftHandle tout) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("close: " + tout.id);
        Object obj = remove(tout.id);
        if (obj instanceof FSDataOutputStream) {
          FSDataOutputStream out = (FSDataOutputStream)obj;
          out.close();
        } else if (obj instanceof FSDataInputStream) {
          FSDataInputStream in = (FSDataInputStream)obj;
          in.close();
        } else {
          throw new ThriftIOException(UNKNOWN_HANDLE);
        }
        HadoopThriftHandler.LOG.debug("closed: " + tout.id);
        return true;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Create a directory
     *
     * @param path the directory to create
     * @return the result reported by the file system
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public boolean mkdirs(Pathname path) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("mkdirs: " + path);
        boolean ret = fs.mkdirs(new Path(path.pathname));
        HadoopThriftHandler.LOG.debug("mkdirs: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Does this pathname exist?
     *
     * @param path the pathname to test
     * @return the result reported by the file system
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public boolean exists(Pathname path) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("exists: " + path);
        boolean ret = fs.exists(new Path(path.pathname));
        HadoopThriftHandler.LOG.debug("exists done: " + path);
        return ret;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Returns status about the specified pathname
     *
     * @param path the pathname to describe
     * @return the status converted to the Thrift representation
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public org.apache.hadoop.thriftfs.api.FileStatus stat(
                            Pathname path) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("stat: " + path);
        org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
                                           new Path(path.pathname));
        HadoopThriftHandler.LOG.debug("stat done: " + path);
        return toThriftStatus(stat);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * If the specified pathname is a directory, then return the
     * list of pathnames in this directory
     *
     * @param path the pathname to list
     * @return one status entry per element reported by the file system
     * @throws ThriftIOException if the file system reports an I/O error or
     *         returns no listing for the pathname
     */
    public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
                            Pathname path) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("listStatus: " + path);

        org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
                                           new Path(path.pathname));
        HadoopThriftHandler.LOG.debug("listStatus done: " + path);
        if (stat == null) {
          // NOTE(review): presumably some FileSystem implementations return
          // null for a pathname that does not exist; report that instead of
          // failing with a NullPointerException.
          throw new ThriftIOException("Unable to list status of " +
                                      path.pathname);
        }
        List<org.apache.hadoop.thriftfs.api.FileStatus> value =
          new ArrayList<org.apache.hadoop.thriftfs.api.FileStatus>(stat.length);
        for (org.apache.hadoop.fs.FileStatus entry : stat) {
          value.add(toThriftStatus(entry));
        }
        return value;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the permission of a pathname
     *
     * @param path the pathname to change
     * @param mode permission bits handed to {@link FsPermission}
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public void chmod(Pathname path, short mode) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("chmod: " + path +
                                     " mode " + mode);
        fs.setPermission(new Path(path.pathname), new FsPermission(mode));
        HadoopThriftHandler.LOG.debug("chmod done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the owner & group of a pathname
     *
     * @param path  the pathname to change
     * @param owner passed through to the file system's setOwner call
     * @param group passed through to the file system's setOwner call
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public void chown(Pathname path, String owner, String group)
                                                       throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("chown: " + path +
                                     " owner: " + owner +
                                     " group: " + group);
        fs.setOwner(new Path(path.pathname), owner, group);
        HadoopThriftHandler.LOG.debug("chown done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }

    /**
     * Sets the replication factor of a file
     *
     * @param path the file to change
     * @param repl the requested replication factor
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public void setReplication(Pathname path, short repl) throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("setrepl: " + path +
                                     " replication factor: " + repl);
        fs.setReplication(new Path(path.pathname), repl);
        HadoopThriftHandler.LOG.debug("setrepl done: " + path);
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }

    }

    /**
     * Returns the block locations of this file
     *
     * @param path   the file to inspect
     * @param start  passed through to the file system's block location call
     * @param length passed through to the file system's block location call
     * @return one entry per block location reported by the file system,
     *         carrying its hosts, its names, its offset and its length
     * @throws ThriftIOException if the file system reports an I/O error
     */
    public List<org.apache.hadoop.thriftfs.api.BlockLocation>
             getFileBlockLocations(Pathname path, long start, long length)
                                         throws ThriftIOException {
      try {
        recordActivity();
        HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);

        org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
                                                 new Path(path.pathname));

        org.apache.hadoop.fs.BlockLocation[] stat =
            fs.getFileBlockLocations(status, start, length);
        HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);

        List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
          new ArrayList<org.apache.hadoop.thriftfs.api.BlockLocation>();
        if (stat == null) {
          // Nothing reported for this range.
          return value;
        }

        for (org.apache.hadoop.fs.BlockLocation block : stat) {
          // construct the list of hostnames from the array returned
          // by HDFS
          List<String> hosts = new ArrayList<String>(
                                   Arrays.asList(block.getHosts()));

          // construct the list of host:port from the array returned
          // by HDFS
          List<String> names = new ArrayList<String>(
                                   Arrays.asList(block.getNames()));

          value.add(new org.apache.hadoop.thriftfs.api.BlockLocation(
                      hosts, names, block.getOffset(), block.getLength()));
        }
        return value;
      } catch (IOException e) {
        throw new ThriftIOException(e.getMessage());
      }
    }
  }

  /**
   * Bind to port. If the specified port is 0, then bind to random port
   * and record the port that was chosen in {@link #serverPort}.
   *
   * @param port the port to listen on, or 0 for any free port
   * @return the bound server socket
   * @throws IOException if the socket cannot be created or bound; the
   *         original failure is attached as the cause
   */
  private ServerSocket createServerSocket(int port) throws IOException {
    ServerSocket sock = null;
    try {
      sock = new ServerSocket();
      // Prevent 2MSL delay problem on server restarts
      sock.setReuseAddress(true);
      // Bind to listening port
      if (port == 0) {
        sock.bind(null);
        serverPort = sock.getLocalPort();
      } else {
        sock.bind(new InetSocketAddress(port));
      }
      return sock;
    } catch (IOException ioe) {
      // Do not leak the half-initialized socket.
      if (sock != null) {
        try {
          sock.close();
        } catch (IOException ignored) {
          // the bind failure below is the error worth reporting
        }
      }
      IOException wrapped = new IOException(
          "Could not create ServerSocket on port " + port + "." + ioe);
      wrapped.initCause(ioe);
      throw wrapped;
    }
  }

  /**
   * Releases the listening socket, if there is one.
   */
  private void closeListenSocket() {
    if (listenSocket == null) {
      return;
    }
    try {
      listenSocket.close();
    } catch (IOException e) {
      HadoopThriftHandler.LOG.warn("Unable to close server socket", e);
    }
    listenSocket = null;
  }

  /**
   * Constructs a server object that listens on the default port setting.
   */
  public HadoopThriftServer() {
    this(new String[0]);
  }

  /**
   * Constructs a server object
   *
   * Setup failures are reported but not rethrown; in that case
   * {@link #server} is left null.
   *
   * @param args command line arguments; the first one, when present, is
   *        the port to listen on
   * @throws NumberFormatException if the port argument is not an integer
   */
  public HadoopThriftServer(String [] args) {

    if (args != null && args.length > 0) {
      serverPort = Integer.parseInt(args[0]);
    }
    try {
      listenSocket = createServerSocket(serverPort);
      TServerTransport serverTransport = new TServerSocket(listenSocket);
      Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
      ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
      TThreadPoolServer.Options options = new TThreadPoolServer.Options();
      options.minWorkerThreads = 10;
      server = new TThreadPoolServer(processor, serverTransport,
                                     new TTransportFactory(),
                                     new TTransportFactory(),
                                     new TBinaryProtocol.Factory(),
                                     new TBinaryProtocol.Factory(),
                                     options);
      System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
      HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
      System.out.flush();

    } catch (Exception x) {
      x.printStackTrace();
      HadoopThriftHandler.LOG.error(StringUtils.stringifyException(x));
      // Setup failed part-way: give the port back.
      server = null;
      closeListenSocket();
    }
  }

  /**
   * Stops listening for new connections by closing the server socket.
   * Calling it more than once is harmless.
   */
  public void close() {
    closeListenSocket();
  }

  /**
   * Starts the server and serves requests until the process ends.
   *
   * @param args the optional first argument is the port to listen on
   */
  public static void main(String [] args) {
    HadoopThriftServer me = new HadoopThriftServer(args);
    if (me.server == null) {
      // The constructor already reported why setup failed.
      System.err.println("Unable to start the hadoop thrift server.");
      System.exit(-1);
    }
    me.server.serve();
  }
}
// Origin: hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java
// URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java?rev=690096&view=auto
// Added in revision 690096, Thu Aug 28 21:31:57 2008.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.thriftfs;

import junit.framework.TestCase;
import java.io.*;

// NOTE(review): package locations assumed for this revision of trunk;
// confirm against the Hadoop tree being built.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;

/**
 * This class is supposed to test ThriftHadoopFileSystem but has a long long
 * way to go.
 *
 * For now it only brings up a mini DFS cluster, constructs a
 * HadoopThriftServer and closes it again.
 */
public class TestThriftfs extends TestCase
{
  final static int numDatanodes = 1;

  public TestThriftfs() throws IOException
  {
  }

  /**
   * Smoke test: a thrift server can be constructed and closed while a
   * mini DFS cluster is running.
   *
   * @throws IOException if the cluster or the server reports an I/O error
   */
  public void testServer() throws IOException
  {
    Configuration conf = new Configuration();
    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
    try {
      cluster.waitActive();
      DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
      assertNotNull(dfs);
      // HadoopThriftServer declares a constructor taking the command line
      // arguments; an empty array selects its default port setting.
      HadoopThriftServer server = new HadoopThriftServer(new String[0]);
      // Requires HadoopThriftServer to expose close(); verify that the
      // server class provides it.
      server.close();
    } finally {
      // Always tear the cluster down, even when the test body fails.
      cluster.shutdown();
    }
  }

  public static void main(String[]args) throws Exception
  {
    new TestThriftfs().testServer();
  }

}
