Author: dhruba
Date: Fri Jul 18 16:32:44 2008
New Revision: 678074
URL: http://svn.apache.org/viewvc?rev=678074&view=rev
Log:
HADOOP-3485. Allow writing to files over fuse.
(Pete Wyckoff via dhruba)
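
For orientation, a hedged sketch of what this change enables from a client's
point of view: ordinary sequential POSIX writes through a fuse-dfs mount.
This is not part of the commit; the mount point /mnt/dfs is hypothetical, and
DFS still supports only sequential, write-only output (O_RDWR is refused, see
fuse_dfs.c below).

    #include <fcntl.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    int main(void) {
      /* hypothetical fuse-dfs mount point; O_WRONLY|O_CREAT is the only
         write mode DFS accepts */
      int fd = open("/mnt/dfs/user/demo.txt", O_WRONLY | O_CREAT, 0644);
      if (fd < 0) { perror("open"); return 1; }

      const char *msg = "hello world\n";
      if (write(fd, msg, strlen(msg)) != (ssize_t)strlen(msg))
        perror("write");           /* short or failed writes surface here */

      close(fd);                   /* drives dfs_flush and dfs_release */
      return 0;
    }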
Added:
hadoop/core/trunk/src/contrib/fuse-dfs/src/test/
hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java
Removed:
hadoop/core/trunk/src/contrib/fuse-dfs/test/TestFuseDFS.java
Modified:
hadoop/core/trunk/src/contrib/fuse-dfs/build.xml
hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c
hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
Modified: hadoop/core/trunk/src/contrib/fuse-dfs/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fuse-dfs/build.xml?rev=678074&r1=678073&r2=678074&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fuse-dfs/build.xml (original)
+++ hadoop/core/trunk/src/contrib/fuse-dfs/build.xml Fri Jul 18 16:32:44 2008
@@ -44,8 +44,16 @@
<env key="HADOOP_HOME" value="${hadoop.root}"/>
<env key="PROTECTED_PATHS" value="/,/Trash,/user"/>
<env key="PACKAGE_VERSION" value="0.1.0"/>
- <env key="FUSE_HOME" value="/usr/local"/>
</exec>
+ <mkdir dir="${build.dir}"/>
+ <mkdir dir="${build.dir}/test"/>
+ <exec executable="cp" failonerror="true">
+ <arg line="${root}/src/fuse_dfs ${build.dir}"/>
+ </exec>
+ <exec executable="cp" failonerror="true">
+ <arg line="${root}/src/fuse_dfs_wrapper.sh ${build.dir}"/>
+ </exec>
+
</target>
<!-- override jar target !-->
@@ -65,6 +73,7 @@
</exec>
</target>
+
<!-- override clean target !-->
<target name="clean" depends="check-libhdfs-fuse" if="libhdfs-fuse">
<echo message="contrib: ${name}"/>
Modified: hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c?rev=678074&r1=678073&r2=678074&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c (original)
+++ hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs.c Fri Jul 18 16:32:44 2008
@@ -88,7 +88,6 @@
#define OPTIMIZED_READS 1
-
enum
{
KEY_VERSION,
@@ -178,7 +177,7 @@
// if not connected, try to connect and fail out if we can't.
if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
- syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__, __LINE__);
+ syslog(LOG_ERR, "ERROR: could not connect to %s:%d %s:%d\n", dfs->nn_hostname, dfs->nn_port,__FILE__, __LINE__);
return -EIO;
}
@@ -733,12 +732,12 @@
return -ENOTSUP;
}
-static int dfs_truncate(const char *path, off_t size)
-{
- (void)path;
- (void)size;
- return -ENOTSUP;
-}
+//static int dfs_truncate(const char *path, off_t size)
+//{
+// (void)path;
+// (void)size;
+// return -ENOTSUP;
+//}
long tempfh = 0;
@@ -746,6 +745,7 @@
{
dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+
// check params and the context var
assert(path);
assert('/' == *path);
@@ -763,12 +763,14 @@
// bugbug figure out what this flag is and report problem to Hadoop JIRA
int flags = (fi->flags & 0x7FFF);
+
#ifdef OPTIMIZED_READS
// retrieve dfs specific data
dfs_fh *fh = (dfs_fh*)malloc(sizeof (dfs_fh));
fi->fh = (uint64_t)fh;
fh->hdfsFH = (hdfsFile)hdfsOpenFile(dfs->fs, path, flags, 0, 3, 0);
fh->buf = (char*)malloc(rd_cache_buf_size*sizeof (char));
+
fh->startOffset = 0;
fh->sizeBuffer = 0;
@@ -777,6 +779,25 @@
ret = -EIO;
}
#else
+ // fprintf(stderr,"hdfsOpenFile being called %s,%o\n",path,flags);
+
+ // bugbug should stop O_RDWR flag here.
+
+
+ // bugbug when fix https://issues.apache.org/jira/browse/HADOOP-3723 can remove the below code
+ if (flags & O_WRONLY) {
+ flags = O_WRONLY;
+
+ }
+
+ if (flags & O_RDWR) {
+ // NOTE - should not normally be checking policy in the middleman, but the handling of Unix flags in DFS is not
+ // consistent right now. 2008-07-16
+ syslog(LOG_ERR, "ERROR: trying to open a file with O_RDWR and DFS does not
support that %s dfs %s:%d\n", path,__FILE__, __LINE__);
+ return -EIO;
+ }
+
+ // fprintf(stderr,"hdfsOpenFile being called %s,%o\n",path,flags);
// retrieve dfs specific data
fi->fh = (uint64_t)hdfsOpenFile(dfs->fs, path, flags, 0, 3, 0);
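
The hunk above narrows flag sets containing O_WRONLY (pending HADOOP-3723)
and refuses O_RDWR outright, since DFS cannot do random-access writes. A
hedged sketch of the client-visible effect, against the same hypothetical
mount point as above:

    #include <errno.h>
    #include <fcntl.h>
    #include <stdio.h>
    #include <string.h>

    int main(void) {
      /* dfs_open returns -EIO for O_RDWR, which open(2) reports as EIO */
      int fd = open("/mnt/dfs/user/demo.txt", O_RDWR);
      if (fd < 0)
        printf("O_RDWR refused: %s\n", strerror(errno));
      return 0;
    }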
@@ -821,24 +842,27 @@
}
#endif
- // syslog(LOG_DEBUG,"hdfsTell(dfs,%ld)\n",(long)file_handle);
-// tOffset cur_offset = hdfsTell(dfs->fs, file_handle);
+ syslog(LOG_DEBUG,"hdfsTell(dfs,%ld)\n",(long)file_handle);
+ tOffset cur_offset = hdfsTell(dfs->fs, file_handle);
- // if (cur_offset != offset) {
- // syslog(LOG_ERR, "ERROR: user trying to random access write to a file %d!=%d for %s %s:%d\n",(int)cur_offset, (int)offset,path, __FILE__, __LINE__);
-// return -EIO;
-// }
+ if (cur_offset != offset) {
+ syslog(LOG_ERR, "ERROR: user trying to random access write to a file
%d!=%d for %s %s:%d\n",(int)cur_offset, (int)offset,path, __FILE__, __LINE__);
+ return -EIO;
+ }
-
syslog(LOG_DEBUG,"hdfsWrite(dfs,%ld,'%s',%d)\n",(long)file_handle,buf,(int)size);
tSize length = hdfsWrite(dfs->fs, file_handle, buf, size);
-
- if (length != size) {
+ if(length <= 0) {
syslog(LOG_ERR, "ERROR: fuse problem - could not write all the bytes for %s %d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
return -EIO;
}
- return 0;
+
+ if (length != size) {
+ syslog(LOG_ERR, "WARN: fuse problem - could not write all the bytes for %s
%d!=%d%s:%d\n",path,length,(int)size, __FILE__, __LINE__);
+ }
+
+ return length;
}
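
Two behaviors change in dfs_write above: the previously commented-out
hdfsTell() guard now rejects non-sequential writes, and the function returns
the byte count from hdfsWrite() instead of 0, which is what FUSE expects; a
short write is logged as a warning and the remainder can be resubmitted. A
sketch of the usual caller-side idiom that this return value supports
(write_fully is a hypothetical helper, not part of the commit):

    #include <unistd.h>

    /* loop until all bytes are written or a hard error occurs; this works
       only because the write path now reports how many bytes it consumed */
    static ssize_t write_fully(int fd, const char *buf, size_t len) {
      size_t done = 0;
      while (done < len) {
        ssize_t n = write(fd, buf + done, len - done);
        if (n < 0) return -1;   /* hard error, errno set */
        if (n == 0) break;      /* no forward progress */
        done += (size_t)n;
      }
      return (ssize_t)done;
    }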
@@ -866,7 +890,6 @@
hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
free(fh->buf);
free(fh);
-
#else
hdfsFile file_handle = (hdfsFile)fi->fh;
#endif
@@ -876,7 +899,8 @@
}
if (hdfsCloseFile(dfs->fs, file_handle) != 0) {
- syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle for %s
%s:%d\n",path, __FILE__, __LINE__);
+ syslog(LOG_ERR, "ERROR: dfs problem - could not close file_handle(%ld) for
%s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
+ // fprintf(stderr, "ERROR: dfs problem - could not close file_handle(%ld) for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
return -EIO;
}
@@ -891,12 +915,47 @@
static int dfs_create(const char *path, mode_t mode, struct fuse_file_info *fi)
{
- syslog(LOG_DEBUG,"in dfs_create");
fi->flags |= mode;
-
return dfs_open(path, fi);
}
+
int dfs_flush(const char *path, struct fuse_file_info *fi) {
+
+ // retrieve dfs specific data
+ dfs_context *dfs = (dfs_context*)fuse_get_context()->private_data;
+
+ // check params and the context var
+ assert(path);
+ assert(dfs);
+ assert('/' == *path);
+
+
+ // if not connected, try to connect and fail out if we can't.
+ if (NULL == dfs->fs && NULL == (dfs->fs = hdfsConnect(dfs->nn_hostname,dfs->nn_port))) {
+ syslog(LOG_ERR, "ERROR: could not connect to dfs %s:%d\n", __FILE__,
__LINE__);
+ return -EIO;
+ }
+
+ if (NULL == (void*)fi->fh) {
+ return 0;
+ }
+
+ // note that fuse calls flush on RO files too and hdfs does not like that and will return an error
+ if(fi->flags & O_WRONLY) {
+
+#ifdef OPTIMIZED_READS
+ dfs_fh *fh = (dfs_fh*)fi->fh;
+ hdfsFile file_handle = (hdfsFile)fh->hdfsFH;
+#else
+ hdfsFile file_handle = (hdfsFile)fi->fh;
+#endif
+
+ if (hdfsFlush(dfs->fs, file_handle) != 0) {
+ syslog(LOG_ERR, "ERROR: dfs problem - could not flush file_handle(%x)
for %s %s:%d\n",(long)file_handle,path, __FILE__, __LINE__);
+ return -EIO;
+ }
+ }
+
return 0;
}
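
Context for the guard in dfs_flush above: FUSE calls flush() on every
close(2), including handles opened read-only, and hdfsFlush() fails on a
read-only hdfsFile, hence the O_WRONLY test. A more defensive variant (a
sketch, not part of this commit) compares the access mode explicitly, since
O_RDONLY is defined as 0 and a single-bit test cannot detect it:

    #include <fcntl.h>

    /* hypothetical predicate: true when a handle was opened for writing;
       (flags & O_WRONLY) happens to suffice in fuse_dfs.c only because
       dfs_open already rejects O_RDWR */
    static int is_write_handle(int flags) {
      int acc = flags & O_ACCMODE;
      return acc == O_WRONLY || acc == O_RDWR;
    }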
@@ -1024,11 +1083,11 @@
.rename = dfs_rename,
.unlink = dfs_unlink,
.release = dfs_release,
- // .create = dfs_create,
- // .write = dfs_write,
- // .flush = dfs_flush,
+ .create = dfs_create,
+ .write = dfs_write,
+ .flush = dfs_flush,
//.xsetattr = dfs_setattr,
- // .mknod = dfs_mknod,
+ .mknod = dfs_mknod,
.chmod = dfs_chmod,
.chown = dfs_chown,
// .truncate = dfs_truncate,
@@ -1037,6 +1096,7 @@
int main(int argc, char *argv[])
{
+
umask(0);
program = argv[0];
@@ -1049,10 +1109,13 @@
/** error parsing options */
return -1;
+
if (options.server == NULL || options.port == 0) {
print_usage(argv[0]);
exit(0);
}
+
+
int ret = fuse_main(args.argc, args.argv, &dfs_oper, NULL);
if (ret) printf("\n");
Modified: hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh?rev=678074&r1=678073&r2=678074&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh (original)
+++ hadoop/core/trunk/src/contrib/fuse-dfs/src/fuse_dfs_wrapper.sh Fri Jul 18 16:32:44 2008
@@ -15,23 +15,26 @@
#
if [ "$HADOOP_HOME" = "" ]; then
- HADOOP_HOME=/usr/local/share/hadoop
+export HADOOP_HOME=/usr/local/share/hadoop
fi
+export PATH=$HADOOP_HOME/contrib/fuse_dfs:$PATH
+
for f in `ls $HADOOP_HOME/lib/*.jar $HADOOP_HOME/*.jar` ; do
- CLASSPATH=$CLASSPATH:$f
+export CLASSPATH=$CLASSPATH:$f
done
if [ "$OS_ARCH" = "" ]; then
- OS_ARCH=amd64
+export OS_ARCH=amd64
fi
if [ "$JAVA_HOME" = "" ]; then
- JAVA_HOME=/usr/local/java
+export JAVA_HOME=/usr/local/java
fi
if [ "$LD_LIBRARY_PATH" = "" ]; then
- LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/$OS_ARCH/server:/usr/local/share/hdfs/libhdfs/:/usr/local/lib
+export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/$OS_ARCH/server:/usr/local/share/hdfs/libhdfs/:/usr/local/lib
fi
+echo $LD_LIBRARY_PATH
./fuse_dfs $@ -o-o allow_other
Added: hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java?rev=678074&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java (added)
+++ hadoop/core/trunk/src/contrib/fuse-dfs/src/test/TestFuseDFS.java Fri Jul 18 16:32:44 2008
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.dfs.*;
+import junit.framework.TestCase;
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import java.net.*;
+
+/**
+ * This class tests that the Fuse module for DFS can mount properly
+ * and does a few simple commands:
+ * mkdir
+ * rmdir
+ * ls
+ * cat
+ *
+ * cp and touch are purposely not tested because they won't work with the current module
+
+ *
+ */
+public class TestFuseDFS extends TestCase {
+
+ /**
+ * mount the fuse file system using assumed fuse module library installed in /usr/local/lib or somewhere else on your
+ * pre-existing LD_LIBRARY_PATH
+ *
+ */
+
+ static Process fuse_process;
+ static String fuse_cmd;
+ static private void mount(String mountpoint, URI dfs) throws IOException, InterruptedException {
+
+ String cp = System.getProperty("java.class.path");
+ Runtime r = Runtime.getRuntime();
+ fuse_cmd = System.getProperty("build.test") + "/../fuse_dfs";
+ String libhdfs = System.getProperty("build.test") + "/../../../libhdfs/";
+ String jvm = System.getProperty("java.home") + "/lib/amd64/server";
+ String lp = System.getProperty("LD_LIBRARY_PATH") + ":" + "/usr/local/lib:" + libhdfs + ":" + jvm;
+ System.err.println("LD_LIBRARY_PATH=" + lp);
+ String cmd[] = new String[4];
+ int index = 0;
+
+ cmd[index++] = fuse_cmd;
+ cmd[index++] = "dfs://" + dfs.getHost() + ":" + String.valueOf(dfs.getPort());
+ cmd[index++] = mountpoint;
+ cmd[index++] = "-d";
+ final String [] envp = {
+ "CLASSPATH="+ cp,
+ "LD_LIBRARY_PATH=" + lp,
+ "PATH=" + "/usr/bin:/bin"
+
+ };
+
+ // ensure the mount point is not currently mounted
+ Process p = r.exec("fusermount -u " + mountpoint);
+ p.waitFor();
+
+ // clean up the mount point
+ p = r.exec("rm -rf " + mountpoint);
+ assertTrue(p.waitFor() == 0);
+
+ // make the mount point if needed
+ p = r.exec("mkdir -p " + mountpoint);
+ assertTrue(p.waitFor() == 0);
+
+ // mount fuse to the mount point
+ fuse_process = r.exec(cmd, envp);
+
+ // give DFS a chance to come up
+ try { Thread.sleep(3000); } catch(Exception e) { }
+ }
+
+ /**
+ * unmounts fuse before shutting down.
+ */
+ static private void umount(String mpoint) throws IOException, InterruptedException {
+ Runtime r= Runtime.getRuntime();
+ Process p = r.exec("fusermount -u " + mpoint);
+ p.waitFor();
+ }
+
+ /**
+ * Set things up - create mini dfs cluster and mount the fuse filesystem.
+ */
+ public TestFuseDFS() throws IOException,InterruptedException {
+ }
+
+ static private MiniDFSCluster cluster;
+ static private FileSystem fileSys;
+ final static private String mpoint;
+
+ static {
+ mpoint = System.getProperty("build.test") + "/mnt";
+ System.runFinalizersOnExit(true);
+ startStuff();
+ }
+
+
+ static public void startStuff() {
+ try {
+ Configuration conf = new Configuration();
+ conf.setBoolean("dfs.permissions",false);
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ fileSys = cluster.getFileSystem();
+ assertTrue(fileSys.getFileStatus(new Path("/")).isDir());
+ mount(mpoint, fileSys.getUri());
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void setUp() {
+ }
+
+ /**
+ * use shell to create a dir and then use filesys to see it exists.
+ */
+ public void testMkdir() throws IOException,InterruptedException, Exception {
+ try {
+ // First create a new directory with mkdirs
+ Path path = new Path("/foo");
+ Runtime r = Runtime.getRuntime();
+ String cmd = "mkdir -p " + mpoint + path.toString();
+ Process p = r.exec(cmd);
+ assertTrue(p.waitFor() == 0);
+
+ // check it is there
+ assertTrue(fileSys.getFileStatus(path).isDir());
+
+ // check again through the shell
+ String lsCmd = "ls " + mpoint + path.toString();
+ p = r.exec(lsCmd);
+ assertTrue(p.waitFor() == 0);
+ } catch(Exception e) {
+ System.err.println("e=" + e);
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+
+ /**
+ * use the fuse mount to write a file and then use filesys to see it exists.
+ */
+ public void testWrites() throws IOException,InterruptedException {
+ try {
+
+ // write a hello file
+ File file = new File(mpoint, "hello.txt");
+ FileOutputStream f = new FileOutputStream(file);
+ String s = "hello ";
+ f.write(s.getBytes());
+ s = "world";
+ f.write(s.getBytes());
+ f.flush();
+ f.close();
+
+ // check the file exists.
+ Path myPath = new Path("/hello.txt");
+ assertTrue(fileSys.exists(myPath));
+
+ // check the data is ok
+ FileInputStream fi = new FileInputStream(new File(mpoint, "hello.txt"));
+ byte b[] = new byte[12];
+ int length = fi.read(b,0,12);
+ String s2 = new String( b);
+ } catch(Exception e) {
+ e.printStackTrace();
+ } finally {
+ }
+ }
+
+
+
+ /**
+ * Test ls for dir already created in testMkdir; also tests bad ls
+ */
+ public void testLs() throws IOException,InterruptedException {
+ try {
+ // First create a new directory with mkdirs
+ Runtime r = Runtime.getRuntime();
+
+ // mkdir
+ Process p = r.exec("mkdir -p " + mpoint + "/test/mkdirs");
+ assertTrue(p.waitFor() == 0);
+
+ // ls
+ p = r.exec("ls " + mpoint + "/test/mkdirs");
+ assertTrue(p.waitFor() == 0);
+
+ // ls non-existent directory
+ p = r.exec("ls " + mpoint + "/test/mkdirsNotThere");
+ int res = p.waitFor();
+ assertFalse(res == 0);
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ /**
+ * Remove a dir using the shell and use filesys to see it no longer exists.
+ */
+ public void testRmdir() throws IOException,InterruptedException {
+ try {
+ // First create a new directory with mkdirs
+
+ Runtime r = Runtime.getRuntime();
+ Process p = r.exec("mkdir -p " + mpoint + "/test/mkdirs");
+ Path myPath = new Path("/test/mkdirs");
+ assertTrue(fileSys.exists(myPath));
+
+ // remove it
+ p = r.exec("rmdir " + mpoint + "/test/mkdirs");
+ assertTrue(p.waitFor() == 0);
+
+ // check it is not there
+ assertFalse(fileSys.exists(myPath));
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * Use filesys to create the hello world! file and then cat it and see its contents are correct.
+ */
+ public void testCat() throws IOException,InterruptedException {
+ try {
+ // First create a new directory with mkdirs
+ Runtime r = Runtime.getRuntime();
+ Process p = r.exec("rm -rf " + mpoint + "/test/hello");
+ assertTrue(p.waitFor() == 0);
+
+ // create the file
+ Path myPath = new Path("/test/hello");
+ FSDataOutputStream s = fileSys.create(myPath);
+ String hello = "hello world!";
+ s.write(hello.getBytes());
+ s.close();
+
+ // check it exists
+ assertTrue(fileSys.exists(myPath));
+
+ // cat the file
+ p = r.exec("cat " + mpoint + "/test/hello");
+ assertTrue(p != null);
+ assertTrue(p.waitFor() == 0);
+
+ // check the data is the same
+ {
+ InputStream i = p.getInputStream();
+ byte b[] = new byte[1024];
+ int length = i.read(b);
+ String s2 = new String(b,0,length);
+ assertTrue(s2.equals(hello));
+ }
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ } finally {
+ close();
+ }
+ }
+
+
+ /**
+ * Unmount and close
+ */
+ protected void tearDown() throws Exception {
+ }
+
+ /**
+ * Unmount and close
+ */
+ protected void finalize() throws Throwable {
+ close();
+ }
+
+ public void close() {
+ try {
+
+ // print out the fuse debug output
+ {
+ InputStream i = fuse_process.getInputStream();
+ byte b[] = new byte[i.available()];
+ int length = i.read(b);
+ System.err.println("read x bytes: " + length);
+ System.err.write(b,0,b.length);
+ }
+
+ int length;
+ do {
+ InputStream i = fuse_process.getErrorStream();
+ byte b[] = new byte[i.available()];
+ length = i.read(b);
+ System.err.println("read x bytes: " + length);
+ System.err.write(b,0,b.length);
+ } while(length > 0) ;
+
+ umount(mpoint);
+
+ fuse_process.destroy();
+ fuse_process = null;
+ if(fileSys != null) {
+ fileSys.close();
+ fileSys = null;
+ }
+ if(cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ } catch(Exception e) { }
+ }
+};