Author: dhruba
Date: Wed Aug 27 11:30:41 2008
New Revision: 689548
URL: http://svn.apache.org/viewvc?rev=689548&view=rev
Log:
HADOOP-3754. A Thrift Interface to allow access to HDFS.
This creates HDFS APIs in perl, python, php, ruby and cocoa. (dhruba)
Added:
hadoop/core/trunk/lib/thrift/
hadoop/core/trunk/lib/thrift/LICENSE
hadoop/core/trunk/lib/thrift/Linux/
hadoop/core/trunk/lib/thrift/Linux/amd64/
hadoop/core/trunk/lib/thrift/Linux/amd64/thrift (with props)
hadoop/core/trunk/lib/thrift/libthrift.jar (with props)
hadoop/core/trunk/lib/thrift/reflection_limited.thrift
hadoop/core/trunk/src/contrib/thriftfs/
hadoop/core/trunk/src/contrib/thriftfs/README
hadoop/core/trunk/src/contrib/thriftfs/build.xml
hadoop/core/trunk/src/contrib/thriftfs/if/
hadoop/core/trunk/src/contrib/thriftfs/if/hadoopfs.thrift
hadoop/core/trunk/src/contrib/thriftfs/scripts/
hadoop/core/trunk/src/contrib/thriftfs/scripts/hdfs.py
hadoop/core/trunk/src/contrib/thriftfs/scripts/start_thrift_server.sh
hadoop/core/trunk/src/contrib/thriftfs/src/
hadoop/core/trunk/src/contrib/thriftfs/src/java/
hadoop/core/trunk/src/contrib/thriftfs/src/java/org/
hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/
hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/
hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/
hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java
hadoop/core/trunk/src/contrib/thriftfs/test/
hadoop/core/trunk/src/contrib/thriftfs/test/org/
hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/
hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/
hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/
hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/build-contrib.xml
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=689548&r1=689547&r2=689548&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Aug 27 11:30:41 2008
@@ -104,6 +104,9 @@
HADOOP-3759. Provides ability to run memory intensive jobs without
affecting other running tasks on the nodes. (Hemanth Yamijala via ddas)
+ HADOOP-3754. A Thrift Interface to allow access to HDFS. This creates
+ HDFS APIs in perl, python, php, ruby and cocoa. (dhruba)
+
IMPROVEMENTS
HADOOP-3732. Delay intialization of datanode block verification till
Added: hadoop/core/trunk/lib/thrift/LICENSE
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/thrift/LICENSE?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/lib/thrift/LICENSE (added)
+++ hadoop/core/trunk/lib/thrift/LICENSE Wed Aug 27 11:30:41 2008
@@ -0,0 +1,24 @@
+Thrift Software License
+Copyright (c) 2006- Facebook, Inc.
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
Added: hadoop/core/trunk/lib/thrift/Linux/amd64/thrift
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/thrift/Linux/amd64/thrift?rev=689548&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/core/trunk/lib/thrift/Linux/amd64/thrift
------------------------------------------------------------------------------
svn:executable = *
Propchange: hadoop/core/trunk/lib/thrift/Linux/amd64/thrift
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/core/trunk/lib/thrift/libthrift.jar
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/thrift/libthrift.jar?rev=689548&view=auto
==============================================================================
Binary file - no diff available.
Propchange: hadoop/core/trunk/lib/thrift/libthrift.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: hadoop/core/trunk/lib/thrift/reflection_limited.thrift
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/lib/thrift/reflection_limited.thrift?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/lib/thrift/reflection_limited.thrift (added)
+++ hadoop/core/trunk/lib/thrift/reflection_limited.thrift Wed Aug 27 11:30:41 2008
@@ -0,0 +1,71 @@
+#!/usr/local/bin/thrift -php -java -cpp -py
+
+// NOTICE!!!
+// DO NOT FORGET to run regen.sh if you change this file
+// (or if you change the compiler).
+
+// This interface is deprecated.
+// There is no replacement yet, but I hate it so much that
+// I'm deprecating it before it's done.
+// I'm too ashamed to say.
+
+// dreiss naively thinks he knows how to do this better,
+// so talk to him if you are interested in taking it on,
+// or if you just want someone to make it better for you.
+
+cpp_namespace facebook.thrift.reflection.limited
+java_package com.facebook.thrift.reflection.limited
+py_module thrift.reflection.limited
+
+enum TTypeTag {
+ T_VOID = 1,
+ T_BOOL = 2,
+ T_BYTE = 3,
+ T_I16 = 6,
+ T_I32 = 8,
+ T_I64 = 10,
+ T_DOUBLE = 4,
+ T_STRING = 11,
+ T_STRUCT = 12,
+ T_MAP = 13,
+ T_SET = 14,
+ T_LIST = 15,
+  // This doesn't exist in TBinaryProtocol, but it could be useful for reflection.
+ T_ENUM = 101,
+ T_NOT_REFLECTED = 102,
+}
+
+struct SimpleType {
+ 1: TTypeTag ttype,
+  2: string name,  // For structs and enums.
+}
+
+struct ContainerType {
+ 1: TTypeTag ttype,
+ 2: SimpleType subtype1,
+ 3: optional SimpleType subtype2,
+}
+
+struct ThriftType {
+ 1: bool is_container,
+ 2: optional SimpleType simple_type,
+ 3: optional ContainerType container_type,
+}
+
+struct Argument {
+ 1: i16 key,
+ 2: string name,
+ 3: ThriftType type,
+}
+
+struct Method {
+ 1: string name,
+ 2: ThriftType return_type,
+ 3: list<Argument> arguments,
+}
+
+struct Service {
+ 1: string name,
+ 2: list<Method> methods,
+ 3: bool fully_reflected,
+}
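
For illustration only, a minimal Python 2 sketch of how a service description could be assembled from the limited-reflection structs above. The module path follows the py_module declaration (thrift.reflection.limited), and the attribute-assignment style matches the code the Thrift compiler of this era generates; both are assumptions, and this snippet is not part of the patch.

    # Sketch: describe a one-method service using the structs above.
    from thrift.reflection.limited.ttypes import TTypeTag, SimpleType, \
        ThriftType, Argument, Method, Service

    string_type = ThriftType()
    string_type.is_container = False
    string_type.simple_type = SimpleType()
    string_type.simple_type.ttype = TTypeTag.T_STRING

    arg = Argument()
    arg.key = 1
    arg.name = 'message'
    arg.type = string_type

    echo = Method()
    echo.name = 'echo'
    echo.return_type = string_type
    echo.arguments = [arg]

    svc = Service()
    svc.name = 'EchoService'
    svc.methods = [echo]
    svc.fully_reflected = True
    print svc.name, [m.name for m in svc.methods]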
Modified: hadoop/core/trunk/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build-contrib.xml?rev=689548&r1=689547&r2=689548&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/build-contrib.xml (original)
+++ hadoop/core/trunk/src/contrib/build-contrib.xml Wed Aug 27 11:30:41 2008
@@ -66,6 +66,9 @@
<fileset dir="${hadoop.root}/lib">
<include name="**/*.jar" />
</fileset>
+ <fileset dir="${build.dir}">
+ <include name="**/*.jar" />
+ </fileset>
</path>
<!-- the unit test classpath -->
Added: hadoop/core/trunk/src/contrib/thriftfs/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/README?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/README (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/README Wed Aug 27 11:30:41 2008
@@ -0,0 +1,10 @@
+This is the Thrift interface to access HDFS. The Thrift interface definition is located in if/hadoopfs.thrift. Thrift uses this file to generate HDFS access methods in multiple languages.
+
+The languages that are currently supported are C++, Python, PHP, Ruby, Cocoa and Perl.
+
+A compiled Thrift package is checked into lib/thrift. This is a version for Linux only; this patch is not yet tested on other platforms. An "ant package" from the top-level hadoop workspace builds this package. The Thrift-generated methods are located in build/contrib/thriftfs/gen-*.
+
+The script in src/contrib/thriftfs/scripts/start_thrift_server.sh starts the proxy server. You might have to change the CLASSPATH settings in this file to make it work in your cluster. The Python script hdfs.py uses the HDFS Thrift APIs to access HDFS files; it is present mostly to demonstrate the usage of the Thrift HDFS API.
+
+
+
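
To make the README concrete, here is a minimal Python 2 snippet that picks up the generated bindings and talks to a running proxy, following the same pattern as scripts/hdfs.py below. The host, port and relative gen-py path are assumptions that depend on where the proxy was started.

    import sys
    sys.path.append('build/contrib/thriftfs/gen-py')  # thrift-generated code

    from thrift.transport import TSocket, TTransport
    from thrift.protocol import TBinaryProtocol
    from hadoopfs import ThriftHadoopFileSystem
    from hadoopfs.ttypes import Pathname

    # 'localhost' and 4677 are assumptions; use the port printed by
    # start_thrift_server.sh.
    transport = TTransport.TBufferedTransport(TSocket.TSocket('localhost', 4677))
    client = ThriftHadoopFileSystem.Client(TBinaryProtocol.TBinaryProtocol(transport))
    transport.open()

    path = Pathname()
    path.pathname = '/'
    print client.exists(path)
    transport.close()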
Added: hadoop/core/trunk/src/contrib/thriftfs/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/build.xml?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/build.xml (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/build.xml Wed Aug 27 11:30:41 2008
@@ -0,0 +1,82 @@
+<?xml version="1.0"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="thriftfs" default="thriftif">
+
+  <property name="bin.dir" value="${basedir}/../../../lib/thrift/${os.name}/${os.arch}"/>
+ <property name="executable.name" value="${bin.dir}/thrift"/>
+ <condition property="supportedPlatforms">
+ <available file="${executable.name}" property="foundBinary"/>
+ </condition>
+ <condition property="skip.contrib">
+ <not>
+ <available file="${executable.name}" property="foundBinary"/>
+ </not>
+ </condition>
+
+ <import file="../build-contrib.xml"/>
+
+ <!-- Override jar target to specify main class -->
+ <target name="jar" depends="compile" if="supportedPlatforms">
+    <echo>Building hadoop thrift proxy ${build.dir}/hadoop-${version}-${name}.jar</echo>
+ <jar jarfile="${build.dir}/hadoop-${version}-${name}.jar">
+ <fileset dir="${build.classes}"/>
+ <fileset dir="${build.classes}/../gen-java"/>
+ <manifest>
+ <attribute name="Built-By" value="${user.name}"/>
+        <attribute name="Main-Class" value="org.apache.hadoop.thriftfs.HadoopThriftServer"/>
+ </manifest>
+ </jar>
+ </target>
+
+ <!-- Run only pure-Java unit tests. superdottest -->
+ <target name="test">
+ <antcall target="hadoopbuildcontrib.test">
+ <param name="test.exclude" value="TestStreamedMerge"/>
+ </antcall>
+ </target>
+
+ <!-- Run all unit tests
+ This is not called as part of the nightly build
+ because it will only run on platforms that have standard
+ Unix utilities available.
+ -->
+ <target name="test-unix">
+ <antcall target="hadoopbuildcontrib.test">
+ </antcall>
+ </target>
+
+ <target name="init-contrib" if="supportedPlatforms">
+
+ <property name="if.dir" value="${src.dir}/../../if"/>
+ <echo>Building hadoop thrift library using ${executable.name} </echo>
+ <exec executable="${executable.name}" dir="${bin.dir}">
+      <arg line="-gen py --gen java -gen cocoa -gen perl -gen rb -gen st -gen cpp -php -I ${bin.dir}/ -o ${build.dir} ${if.dir}/hadoopfs.thrift " />
+ </exec>
+ <javac srcdir="${build.dir}/gen-java">
+ <classpath refid="classpath"/>
+ </javac>
+    <jar destfile="${build.dir}/hadoopthriftapi.jar" basedir="${build.dir}/gen-java" includes="**/*.class"/>
+ </target>
+
+</project>
Added: hadoop/core/trunk/src/contrib/thriftfs/if/hadoopfs.thrift
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/if/hadoopfs.thrift?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/if/hadoopfs.thrift (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/if/hadoopfs.thrift Wed Aug 27 11:30:41 2008
@@ -0,0 +1,126 @@
+
+#!/usr/local/bin/thrift -java
+#
+# Thrift Service exported by Hadoop File System
+# Dhruba Borthakur ([EMAIL PROTECTED])
+#
+
+/**
+ * The available types in Thrift:
+ *
+ * bool Boolean, one byte
+ * byte Signed byte
+ * i16 Signed 16-bit integer
+ * i32 Signed 32-bit integer
+ * i64 Signed 64-bit integer
+ * double 64-bit floating point value
+ * string String
+ * map<t1,t2> Map from one type to another
+ * list<t1> Ordered list of one type
+ * set<t1> Set of unique elements of one type
+ *
+ */
+
+namespace java org.apache.hadoop.thriftfs.api
+namespace php hadoopfs
+
+struct ThriftHandle {
+ i64 id
+}
+
+struct Pathname {
+ string pathname
+}
+
+struct FileStatus {
+ 1: string path,
+ 2: i64 length,
+ 3: bool isdir,
+ 4: i16 block_replication,
+ 5: i64 blocksize,
+ 6: i64 modification_time,
+ 7: string permission,
+ 8: string owner,
+ 9: string group
+}
+
+struct BlockLocation {
+ 1: list<string> hosts, /* hostnames of datanodes */
+ 2: list<string> names, /* hostname:portNumber of datanodes */
+ 3: i64 offset, /* offset of the block in the file */
+ 4: i64 length /* length of data */
+}
+
+exception MalformedInputException {
+ string message
+}
+
+exception ThriftIOException {
+ string message
+}
+
+service ThriftHadoopFileSystem
+{
+
+ // set inactivity timeout period. The period is specified in seconds.
+ // if there are no RPC calls to the HadoopThrift server for this much
+ // time, then the server kills itself.
+ void setInactivityTimeoutPeriod(1:i64 periodInSeconds),
+
+ // close session
+ void shutdown(1:i32 status),
+
+ // create a file and open it for writing
+ ThriftHandle create(1:Pathname path) throws (1:ThriftIOException ouch),
+
+ // create a file and open it for writing
+ ThriftHandle createFile(1:Pathname path, 2:i16 mode,
+ 3:bool overwrite, 4:i32 bufferSize,
+ 5:i16 block_replication, 6:i64 blocksize)
+ throws (1:ThriftIOException ouch),
+
+ // returns a handle to an existing file for reading
+ ThriftHandle open(1:Pathname path) throws (1:ThriftIOException ouch),
+
+ // returns a handle to an existing file for appending to it.
+ ThriftHandle append(1:Pathname path) throws (1:ThriftIOException ouch),
+
+ // write a string to the open handle for the file
+  bool write(1:ThriftHandle handle, string data) throws (1:ThriftIOException ouch),
+
+ // read some bytes from the open handle for the file
+  string read(1:ThriftHandle handle, i64 offset, i32 size) throws (1:ThriftIOException ouch),
+
+ // close file
+ bool close(1:ThriftHandle out) throws (1:ThriftIOException ouch),
+
+ // delete file(s) or directory(s)
+ bool rm(1:Pathname path, 2:bool recursive) throws (1:ThriftIOException ouch),
+
+ // rename file(s) or directory(s)
+  bool rename(1:Pathname path, 2:Pathname dest) throws (1:ThriftIOException ouch),
+
+ // create directory
+ bool mkdirs(1:Pathname path) throws (1:ThriftIOException ouch),
+
+ // Does this pathname exist?
+ bool exists(1:Pathname path) throws (1:ThriftIOException ouch),
+
+ // Returns status about the path
+ FileStatus stat(1:Pathname path) throws (1:ThriftIOException ouch),
+
+  // If the path is a directory, then returns the list of pathnames in that directory
+  list<FileStatus> listStatus(1:Pathname path) throws (1:ThriftIOException ouch),
+
+ // Set permission for this file
+ void chmod(1:Pathname path, 2:i16 mode) throws (1:ThriftIOException ouch),
+
+ // set the owner and group of the file.
+  void chown(1:Pathname path, 2:string owner, 3:string group) throws (1:ThriftIOException ouch),
+
+ // set the replication factor for all blocks of the specified file
+  void setReplication(1:Pathname path, 2:i16 replication) throws (1:ThriftIOException ouch),
+
+ // get the locations of the blocks of this file
+  list<BlockLocation> getFileBlockLocations(1:Pathname path, 2:i64 start, 3:i64 length) throws (1:ThriftIOException ouch),
+}
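
A short Python 2 sketch exercising the service defined above: create a file, write to it, stat it and remove it. The connection setup and call patterns mirror scripts/hdfs.py elsewhere in this patch; the host, port and gen-py path are assumptions for a proxy started by start_thrift_server.sh.

    import sys
    sys.path.append('../../../build/contrib/thriftfs/gen-py')

    from thrift.transport import TSocket, TTransport
    from thrift.protocol import TBinaryProtocol
    from hadoopfs import ThriftHadoopFileSystem
    from hadoopfs.ttypes import Pathname

    transport = TTransport.TBufferedTransport(TSocket.TSocket('localhost', 4677))
    client = ThriftHadoopFileSystem.Client(TBinaryProtocol.TBinaryProtocol(transport))
    transport.open()

    path = Pathname()
    path.pathname = '/tmp/thrift-demo.txt'

    handle = client.create(path)           # open for writing
    client.write(handle, 'hello hdfs\n')   # data is carried as a string
    client.close(handle)

    status = client.stat(path)             # FileStatus struct from the IDL above
    print status.path, status.length, status.owner

    client.rm(path, False)                 # non-recursive delete
    transport.close()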
Added: hadoop/core/trunk/src/contrib/thriftfs/scripts/hdfs.py
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/scripts/hdfs.py?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/scripts/hdfs.py (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/scripts/hdfs.py Wed Aug 27 11:30:41 2008
@@ -0,0 +1,555 @@
+#!/usr/bin/env python
+
+"""
+ hdfs.py is a python client for the thrift interface to HDFS.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied. See the License for the specific language governing permissions
+ and limitations under the License.
+
+"""
+import sys
+sys.path.append('../../../../build/contrib/thriftfs/gen-py')
+
+from optparse import OptionParser
+from thrift import Thrift
+from thrift.transport import TSocket
+from thrift.transport import TTransport
+from thrift.protocol import TBinaryProtocol
+from hadoopfs import ThriftHadoopFileSystem
+from hadoopfs.ttypes import *
+from readline import *
+from cmd import *
+import os
+import re
+import readline
+import subprocess
+
+#
+# The address of the FileSystemClientProxy. If the host and port are
+# not specified, then a proxy server is automatically spawned.
+#
+host = 'localhost'
+port = 4677 # use any port
+proxyStartScript = './start_thrift_server.sh'
+startServer = True # shall we start a proxy server?
+
+#
+# The hdfs interactive shell. The Cmd class is a builtin that uses readline + implements
+# a whole bunch of utility stuff like help and custom tab completions.
+# It makes everything real easy.
+#
+class hadoopthrift_cli(Cmd):
+
+ # my custom prompt looks better than the default
+ prompt = 'hdfs>> '
+
+ #############################
+ # Class constructor
+ #############################
+ def __init__(self, server_name, server_port):
+ Cmd.__init__(self)
+ self.server_name = server_name
+ self.server_port = server_port
+
+ #############################
+ # Start the ClientProxy Server if we can find it.
+ # Read in its stdout to determine what port it is running on
+ #############################
+ def startProxyServer(self):
+ try:
+      p = subprocess.Popen([proxyStartScript, str(self.server_port)], stdout=subprocess.PIPE)
+ content = p.stdout.readline()
+ p.stdout.close()
+ val = re.split( '\[|\]', content)
+ print val[1]
+ self.server_port = val[1]
+ return True
+
+ except Exception, ex:
+ print "ERROR in starting proxy server " + proxyStartScript
+ print '%s' % (ex.message)
+ return False
+
+ #############################
+ # Connect to clientproxy
+ #############################
+ def connect(self):
+ try:
+ # connect to hdfs thrift server
+ self.transport = TSocket.TSocket(self.server_name, self.server_port)
+ self.transport = TTransport.TBufferedTransport(self.transport)
+ self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
+
+ # Create a client to use the protocol encoder
+ self.client = ThriftHadoopFileSystem.Client(self.protocol)
+ self.transport.open()
+
+ # tell the HadoopThrift server to die after 60 minutes of inactivity
+ self.client.setInactivityTimeoutPeriod(60*60)
+ return True
+
+ except Thrift.TException, tx:
+ print "ERROR in connecting to ", self.server_name, ":", self.server_port
+ print '%s' % (tx.message)
+ return False
+
+
+ #
+ # Disconnect from client proxy
+ #
+ def shutdown(self):
+ try :
+ self.transport.close()
+ except Exception, tx:
+ return False
+
+ #############################
+ # Create the specified file. Returns a handle to write data.
+ #############################
+ def do_create(self, name):
+ if name == "":
+ print " ERROR usage: create <pathname>"
+ print
+ return 0
+
+ # Create the file, and immediately closes the handle
+ path = Pathname();
+ path.pathname = name;
+ status = self.client.create(path)
+ self.client.close(status)
+ return 0
+
+ #############################
+ # Delete the specified file.
+ #############################
+ def do_rm(self, name):
+ if name == "":
+ print " ERROR usage: rm <pathname>\n"
+ return 0
+
+ # delete file
+ path = Pathname();
+ path.pathname = name;
+ status = self.client.rm(path, False)
+ if status == False:
+ print " ERROR in deleting path: " + name
+ return 0
+
+ #############################
+ # Rename the specified file/dir
+ #############################
+ def do_mv(self, line):
+ params = line.split()
+ if (len(params) != 2):
+ print " ERROR usage: mv <srcpathname> <destpathname>\n"
+ return 0
+ src = params[0].strip()
+ dest = params[1].strip()
+
+ if src == "":
+ print " ERROR usage: mv <srcpathname> <destpathname>\n"
+ return 0
+ if dest == "":
+ print " ERROR usage: mv <srcpathname> <destpathname>\n"
+ return 0
+
+ # move file
+ path = Pathname();
+ path.pathname = src;
+ destpath = Pathname();
+ destpath.pathname = dest;
+ status = self.client.rename(path, destpath)
+ if status == False:
+      print " ERROR in renaming path: " + src
+ return 0
+
+ #############################
+ # Delete the specified file.
+ #############################
+ def do_mkdirs(self, name):
+ if name == "":
+ print " ERROR usage: mkdirs <pathname>\n"
+ return 0
+
+ # create directory
+ path = Pathname();
+ path.pathname = name;
+ fields = self.client.mkdirs(path)
+ return 0
+
+ #############################
+ # does the pathname exist?
+ #############################
+ def do_exists(self, name):
+ if name == "":
+ print " ERROR usage: exists <pathname>\n"
+ return 0
+
+ # check existence of pathname
+ path = Pathname();
+ path.pathname = name;
+ fields = self.client.exists(path)
+ if (fields == True):
+ print name + " exists."
+ else:
+ print name + " does not exist."
+ return 0
+
+ #############################
+ # copy local file into hdfs
+ #############################
+ def do_put(self, line):
+ params = line.split()
+ if (len(params) != 2):
+ print " ERROR usage: put <localpathname> <hdfspathname>\n"
+ return 0
+ local = params[0].strip()
+ hdfs = params[1].strip()
+
+ if local == "":
+ print " ERROR usage: put <localpathname> <hdfspathname>\n"
+ return 0
+ if hdfs == "":
+ print " ERROR usage: put <localpathname> <hdfspathname>\n"
+ return 0
+
+ # open local file
+ input = open(local, 'rb')
+
+ # open output file
+ path = Pathname();
+ path.pathname = hdfs;
+ output = self.client.create(path)
+
+ # read 1MB at a time and upload to hdfs
+ while True:
+ chunk = input.read(1024*1024)
+ if not chunk: break
+ self.client.write(output, chunk)
+
+ self.client.close(output)
+ input.close()
+
+ #############################
+ # copy hdfs file into local
+ #############################
+ def do_get(self, line):
+ params = line.split()
+ if (len(params) != 2):
+ print " ERROR usage: get <hdfspathname> <localpathname>\n"
+ return 0
+ hdfs = params[0].strip()
+ local = params[1].strip()
+
+ if local == "":
+ print " ERROR usage: get <hdfspathname> <localpathname>\n"
+ return 0
+ if hdfs == "":
+ print " ERROR usage: get <hdfspathname> <localpathname>\n"
+ return 0
+
+ # open output local file
+ output = open(local, 'wb')
+
+ # open input hdfs file
+ path = Pathname();
+ path.pathname = hdfs;
+ input = self.client.open(path)
+
+ # find size of hdfs file
+ filesize = self.client.stat(path).length
+
+ # read 1MB bytes at a time from hdfs
+ offset = 0
+ chunksize = 1024 * 1024
+ while True:
+ chunk = self.client.read(input, offset, chunksize)
+ if not chunk: break
+ output.write(chunk)
+ offset += chunksize
+ if (offset >= filesize): break
+
+ self.client.close(input)
+ output.close()
+
+ #############################
+ # List attributes of this path
+ #############################
+ def do_ls(self, name):
+ if name == "":
+ print " ERROR usage: list <pathname>\n"
+ return 0
+
+ # list file status
+ path = Pathname();
+ path.pathname = name;
+ status = self.client.stat(path)
+ if (status.isdir == False):
+ self.printStatus(status)
+ return 0
+
+ # This is a directory, fetch its contents
+ liststatus = self.client.listStatus(path)
+ for item in liststatus:
+ self.printStatus(item)
+
+ #############################
+ # Set permissions for a file
+ #############################
+ def do_chmod(self, line):
+ params = line.split()
+ if (len(params) != 2):
+ print " ERROR usage: chmod 774 <pathname>\n"
+ return 0
+ perm = params[0].strip()
+ name = params[1].strip()
+
+ if name == "":
+ print " ERROR usage: chmod 774 <pathname>\n"
+ return 0
+ if perm == "":
+ print " ERROR usage: chmod 774 <pathname>\n"
+ return 0
+
+ # set permissions (in octal)
+ path = Pathname();
+ path.pathname = name;
+ status = self.client.chmod(path, int(perm,8))
+ return 0
+
+ #############################
+ # Set owner for a file. This is not an atomic operation.
+ # A change to the group of a file may be overwritten by this one.
+ #############################
+ def do_chown(self, line):
+ params = line.split()
+ if (len(params) != 2):
+ print " ERROR usage: chown <ownername> <pathname>\n"
+ return 0
+ owner = params[0].strip()
+ name = params[1].strip()
+ if name == "":
+ print " ERROR usage: chown <ownername> <pathname>\n"
+ return 0
+
+ # get the current owner and group
+ path = Pathname();
+ path.pathname = name;
+ cur = self.client.stat(path)
+
+ # set new owner, keep old group
+ status = self.client.chown(path, owner, cur.group)
+ return 0
+
+ #######################################
+ # Set the replication factor for a file
+ ######################################
+ def do_setreplication(self, line):
+ params = line.split()
+ if (len(params) != 2):
+ print " ERROR usage: setreplication <replication factor> <pathname>\n"
+ return 0
+ repl = params[0].strip()
+ name = params[1].strip()
+ if name == "":
+ print " ERROR usage: setreplication <replication factor> <pathname>\n"
+ return 0
+ if repl == "":
+ print " ERROR usage: setreplication <replication factor> <pathname>\n"
+ return 0
+
+ path = Pathname();
+ path.pathname = name;
+ status = self.client.setReplication(path, int(repl))
+ return 0
+
+ #############################
+ # Display the locations of the blocks of this file
+ #############################
+ def do_getlocations(self, name):
+ if name == "":
+ print " ERROR usage: getlocations <pathname>\n"
+ return 0
+ path = Pathname();
+ path.pathname = name;
+
+ # find size of hdfs file
+ filesize = self.client.stat(path).length
+
+ # getlocations file
+ blockLocations = self.client.getFileBlockLocations(path, 0, filesize)
+ for item in blockLocations:
+ self.printLocations(item)
+
+ return 0
+
+ #############################
+ # Utility methods from here
+ #############################
+ #
+  # If I don't do this, the last command is always re-executed which is annoying.
+ #
+ def emptyline(self):
+ pass
+
+ #
+ # print the status of a path
+ #
+ def printStatus(self, stat):
+    print str(stat.block_replication) + "\t" + str(stat.length) + "\t" + str(stat.modification_time) + "\t" + stat.permission + "\t" + stat.owner + "\t" + stat.group + "\t" + stat.path
+
+ #
+ # print the locations of a block
+ #
+ def printLocations(self, location):
+    print str(location.names) + "\t" + str(location.offset) + "\t" + str(location.length)
+
+ #
+ # Various ways to exit the hdfs shell
+ #
+ def do_quit(self,ignored):
+ try:
+ if startServer:
+ self.client.shutdown(1)
+ return -1
+ except Exception, ex:
+ return -1
+
+ def do_q(self,ignored):
+ return self.do_quit(ignored)
+
+ # ctl-d
+ def do_EOF(self,ignored):
+ return self.do_quit(ignored)
+
+ #
+ # Give the user some amount of help - I am a nice guy
+ #
+
+ def help_create(self):
+ print "create <pathname>"
+
+ def help_rm(self):
+ print "rm <pathname>"
+
+ def help_mv(self):
+ print "mv <srcpathname> <destpathname>"
+
+ def help_mkdirs(self):
+ print "mkdirs <pathname>"
+
+ def help_exists(self):
+ print "exists <pathname>"
+
+ def help_put(self):
+ print "put <localpathname> <hdfspathname>"
+
+ def help_get(self):
+ print "get <hdfspathname> <localpathname>"
+
+ def help_ls(self):
+ print "ls <hdfspathname>"
+
+ def help_chmod(self):
+ print "chmod 775 <hdfspathname>"
+
+ def help_chown(self):
+ print "chown <ownername> <hdfspathname>"
+
+ def help_setreplication(self):
+ print "setrep <replication factor> <hdfspathname>"
+
+ def help_getlocations(self):
+ print "getlocations <pathname>"
+
+ def help_EOF(self):
+ print '<ctl-d> will quit this program.'
+
+ def help_quit(self):
+    print 'if you need to know what quit does, you shouldn\'t be using a computer.'
+
+ def help_q(self):
+    print 'quit and if you need to know what quit does, you shouldn\'t be using a computer.'
+
+ def help_help(self):
+ print 'duh'
+
+ def usage(exec_name):
+ print "Usage: "
+ print " %s [proxyclientname [proxyclientport]]" % exec_name
+ print " %s -v" % exec_name
+ print " %s --help" % exec_name
+ print " %s -h" % exec_name
+
+if __name__ == "__main__":
+
+ #
+ # Rudimentary command line processing.
+ #
+
+ # real parsing:
+ parser = OptionParser()
+ parser.add_option("-e", "--execute", dest="command_str",
+ help="execute this command and exit")
+  parser.add_option("-s", "--proxyclient", dest="host", help="the proxyclient's hostname")
+  parser.add_option("-p", "--port", dest="port", help="the proxyclient's port number")
+
+ (options, args) = parser.parse_args()
+
+ #
+ # Save host and port information of the proxy server
+ #
+ if (options.host):
+ host = options.host
+ startServer = False
+ if (options.port):
+ port = options.port
+ startServer = False
+
+ #
+ # Retrieve the user's readline history.
+ #
+ historyFileName = os.path.expanduser("~/.hdfs_history")
+ if (os.path.exists(historyFileName)):
+ readline.read_history_file(historyFileName)
+
+ #
+ # Create class and connect to proxy server
+ #
+ c = hadoopthrift_cli(host,port)
+
+ if startServer:
+ if c.startProxyServer() == False:
+ sys.exit(1)
+ if c.connect() == False:
+ sys.exit(1)
+
+
+ #
+ # If this utility was invoked with one argument, process it
+ #
+ if (options.command_str):
+ c.onecmd(options.command_str)
+ sys.exit(0)
+
+ #
+ # Start looping over user commands.
+ #
+  c.cmdloop('Welcome to the Thrift interactive shell for Hadoop File System. - how can I help you? ' + '\n'
+            'Press tab twice to see the list of commands. ' + '\n' +
+            'To complete the name of a command press tab once. \n'
+            )
+ c.shutdown();
+
+ readline.write_history_file(historyFileName)
+ print '' # I am nothing if not courteous.
+ sys.exit(0)
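
The shell above can also be driven from another Python 2 script rather than interactively, by reusing the hadoopthrift_cli class and Cmd.onecmd (which the -e option above already exercises). A small sketch, assuming a proxy is already listening on the given host and port (supplying both means no server is spawned, mirroring the option handling above); 'proxyhost' and 4677 are placeholders:

    # Sketch: programmatic use of the hdfs.py shell class.
    from hdfs import hadoopthrift_cli

    cli = hadoopthrift_cli('proxyhost', 4677)
    if cli.connect():
        cli.onecmd('mkdirs /tmp/demo')
        cli.onecmd('ls /tmp')
        cli.shutdown()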
Added: hadoop/core/trunk/src/contrib/thriftfs/scripts/start_thrift_server.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/scripts/start_thrift_server.sh?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/scripts/start_thrift_server.sh (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/scripts/start_thrift_server.sh Wed Aug 27 11:30:41 2008
@@ -0,0 +1,27 @@
+#!/bin/sh
+
+CLASSPATH=
+TOP=../../../..
+
+# the hadoop libraries
+for f in $TOP/build/*.jar ; do
+ CLASSPATH=$CLASSPATH:$f
+done
+
+# the apache libraries
+for f in $TOP/lib/*.jar ; do
+ CLASSPATH=$CLASSPATH:$f
+done
+
+# the thrift libraries
+for f in $TOP/lib/thrift/*.jar ; do
+ CLASSPATH=$CLASSPATH:$f
+done
+
+# the thrift server
+for f in $TOP/build/contrib/thriftfs/*.jar ; do
+ CLASSPATH=$CLASSPATH:$f
+done
+
+java -Dcom.sun.management.jmxremote -cp $CLASSPATH org.apache.hadoop.thriftfs.HadoopThriftServer $*
+
Added: hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/src/java/org/apache/hadoop/thriftfs/HadoopThriftServer.java Wed Aug 27 11:30:41 2008
@@ -0,0 +1,616 @@
+package org.apache.hadoop.thriftfs;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TApplicationException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.server.TServer;
+import com.facebook.thrift.server.TThreadPoolServer;
+import com.facebook.thrift.transport.TServerSocket;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransportFactory;
+
+// Include Generated code
+import org.apache.hadoop.thriftfs.api.*;
+import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
+
+import java.io.*;
+import java.util.*;
+import java.net.*;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * ThriftHadoopFileSystem
+ * A thrift wrapper around the Hadoop File System
+ */
+public class HadoopThriftServer extends ThriftHadoopFileSystem {
+
+ static int serverPort = 0; // default port
+ TServer server = null;
+
+  public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
+ {
+
+    public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");
+
+ // HDFS glue
+ Configuration conf;
+ FileSystem fs;
+
+    // structure that maps each Thrift object into a Hadoop object
+ private long nextId = new Random().nextLong();
+ private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
+ private Daemon inactivityThread = null;
+
+ // Detect inactive session
+ private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
+ private static volatile long inactivityRecheckInterval = 60 * 1000;
+ private static volatile boolean fsRunning = true;
+ private static long now;
+
+ // allow outsider to change the hadoopthrift path
+ public void setOption(String key, String val) {
+ }
+
+ /**
+ * Current system time.
+ * @return current time in msec.
+ */
+ static long now() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * getVersion
+ *
+ * @return current version of the interface.
+ */
+ public String getVersion() {
+ return "0.1";
+ }
+
+ /**
+ * shutdown
+ *
+ * cleanly closes everything and exit.
+ */
+ public void shutdown(int status) {
+ LOG.info("HadoopThriftServer shutting down.");
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.warn("Unable to close file system");
+ }
+ Runtime.getRuntime().exit(status);
+ }
+
+ /**
+ * Periodically checks to see if there is inactivity
+ */
+ class InactivityMonitor implements Runnable {
+ public void run() {
+ while (fsRunning) {
+ try {
+ if (now() > now + inactivityPeriod) {
+ LOG.warn("HadoopThriftServer Inactivity period of " +
+ inactivityPeriod + " expired... Stopping Server.");
+ shutdown(-1);
+ }
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+ try {
+ Thread.sleep(inactivityRecheckInterval);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+ }
+
+ /**
+ * HadoopThriftServer
+ *
+ * Constructor for the HadoopThriftServer glue with Thrift Class.
+ *
+ * @param name - the name of this handler
+ */
+ public HadoopThriftHandler(String name) {
+ conf = new Configuration();
+ now = now();
+ try {
+ inactivityThread = new Daemon(new InactivityMonitor());
+ fs = FileSystem.get(conf);
+ } catch (IOException e) {
+ LOG.warn("Unable to open hadoop file system...");
+ Runtime.getRuntime().exit(-1);
+ }
+ }
+
+ /**
+ * printStackTrace
+ *
+     * Helper function to print an exception stack trace to the log and not stderr
+ *
+ * @param e the exception
+ *
+ */
+ static private void printStackTrace(Exception e) {
+ for(StackTraceElement s: e.getStackTrace()) {
+ LOG.error(s);
+ }
+ }
+
+ /**
+ * Lookup a thrift object into a hadoop object
+ */
+ private synchronized Object lookup(long id) {
+ return hadoopHash.get(new Long(id));
+ }
+
+ /**
+ * Insert a thrift object into a hadoop object. Return its id.
+ */
+ private synchronized long insert(Object o) {
+ nextId++;
+ hadoopHash.put(nextId, o);
+ return nextId;
+ }
+
+ /**
+ * Delete a thrift object from the hadoop store.
+ */
+ private synchronized Object remove(long id) {
+ return hadoopHash.remove(new Long(id));
+ }
+
+ /**
+ * Implement the API exported by this thrift server
+ */
+
+ /** Set inactivity timeout period. The period is specified in seconds.
+ * if there are no RPC calls to the HadoopThrift server for this much
+ * time, then the server kills itself.
+ */
+ public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
+      inactivityPeriod = periodInSeconds * 1000; // in milliseconds
+ if (inactivityRecheckInterval > inactivityPeriod ) {
+ inactivityRecheckInterval = inactivityPeriod;
+ }
+ }
+
+
+ /**
+ * Create a file and open it for writing
+ */
+ public ThriftHandle create(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("create: " + path);
+ FSDataOutputStream out = fs.create(new Path(path.pathname));
+ long id = insert(out);
+ ThriftHandle obj = new ThriftHandle(id);
+ HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
+ return obj;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Create a file and open it for writing, delete file if it exists
+ */
+ public ThriftHandle createFile(Pathname path,
+ short mode,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("create: " + path +
+ " permission: " + mode +
+ " overwrite: " + overwrite +
+ " bufferSize: " + bufferSize +
+ " replication: " + replication +
+ " blockSize: " + blockSize);
+ FSDataOutputStream out = fs.create(new Path(path.pathname),
+ new FsPermission(mode),
+ overwrite,
+ bufferSize,
+ replication,
+ blockSize,
+ null); // progress
+ long id = insert(out);
+ ThriftHandle obj = new ThriftHandle(id);
+ HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
+ return obj;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Opens an existing file and returns a handle to read it
+ */
+ public ThriftHandle open(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("open: " + path);
+ FSDataInputStream out = fs.open(new Path(path.pathname));
+ long id = insert(out);
+ ThriftHandle obj = new ThriftHandle(id);
+ HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
+ return obj;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Opens an existing file to append to it.
+ */
+ public ThriftHandle append(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("append: " + path);
+ FSDataOutputStream out = fs.append(new Path(path.pathname));
+ long id = insert(out);
+ ThriftHandle obj = new ThriftHandle(id);
+ HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
+ return obj;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * write to a file
+ */
+    public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("write: " + tout.id);
+ FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
+ byte[] tmp = data.getBytes("UTF-8");
+ out.write(tmp, 0, tmp.length);
+ HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
+ return true;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * read from a file
+ */
+ public String read(ThriftHandle tout, long offset,
+ int length) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("read: " + tout.id +
+ " offset: " + offset +
+ " length: " + length);
+ FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
+ if (in.getPos() != offset) {
+ in.seek(offset);
+ }
+ byte[] tmp = new byte[length];
+ int numbytes = in.read(offset, tmp, 0, length);
+ HadoopThriftHandler.LOG.debug("read done: " + tout.id);
+ return new String(tmp, 0, numbytes, "UTF-8");
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Delete a file/directory
+ */
+ public boolean rm(Pathname path, boolean recursive)
+ throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("rm: " + path +
+ " recursive: " + recursive);
+ boolean ret = fs.delete(new Path(path.pathname), recursive);
+ HadoopThriftHandler.LOG.debug("rm: " + path);
+ return ret;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Move a file/directory
+ */
+ public boolean rename(Pathname path, Pathname dest)
+ throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("rename: " + path +
+ " destination: " + dest);
+ boolean ret = fs.rename(new Path(path.pathname),
+ new Path(dest.pathname));
+ HadoopThriftHandler.LOG.debug("rename: " + path);
+ return ret;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * close file
+ */
+ public boolean close(ThriftHandle tout) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("close: " + tout.id);
+ Object obj = remove(tout.id);
+ if (obj instanceof FSDataOutputStream) {
+ FSDataOutputStream out = (FSDataOutputStream)obj;
+ out.close();
+ } else if (obj instanceof FSDataInputStream) {
+ FSDataInputStream in = (FSDataInputStream)obj;
+ in.close();
+ } else {
+ throw new ThriftIOException("Unknown thrift handle.");
+ }
+ HadoopThriftHandler.LOG.debug("closed: " + tout.id);
+ return true;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Create a directory
+ */
+ public boolean mkdirs(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("mkdirs: " + path);
+ boolean ret = fs.mkdirs(new Path(path.pathname));
+ HadoopThriftHandler.LOG.debug("mkdirs: " + path);
+ return ret;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Does this pathname exist?
+ */
+ public boolean exists(Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("exists: " + path);
+ boolean ret = fs.exists(new Path(path.pathname));
+ HadoopThriftHandler.LOG.debug("exists done: " + path);
+ return ret;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Returns status about the specified pathname
+ */
+ public org.apache.hadoop.thriftfs.api.FileStatus stat(
+ Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("stat: " + path);
+ org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
+ new Path(path.pathname));
+ HadoopThriftHandler.LOG.debug("stat done: " + path);
+ return new org.apache.hadoop.thriftfs.api.FileStatus(
+ stat.getPath().toString(),
+ stat.getLen(),
+ stat.isDir(),
+ stat.getReplication(),
+ stat.getBlockSize(),
+ stat.getModificationTime(),
+ stat.getPermission().toString(),
+ stat.getOwner(),
+ stat.getGroup());
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * If the specified pathname is a directory, then return the
+ * list of pathnames in this directory
+ */
+ public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
+ Pathname path) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("listStatus: " + path);
+
+ org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
+ new Path(path.pathname));
+ HadoopThriftHandler.LOG.debug("listStatus done: " + path);
+ org.apache.hadoop.thriftfs.api.FileStatus tmp;
+ List<org.apache.hadoop.thriftfs.api.FileStatus> value =
+ new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
+
+ for (int i = 0; i < stat.length; i++) {
+ tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
+ stat[i].getPath().toString(),
+ stat[i].getLen(),
+ stat[i].isDir(),
+ stat[i].getReplication(),
+ stat[i].getBlockSize(),
+ stat[i].getModificationTime(),
+ stat[i].getPermission().toString(),
+ stat[i].getOwner(),
+ stat[i].getGroup());
+ value.add(tmp);
+ }
+ return value;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Sets the permission of a pathname
+ */
+ public void chmod(Pathname path, short mode) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("chmod: " + path +
+ " mode " + mode);
+ fs.setPermission(new Path(path.pathname), new FsPermission(mode));
+ HadoopThriftHandler.LOG.debug("chmod done: " + path);
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Sets the owner & group of a pathname
+ */
+    public void chown(Pathname path, String owner, String group) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("chown: " + path +
+ " owner: " + owner +
+ " group: " + group);
+ fs.setOwner(new Path(path.pathname), owner, group);
+ HadoopThriftHandler.LOG.debug("chown done: " + path);
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+
+ /**
+ * Sets the replication factor of a file
+ */
+    public void setReplication(Pathname path, short repl) throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("setrepl: " + path +
+ " replication factor: " + repl);
+ fs.setReplication(new Path(path.pathname), repl);
+ HadoopThriftHandler.LOG.debug("setrepl done: " + path);
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+
+ }
+
+ /**
+ * Returns the block locations of this file
+ */
+ public List<org.apache.hadoop.thriftfs.api.BlockLocation>
+ getFileBlockLocations(Pathname path, long start, long length)
+ throws ThriftIOException {
+ try {
+ now = now();
+ HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);
+
+ org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
+ new Path(path.pathname));
+
+ org.apache.hadoop.fs.BlockLocation[] stat =
+ fs.getFileBlockLocations(status, start, length);
+ HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);
+
+ org.apache.hadoop.thriftfs.api.BlockLocation tmp;
+ List<org.apache.hadoop.thriftfs.api.BlockLocation> value =
+ new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
+
+ for (int i = 0; i < stat.length; i++) {
+
+ // construct the list of hostnames from the array returned
+ // by HDFS
+ List<String> hosts = new LinkedList<String>();
+ String[] hostsHdfs = stat[i].getHosts();
+ for (int j = 0; j < hostsHdfs.length; j++) {
+ hosts.add(hostsHdfs[j]);
+ }
+
+ // construct the list of host:port from the array returned
+ // by HDFS
+ List<String> names = new LinkedList<String>();
+ String[] namesHdfs = stat[i].getNames();
+ for (int j = 0; j < namesHdfs.length; j++) {
+ names.add(namesHdfs[j]);
+ }
+ tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
+ hosts, names, stat[i].getOffset(), stat[i].getLength());
+ value.add(tmp);
+ }
+ return value;
+ } catch (IOException e) {
+ throw new ThriftIOException(e.getMessage());
+ }
+ }
+ }
+
+ // Bind to port. If the specified port is 0, then bind to random port.
+ private ServerSocket createServerSocket(int port) throws IOException {
+ try {
+ ServerSocket sock = new ServerSocket();
+ // Prevent 2MSL delay problem on server restarts
+ sock.setReuseAddress(true);
+ // Bind to listening port
+ if (port == 0) {
+ sock.bind(null);
+ serverPort = sock.getLocalPort();
+ } else {
+ sock.bind(new InetSocketAddress(port));
+ }
+ return sock;
+ } catch (IOException ioe) {
+      throw new IOException("Could not create ServerSocket on port " + port + "." + ioe);
+ }
+ }
+
+ /**
+   * Constructs a server object
+ */
+ public HadoopThriftServer(String [] args) {
+
+ if (args.length > 0) {
+ serverPort = new Integer(args[0]);
+ }
+ try {
+ ServerSocket ssock = createServerSocket(serverPort);
+ TServerTransport serverTransport = new TServerSocket(ssock);
+ Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
+      ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
+ TThreadPoolServer.Options options = new TThreadPoolServer.Options();
+ options.minWorkerThreads = 10;
+ server = new TThreadPoolServer(processor, serverTransport,
+ new TTransportFactory(),
+ new TTransportFactory(),
+ new TBinaryProtocol.Factory(),
+ new TBinaryProtocol.Factory(),
+ options);
+      System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
+      HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" + serverPort + "]...");
+ System.out.flush();
+
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ }
+
+ public static void main(String [] args) {
+ HadoopThriftServer me = new HadoopThriftServer(args);
+ me.server.serve();
+ }
+};
+
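
The server above keeps a map from opaque i64 handles to open Hadoop streams (see insert/lookup/remove) and kills itself after a period with no RPC activity. A compact Python sketch of that bookkeeping pattern, for illustration only (the names are invented; this is not the server's code):

    import random
    import threading
    import time

    class HandleTable(object):
        def __init__(self, inactivity_period=3600):
            self.next_id = random.getrandbits(62)  # ids start at a random value
            self.objects = {}                      # handle id -> open stream
            self.lock = threading.Lock()
            self.last_activity = time.time()
            self.inactivity_period = inactivity_period

        def insert(self, obj):
            # every RPC that touches the table counts as activity
            self.lock.acquire()
            try:
                self.last_activity = time.time()
                self.next_id += 1
                self.objects[self.next_id] = obj
                return self.next_id
            finally:
                self.lock.release()

        def remove(self, handle_id):
            self.lock.acquire()
            try:
                self.last_activity = time.time()
                return self.objects.pop(handle_id, None)
            finally:
                self.lock.release()

        def expired(self):
            # a background monitor checks this periodically and shuts down
            return time.time() > self.last_activity + self.inactivity_period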
Added: hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java?rev=689548&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java (added)
+++ hadoop/core/trunk/src/contrib/thriftfs/test/org/apache/hadoop/thriftfs/TestThriftfs.java Wed Aug 27 11:30:41 2008
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.thriftfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+/**
+ * This class is supposed to test ThriftHadoopFileSystem but has a long long
+ * way to go.
+ */
+public class TestThriftfs extends TestCase
+{
+ final static int numDatanodes = 1;
+
+ public TestThriftfs() throws IOException
+ {
+ }
+
+ public void testServer() throws IOException
+ {
+ Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
+    cluster.waitActive();
+    DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+    HadoopThriftServer server = new HadoopThriftServer(new String[]{});
+    server.server.stop();
+    cluster.shutdown();
+ }
+
+ public static void main(String[]args) throws Exception
+ {
+ new TestThriftfs().testServer();
+ }
+
+}