Author: atm
Date: Thu Aug 9 14:44:05 2012
New Revision: 1371231
URL: http://svn.apache.org/viewvc?rev=1371231&view=rev
Log:
HDFS-3634. Add self-contained, mavenized fuse_dfs test. Contributed by Colin
Patrick McCabe.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1371231&r1=1371230&r2=1371231&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 9
14:44:05 2012
@@ -375,6 +375,9 @@ Branch-2 ( Unreleased changes )
HDFS-3291. add test that covers HttpFS working w/ a non-HDFS Hadoop
filesystem (tucu)
+ HDFS-3634. Add self-contained, mavenized fuse_dfs test. (Colin Patrick
+ McCabe via atm)
+
OPTIMIZATIONS
HDFS-2982. Startup performance suffers when there are many edit log
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1371231&r1=1371230&r2=1371231&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
(original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Thu
Aug 9 14:44:05 2012
@@ -107,6 +107,10 @@ set(LIBHDFS_VERSION "0.0.0")
set_target_properties(hdfs PROPERTIES
SOVERSION ${LIBHDFS_VERSION})
+add_library(posix_util
+ main/native/util/posix_util.c
+)
+
add_executable(test_libhdfs_ops
main/native/libhdfs/test/test_libhdfs_ops.c
)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt?rev=1371231&r1=1371230&r2=1371231&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/CMakeLists.txt
Thu Aug 9 14:44:05 2012
@@ -71,6 +71,16 @@ IF(FUSE_FOUND)
m
pthread
)
+ add_executable(test_fuse_dfs
+ test/test_fuse_dfs.c
+ test/fuse_workload.c
+ )
+ target_link_libraries(test_fuse_dfs
+ ${FUSE_LIBRARIES}
+ native_mini_dfs
+ posix_util
+ pthread
+ )
ELSE(FUSE_FOUND)
IF(REQUIRE_FUSE)
MESSAGE(FATAL_ERROR "Required component fuse_dfs could not be built.")
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c?rev=1371231&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.c
Thu Aug 9 14:44:05 2012
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fuse-dfs/test/fuse_workload.h"
+#include "libhdfs/expect.h"
+#include "util/posix_util.h"
+
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/statvfs.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <utime.h>
+
+typedef int (*testReadDirFn)(const struct dirent *de, void *v);
+
+struct fileCtx {
+ int fd;
+ char *str;
+ int strLen;
+ char *path;
+};
+
+static const char *DIRS_A_AND_B[] = { "a", "b", NULL };
+static const char *DIRS_B_AND_C[] = { "b", "c", NULL };
+
+#define LONG_STR_LEN 1024
+#define NUM_FILE_CTX 3
+#define MAX_TRIES 120
+
+// TODO: implement/test access, mknod, symlink
+// TODO: rmdir on non-dir, non-empty dir
+// TODO: test unlink-during-writing
+// TODO: test weird open flags failing
+// TODO: test chown, chmod
+
+/**
+ * Invoke a callback on every entry of an open directory stream, skipping
+ * the "." and ".." entries.
+ *
+ * @param dp    Open directory stream to iterate over
+ * @param fn    Callback invoked for each entry; a negative return value
+ *              stops the iteration
+ * @param data  Opaque pointer handed through to fn
+ *
+ * @return      The number of entries visited on success; the negative value
+ *              returned by fn, or a negative errno if readdir fails
+ */
+static int testReadDirImpl(DIR *dp, testReadDirFn fn, void *data)
+{
+  struct dirent *de;
+  int ret, noda = 0;
+
+  while (1) {
+    // readdir returns NULL both at end-of-directory and on error.  Only the
+    // error case sets errno, so clear it first to tell the two apart.
+    errno = 0;
+    de = readdir(dp);
+    if (!de) {
+      if (errno)
+        return -errno;
+      return noda;
+    }
+    if (!strcmp(de->d_name, ".") || !strcmp(de->d_name, ".."))
+      continue;
+    ret = fn(de, data);
+    if (ret < 0)
+      return ret;
+    ++noda;
+  }
+}
+
+/**
+ * Open a directory by name and run testReadDirImpl over its entries.
+ *
+ * @param dirName  Path of the directory to list
+ * @param fn       Callback invoked for each entry
+ * @param data     Opaque pointer handed through to fn
+ *
+ * @return         The result of testReadDirImpl, or a negative errno if the
+ *                 directory could not be opened
+ */
+static int testReadDir(const char *dirName, testReadDirFn fn, void *data)
+{
+  int numEntries;
+  DIR *dir = opendir(dirName);
+
+  if (dir == NULL)
+    return -errno;
+  numEntries = testReadDirImpl(dir, fn, data);
+  closedir(dir);
+  return numEntries;
+}
+
+/**
+ * testReadDirFn callback: check that a directory entry's name appears in a
+ * NULL-terminated list of expected names.
+ *
+ * @param de  The directory entry to check
+ * @param v   NULL-terminated array of C strings (const char **)
+ *
+ * @return    0 if the name is in the list; -ENOENT otherwise
+ */
+static int expectDirs(const struct dirent *de, void *v)
+{
+  const char **allowed = v;
+  size_t idx = 0;
+
+  while (allowed[idx] != NULL) {
+    if (strcmp(de->d_name, allowed[idx]) == 0)
+      return 0;
+    idx++;
+  }
+  return -ENOENT;
+}
+
+/**
+ * Write an entire buffer to a file descriptor, retrying on EINTR and on
+ * short writes.
+ *
+ * @param fd   File descriptor to write to
+ * @param buf  Data to write
+ * @param amt  Number of bytes to write
+ *
+ * @return     0 once everything has been written; negative errno on failure
+ */
+static int safeWrite(int fd, const void *buf, size_t amt)
+{
+  const char *cur = buf;
+
+  while (amt > 0) {
+    // write returns ssize_t; keep the full width instead of narrowing to int
+    ssize_t r = write(fd, cur, amt);
+    if (r < 0) {
+      if (errno != EINTR)
+        return -errno;
+      continue;
+    }
+    amt -= (size_t)r;
+    cur += r;
+  }
+  return 0;
+}
+
+/**
+ * Read up to c bytes from a file descriptor, retrying on EINTR and on short
+ * reads until either c bytes have been read or end-of-file is reached.
+ *
+ * @param fd   File descriptor to read from
+ * @param buf  Destination buffer, at least c bytes long
+ * @param c    Maximum number of bytes to read; values <= 0 read nothing
+ *
+ * @return     The number of bytes read (less than c only at end-of-file);
+ *             negative errno on failure
+ */
+static int safeRead(int fd, void *buf, int c)
+{
+  char *cur = buf;
+  // Keep the running total signed: comparing a negative c against an
+  // unsigned counter would turn it into a huge length and overrun buf.
+  int amt = 0;
+
+  while (amt < c) {
+    ssize_t res = read(fd, cur, (size_t)(c - amt));
+    if (res < 0) {
+      if (errno != EINTR)
+        return -errno;
+      continue;
+    }
+    if (res == 0)
+      break;
+    amt += (int)res;
+    cur += res;
+  }
+  return amt;
+}
+
+/**
+ * Run the actual FUSE workload under <root>/<pcomp>: directory creation,
+ * listing and renaming, statvfs, utime, and file
+ * create/write/read/truncate/unlink operations.
+ *
+ * @param root   The root directory for the testing
+ * @param pcomp  Path component to add to root
+ * @param ctx    Array of NUM_FILE_CTX file contexts.  str must be set by the
+ *               caller; path, strLen and fd are filled in here.  Any fd that
+ *               is still >= 0 on return is open and must be closed by the
+ *               caller.
+ *
+ * @return       0 on success; negative value otherwise
+ */
+int runFuseWorkloadImpl(const char *root, const char *pcomp,
+    struct fileCtx *ctx)
+{
+  char base[PATH_MAX], tmp[PATH_MAX], *tmpBuf;
+  char src[PATH_MAX], dst[PATH_MAX];
+  struct stat stBuf;
+  int ret, i, try;
+  struct utimbuf tbuf;
+  struct statvfs stvBuf;
+
+  // The root must be a directory
+  EXPECT_ZERO(stat(root, &stBuf));
+  EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));
+
+  // base = <root>/<pcomp>.  base must not exist yet
+  snprintf(base, sizeof(base), "%s/%s", root, pcomp);
+  EXPECT_NEGATIVE_ONE_WITH_ERRNO(stat(base, &stBuf), ENOENT);
+
+  // mkdir <base>
+  RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(base, 0755));
+  EXPECT_ZERO(ret);
+
+  // rmdir <base>
+  RETRY_ON_EINTR_GET_ERRNO(ret, rmdir(base));
+  EXPECT_ZERO(ret);
+
+  // mkdir <base>
+  RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(base, 0755));
+  EXPECT_ZERO(ret);
+
+  // stat <base>
+  EXPECT_ZERO(stat(base, &stBuf));
+  EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));
+
+  // mkdir <base>/a
+  snprintf(tmp, sizeof(tmp), "%s/a", base);
+  RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(tmp, 0755));
+  EXPECT_ZERO(ret);
+
+  /* readdir test */
+  EXPECT_INT_EQ(1, testReadDir(base, expectDirs, DIRS_A_AND_B));
+
+  // mkdir <base>/b
+  snprintf(tmp, sizeof(tmp), "%s/b", base);
+  RETRY_ON_EINTR_GET_ERRNO(ret, mkdir(tmp, 0755));
+  EXPECT_ZERO(ret);
+
+  // readdir a and b
+  EXPECT_INT_EQ(2, testReadDir(base, expectDirs, DIRS_A_AND_B));
+
+  // rename a -> c
+  snprintf(src, sizeof(src), "%s/a", base);
+  snprintf(dst, sizeof(dst), "%s/c", base);
+  EXPECT_ZERO(rename(src, dst));
+
+  // readdir c and b
+  EXPECT_INT_EQ(-ENOENT, testReadDir(base, expectDirs, DIRS_A_AND_B));
+  EXPECT_INT_EQ(2, testReadDir(base, expectDirs, DIRS_B_AND_C));
+
+  // statvfs
+  memset(&stvBuf, 0, sizeof(stvBuf));
+  EXPECT_ZERO(statvfs(root, &stvBuf));
+
+  // set utime on base
+  memset(&tbuf, 0, sizeof(tbuf));
+  tbuf.actime = 123;
+  tbuf.modtime = 456;
+  EXPECT_ZERO(utime(base, &tbuf));
+
+  // stat(base)
+  EXPECT_ZERO(stat(base, &stBuf));
+  EXPECT_NONZERO(S_ISDIR(stBuf.st_mode));
+  // st_atime is deliberately not checked: hdfs doesn't store atime on
+  // directories
+  EXPECT_INT_EQ(456, stBuf.st_mtime);
+
+  // open some files and write to them
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    snprintf(tmp, sizeof(tmp), "%s/b/%d", base, i);
+    ctx[i].path = strdup(tmp);
+    if (!ctx[i].path) {
+      fprintf(stderr, "FUSE_WORKLOAD: OOM on line %d\n", __LINE__);
+      return -ENOMEM;
+    }
+    ctx[i].strLen = strlen(ctx[i].str);
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    // The file does not exist yet, so opening it read-only must fail
+    ctx[i].fd = open(ctx[i].path, O_RDONLY);
+    if (ctx[i].fd >= 0) {
+      fprintf(stderr, "FUSE_WORKLOAD: Error: was able to open %s for read "
+              "before it was created!\n", ctx[i].path);
+      return -EIO;
+    }
+    ctx[i].fd = creat(ctx[i].path, 0755);
+    if (ctx[i].fd < 0) {
+      fprintf(stderr, "FUSE_WORKLOAD: Failed to create file %s for writing!\n",
+              ctx[i].path);
+      return -EIO;
+    }
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    EXPECT_ZERO(safeWrite(ctx[i].fd, ctx[i].str, ctx[i].strLen));
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    RETRY_ON_EINTR_GET_ERRNO(ret, close(ctx[i].fd));
+    EXPECT_ZERO(ret);
+    ctx[i].fd = -1;
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    /* Bug: HDFS-2551.
+     * When a program writes a file, closes it, and immediately re-opens it,
+     * it might not appear to have the correct length.  This is because FUSE
+     * invokes the release() callback asynchronously.
+     *
+     * To work around this, we keep retrying until the file length is what we
+     * expect.
+     */
+    for (try = 0; try < MAX_TRIES; try++) {
+      EXPECT_ZERO(stat(ctx[i].path, &stBuf));
+      EXPECT_NONZERO(S_ISREG(stBuf.st_mode));
+      if (ctx[i].strLen == stBuf.st_size) {
+        break;
+      }
+      sleepNoSig(1);
+    }
+    if (try == MAX_TRIES) {
+      fprintf(stderr, "FUSE_WORKLOAD: error: expected file %s to have length "
+              "%d; instead, it had length %lld\n",
+              ctx[i].path, ctx[i].strLen, (long long)stBuf.st_size);
+      return -EIO;
+    }
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    ctx[i].fd = open(ctx[i].path, O_RDONLY);
+    if (ctx[i].fd < 0) {
+      fprintf(stderr, "FUSE_WORKLOAD: Failed to open file %s for reading!\n",
+              ctx[i].path);
+      return -EIO;
+    }
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    int nread, cmp = 0;
+
+    tmpBuf = calloc(1, ctx[i].strLen);
+    if (!tmpBuf) {
+      fprintf(stderr, "FUSE_WORKLOAD: OOM on line %d\n", __LINE__);
+      return -ENOMEM;
+    }
+    nread = safeRead(ctx[i].fd, tmpBuf, ctx[i].strLen);
+    if (nread == ctx[i].strLen)
+      cmp = memcmp(ctx[i].str, tmpBuf, ctx[i].strLen);
+    // Release the scratch buffer before the EXPECT_* checks: they return
+    // from this function on failure and would otherwise leak it.
+    free(tmpBuf);
+    EXPECT_INT_EQ(ctx[i].strLen, nread);
+    EXPECT_ZERO(cmp);
+    RETRY_ON_EINTR_GET_ERRNO(ret, close(ctx[i].fd));
+    ctx[i].fd = -1;
+    EXPECT_ZERO(ret);
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    EXPECT_ZERO(truncate(ctx[i].path, 0));
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    EXPECT_ZERO(stat(ctx[i].path, &stBuf));
+    EXPECT_NONZERO(S_ISREG(stBuf.st_mode));
+    EXPECT_INT_EQ(0, stBuf.st_size);
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    RETRY_ON_EINTR_GET_ERRNO(ret, unlink(ctx[i].path));
+    EXPECT_ZERO(ret);
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    EXPECT_NEGATIVE_ONE_WITH_ERRNO(stat(ctx[i].path, &stBuf), ENOENT);
+  }
+  for (i = 0; i < NUM_FILE_CTX; i++) {
+    free(ctx[i].path);
+  }
+  EXPECT_ZERO(recursiveDelete(base));
+  return 0;
+}
+
+/**
+ * Set up the file contexts (two short strings and one LONG_STR_LEN - 1
+ * character string), run runFuseWorkloadImpl on them, and close any file
+ * descriptor the workload left open.
+ *
+ * @param root   The root directory for the testing
+ * @param pcomp  Path component to add to root
+ *
+ * @return       The value returned by runFuseWorkloadImpl
+ */
+int runFuseWorkload(const char *root, const char *pcomp)
+{
+  char longStr[LONG_STR_LEN];
+  struct fileCtx ctx[NUM_FILE_CTX] = {
+    { .fd = -1, .str = "hello, world" },
+    { .fd = -1, .str = "A" },
+    { .fd = -1, .str = longStr },
+  };
+  size_t pos, idx;
+  int result, closeErr;
+
+  // Fill the long string with a repeating "abcdefghij" pattern
+  for (pos = 0; pos + 1 < LONG_STR_LEN; pos++) {
+    longStr[pos] = 'a' + (pos % 10);
+  }
+  longStr[LONG_STR_LEN - 1] = '\0';
+
+  result = runFuseWorkloadImpl(root, pcomp, ctx);
+
+  // Make sure all file descriptors are closed, or else we won't be able to
+  // unmount
+  for (idx = 0; idx < NUM_FILE_CTX; idx++) {
+    if (ctx[idx].fd < 0)
+      continue;
+    RETRY_ON_EINTR_GET_ERRNO(closeErr, close(ctx[idx].fd));
+  }
+  return result;
+}
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h?rev=1371231&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/fuse_workload.h
Thu Aug 9 14:44:05 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __FUSE_WORKLOAD_H__
+#define __FUSE_WORKLOAD_H__
+
+/**
+ * Perform some FUSE operations.
+ *
+ * The operations will be performed under <root>/<pcomp>.
+ * This directory should not exist prior to the test, and should not be used by
+ * any other tests concurrently.
+ *
+ * @param root The root directory for the testing.
+ * @param pcomp Path component to add to root
+ *
+ * @return 0 on success; negative error code otherwise
+ */
+int runFuseWorkload(const char *root, const char *pcomp);
+
+#endif
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c?rev=1371231&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/fuse-dfs/test/test_fuse_dfs.c
Thu Aug 9 14:44:05 2012
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fuse-dfs/test/fuse_workload.h"
+#include "libhdfs/expect.h"
+#include "libhdfs/hdfs.h"
+#include "libhdfs/native_mini_dfs.h"
+#include "util/posix_util.h"
+
+#include <ctype.h>
+#include <errno.h>
+#include <libgen.h>
+#include <limits.h>
+#include <mntent.h>
+#include <semaphore.h>
+#include <signal.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+/** Exit status to return when there is an exec error.
+ *
+ * We assume that fusermount and fuse_dfs don't normally return this error
+ * code.
+ */
+#define EXIT_STATUS_EXEC_ERROR 240
+
+/** Maximum number of times to try to unmount the fuse filesystem we created.
+ */
+#define MAX_UNMOUNT_TRIES 20
+
+/**
+ * Verify that the fuse workload works on the local FS.
+ *
+ * The workload is run inside a freshly created temporary directory, which
+ * is removed again whether or not the workload succeeds.
+ *
+ * @return 0 on success; error code otherwise
+ */
+static int verifyFuseWorkload(void)
+{
+  char tempDir[PATH_MAX];
+  int ret;
+
+  EXPECT_ZERO(createTempDir(tempDir, sizeof(tempDir), 0755));
+  ret = runFuseWorkload(tempDir, "test");
+  if (ret) {
+    fprintf(stderr, "TEST_ERROR: runFuseWorkload(%s, test) failed with "
+            "error %d\n", tempDir, ret);
+    // Best-effort cleanup so that a failed run does not leave the temporary
+    // directory behind; the workload failure is what gets reported.
+    recursiveDelete(tempDir);
+    return -1;
+  }
+  EXPECT_ZERO(recursiveDelete(tempDir));
+
+  return 0;
+}
+
+static int fuserMount(int *procRet, ...) __attribute__((sentinel));
+
+/**
+ * Invoke the fusermount binary and wait for it to finish.
+ *
+ * @param procRet   (out param) The exit status of fusermount, or -1 if it
+ *                  was killed by a signal or ended in some other way
+ * @param ...       The arguments to pass to fusermount (char *), terminated
+ *                  by NULL
+ *
+ * @return          0 on success; -EINVAL if there are too many arguments;
+ *                  negative errno if the fork or the wait fails
+ */
+static int fuserMount(int *procRet, ...)
+{
+  int ret, status;
+  size_t i = 0;
+  char *args[64], *c, *env[] = { NULL };
+  va_list ap;
+  pid_t pid, pret;
+
+  args[i++] = "fusermount";
+  va_start(ap, procRet);
+  while (1) {
+    // Check for room *before* storing the next argument (or the terminating
+    // NULL), so that we never write past the end of args.
+    if (i >= sizeof(args)/sizeof(args[0])) {
+      va_end(ap);
+      return -EINVAL;
+    }
+    c = va_arg(ap, char *);
+    args[i++] = c;
+    if (!c)
+      break;
+  }
+  va_end(ap);
+  pid = fork();
+  if (pid < 0) {
+    ret = errno;
+    fprintf(stderr, "FUSE_TEST: failed to fork: error %d: %s\n",
+            ret, strerror(ret));
+    return -ret;
+  } else if (pid == 0) {
+    // Child: exec only returns on failure
+    if (execvpe("fusermount", args, env)) {
+      ret = errno;
+      fprintf(stderr, "FUSE_TEST: failed to execute fusermount: "
+              "error %d: %s\n", ret, strerror(ret));
+      exit(EXIT_STATUS_EXEC_ERROR);
+    }
+  }
+  pret = waitpid(pid, &status, 0);
+  if (pret != pid) {
+    ret = errno;
+    fprintf(stderr, "FUSE_TEST: failed to wait for pid %d: returned %d "
+            "(error %d: %s)\n", pid, pret, ret, strerror(ret));
+    return -ret;
+  }
+  if (WIFEXITED(status)) {
+    *procRet = WEXITSTATUS(status);
+  } else if (WIFSIGNALED(status)) {
+    fprintf(stderr, "FUSE_TEST: fusermount exited with signal %d\n",
+            WTERMSIG(status));
+    *procRet = -1;
+  } else {
+    fprintf(stderr, "FUSE_TEST: fusermount exited with unknown exit type\n");
+    *procRet = -1;
+  }
+  return 0;
+}
+
+/**
+ * Check whether some filesystem is mounted at the given mount point, by
+ * scanning /proc/mounts.
+ *
+ * @param mntPoint  The mount point to look for
+ *
+ * @return          1 if an entry for mntPoint exists; 0 if not; negative
+ *                  errno if /proc/mounts could not be opened
+ */
+static int isMounted(const char *mntPoint)
+{
+  int ret;
+  FILE *fp;
+  struct mntent *mntEnt;
+
+  fp = setmntent("/proc/mounts", "r");
+  if (!fp) {
+    ret = errno;
+    fprintf(stderr, "FUSE_TEST: isMounted(%s) failed to open /proc/mounts: "
+            "error %d: %s\n", mntPoint, ret, strerror(ret));
+    return -ret;
+  }
+  while ((mntEnt = getmntent(fp))) {
+    if (!strcmp(mntEnt->mnt_dir, mntPoint)) {
+      endmntent(fp);
+      return 1;
+    }
+  }
+  endmntent(fp);
+  return 0;
+}
+
+/**
+ * Poll until something is mounted at the given mount point.
+ *
+ * isMounted is checked up to 'retries' times, sleeping two seconds after
+ * every unsuccessful check.  Errors from isMounted are logged and treated
+ * like "not mounted yet".
+ *
+ * @param mntPoint  The mount point to wait for
+ * @param retries   Maximum number of checks to perform
+ *
+ * @return          0 once the mount point is mounted; -ETIMEDOUT otherwise
+ */
+static int waitForMount(const char *mntPoint, int retries)
+{
+  int attempt, mounted;
+
+  for (attempt = 0; attempt < retries; attempt++) {
+    mounted = isMounted(mntPoint);
+    if (mounted == 1)
+      return 0;
+    if (mounted < 0) {
+      fprintf(stderr, "FUSE_TEST: waitForMount(%s, %d): isMounted returned "
+              "error %d\n", mntPoint, retries, mounted);
+    }
+    sleepNoSig(2);
+  }
+  return -ETIMEDOUT;
+}
+
+/**
+ * Try to unmount the fuse filesystem we mounted.
+ *
+ * Normally, only one try should be sufficient to unmount the filesystem.
+ * However, if our tests needs to exit right after starting the fuse_dfs
+ * process, the fuse_dfs process might not have gotten around to mounting
+ * itself yet.  The retry loop in this function ensures that we always
+ * unmount FUSE before exiting, rather than leaving around a zombie fuse_dfs.
+ *
+ * At most MAX_UNMOUNT_TRIES attempts are made, two seconds apart.
+ *
+ * @param mntPoint            Where the FUSE filesystem is mounted
+ * @return                    0 on success; error code otherwise
+ */
+static int cleanupFuse(const char *mntPoint)
+{
+  int ret, pret, tries;
+
+  for (tries = 0; tries < MAX_UNMOUNT_TRIES; tries++) {
+    if (tries > 0) {
+      fprintf(stderr, "FUSE_TEST: retrying unmount in 2 seconds...\n");
+      sleepNoSig(2);
+    }
+    ret = fuserMount(&pret, "-u", mntPoint, NULL);
+    if (ret) {
+      fprintf(stderr, "FUSE_TEST: cleanupFuse: failed to invoke fuserMount: "
+              "error %d\n", ret);
+      return ret;
+    }
+    if (pret == 0) {
+      fprintf(stderr, "FUSE_TEST: successfully unmounted FUSE filesystem.\n");
+      return 0;
+    }
+  }
+  return -EIO;
+}
+
+/**
+ * Create a fuse_dfs process using the miniDfsCluster we set up.
+ *
+ * The fuse_dfs binary is expected to live in the same directory as this
+ * test binary.  Anything already mounted at mntPoint is unmounted first.
+ *
+ * @param argv0               argv[0], as passed into main
+ * @param cluster             The NativeMiniDfsCluster to connect to
+ * @param mntPoint            The mount point
+ * @param pid                 (out param) the fuse_dfs process
+ *
+ * @return                    0 on success; error code otherwise
+ */
+static int spawnFuseServer(const char *argv0,
+    const struct NativeMiniDfsCluster *cluster, const char *mntPoint,
+    pid_t *pid)
+{
+  int ret, procRet, port;
+  char scratch[PATH_MAX], *dir, fusePath[PATH_MAX], portOpt[128];
+
+  snprintf(scratch, sizeof(scratch), "%s", argv0);
+  dir = dirname(scratch);
+  snprintf(fusePath, sizeof(fusePath), "%s/fuse_dfs", dir);
+  if (access(fusePath, X_OK)) {
+    fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to find fuse_dfs "
+            "binary at %s!\n", fusePath);
+    return -ENOENT;
+  }
+  // Look up the NameNode port in the parent, before forking.
+  // nmdGetNameNodePort goes through JNI, and only async-signal-safe
+  // functions should be called between fork and exec.  It also returns a
+  // negative error code on failure, which must not end up in -oport=.
+  port = nmdGetNameNodePort(cluster);
+  if (port < 0) {
+    fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to get the NameNode "
+            "port: error %d\n", port);
+    return port;
+  }
+  snprintf(portOpt, sizeof(portOpt), "-oport=%d", port);
+  /* Let's make sure no other FUSE filesystem is mounted at mntPoint */
+  ret = fuserMount(&procRet, "-u", mntPoint, NULL);
+  if (ret) {
+    fprintf(stderr, "FUSE_TEST: fuserMount -u %s failed with error %d\n",
+            mntPoint, ret);
+    return -EIO;
+  }
+  if (procRet == EXIT_STATUS_EXEC_ERROR) {
+    fprintf(stderr, "FUSE_TEST: fuserMount probably could not be executed\n");
+    return -EIO;
+  }
+  /* fork and exec the fuse_dfs process */
+  *pid = fork();
+  if (*pid < 0) {
+    ret = errno;
+    fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to fork: "
+            "error %d: %s\n", ret, strerror(ret));
+    return -ret;
+  } else if (*pid == 0) {
+    if (execl(fusePath, fusePath, "-obig_writes", "-oserver=hdfs://localhost",
+              portOpt, "-onopermissions", "-ononempty", "-oinitchecks",
+              mntPoint, "-f", NULL)) {
+      ret = errno;
+      fprintf(stderr, "FUSE_TEST: spawnFuseServer: failed to execv %s: "
+              "error %d: %s\n", fusePath, ret, strerror(ret));
+      // _exit: do not run the parent's atexit handlers or flush its stdio
+      // buffers from the forked child
+      _exit(EXIT_STATUS_EXEC_ERROR);
+    }
+  }
+  return 0;
+}
+
+/**
+ * Test that we can start up fuse_dfs and do some stuff.
+ *
+ * Steps: pick the mount point (from TLH_FUSE_MNT_POINT, or a new temporary
+ * directory), verify the workload on the local FS, start a mini DFS cluster,
+ * spawn fuse_dfs on the mount point, run the workload there, unmount, and
+ * check how the fuse_dfs process exited.
+ *
+ * @return EXIT_SUCCESS if everything passed; EXIT_FAILURE otherwise
+ */
+int main(int argc, char **argv)
+{
+  int ret, pret, status;
+  pid_t fusePid;
+  const char *mntPoint;
+  char mntTmp[PATH_MAX] = "";
+  struct NativeMiniDfsCluster* tlhCluster;
+  struct NativeMiniDfsConf conf = {
+      .doFormat = 1,
+  };
+
+  mntPoint = getenv("TLH_FUSE_MNT_POINT");
+  if (!mntPoint) {
+    if (createTempDir(mntTmp, sizeof(mntTmp), 0755)) {
+      fprintf(stderr, "FUSE_TEST: failed to create temporary directory for "
+              "fuse mount point.\n");
+      ret = EXIT_FAILURE;
+      goto done;
+    }
+    fprintf(stderr, "FUSE_TEST: creating mount point at '%s'\n", mntTmp);
+    mntPoint = mntTmp;
+  }
+  if (verifyFuseWorkload()) {
+    fprintf(stderr, "FUSE_TEST: failed to verify fuse workload on "
+            "local FS.\n");
+    ret = EXIT_FAILURE;
+    goto done_rmdir;
+  }
+  tlhCluster = nmdCreate(&conf);
+  if (!tlhCluster) {
+    ret = EXIT_FAILURE;
+    goto done_rmdir;
+  }
+  if (nmdWaitClusterUp(tlhCluster)) {
+    ret = EXIT_FAILURE;
+    goto done_nmd_shutdown;
+  }
+  ret = spawnFuseServer(argv[0], tlhCluster, mntPoint, &fusePid);
+  if (ret) {
+    fprintf(stderr, "FUSE_TEST: spawnFuseServer failed with error "
+            "code %d\n", ret);
+    ret = EXIT_FAILURE;
+    goto done_nmd_shutdown;
+  }
+  ret = waitForMount(mntPoint, 20);
+  if (ret) {
+    fprintf(stderr, "FUSE_TEST: waitForMount(%s) failed with error "
+            "code %d\n", mntPoint, ret);
+    cleanupFuse(mntPoint);
+    ret = EXIT_FAILURE;
+    goto done_nmd_shutdown;
+  }
+  ret = runFuseWorkload(mntPoint, "test");
+  if (ret) {
+    fprintf(stderr, "FUSE_TEST: runFuseWorkload failed with error "
+            "code %d\n", ret);
+    cleanupFuse(mntPoint);
+    ret = EXIT_FAILURE;
+    goto done_nmd_shutdown;
+  }
+  // Keep the result of cleanupFuse so that the message reports the actual
+  // error rather than the (zero) result of the workload.
+  ret = cleanupFuse(mntPoint);
+  if (ret) {
+    fprintf(stderr, "FUSE_TEST: fuserMount -u %s failed with error "
+            "code %d\n", mntPoint, ret);
+    ret = EXIT_FAILURE;
+    goto done_nmd_shutdown;
+  }
+  // Don't wait forever for fuse_dfs to exit after the unmount
+  alarm(120);
+  pret = waitpid(fusePid, &status, 0);
+  ret = errno;
+  // The wait is over; cancel the alarm so it can't fire during shutdown
+  alarm(0);
+  if (pret != fusePid) {
+    fprintf(stderr, "FUSE_TEST: failed to wait for fusePid %d: "
+            "returned %d: error %d (%s)\n",
+            fusePid, pret, ret, strerror(ret));
+    ret = EXIT_FAILURE;
+    goto done_nmd_shutdown;
+  }
+  if (WIFEXITED(status)) {
+    ret = WEXITSTATUS(status);
+    if (ret) {
+      fprintf(stderr, "FUSE_TEST: fuse exited with failure status "
+              "%d!\n", ret);
+      ret = EXIT_FAILURE;
+      goto done_nmd_shutdown;
+    }
+  } else if (WIFSIGNALED(status)) {
+    ret = WTERMSIG(status);
+    if (ret != SIGTERM) {
+      fprintf(stderr, "FUSE_TEST: fuse exited with unexpected "
+              "signal %d!\n", ret);
+      ret = EXIT_FAILURE;
+      goto done_nmd_shutdown;
+    }
+  } else {
+    fprintf(stderr, "FUSE_TEST: fuse_dfs exited with unknown exit type\n");
+    ret = EXIT_FAILURE;
+    goto done_nmd_shutdown;
+  }
+  ret = EXIT_SUCCESS;
+
+done_nmd_shutdown:
+  // Check the result explicitly instead of using EXPECT_ZERO: that macro
+  // returns from main on failure, which would skip nmdFree, the removal of
+  // the temporary mount point, and the final status message.
+  if (nmdShutdown(tlhCluster)) {
+    fprintf(stderr, "FUSE_TEST: failed to shut down the mini DFS cluster.\n");
+    ret = EXIT_FAILURE;
+  }
+  nmdFree(tlhCluster);
+done_rmdir:
+  if (mntTmp[0]) {
+    rmdir(mntTmp);
+  }
+done:
+  if (ret == EXIT_SUCCESS) {
+    fprintf(stderr, "FUSE_TEST: SUCCESS.\n");
+  } else {
+    fprintf(stderr, "FUSE_TEST: FAILURE!\n");
+  }
+  return ret;
+}
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h?rev=1371231&r1=1371230&r2=1371231&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
Thu Aug 9 14:44:05 2012
@@ -78,7 +78,7 @@
do { \
int __my_ret__ = x; \
int __my_errno__ = errno; \
- if (__my_ret__) { \
+ if (!__my_ret__) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): got zero from %s\n", __LINE__, \
__my_ret__, __my_errno__, #x); \
@@ -98,4 +98,23 @@
} \
} while (0);
+/**
+ * Evaluate y and return -1 from the enclosing function, after logging the
+ * line number, the actual value and errno, unless it equals x.
+ * Both arguments are evaluated exactly once.
+ */
+#define EXPECT_INT_EQ(x, y) \
+    do { \
+        int __my_ret__ = (y); \
+        int __my_errno__ = errno; \
+        int __my_expected__ = (x); \
+        if (__my_ret__ != __my_expected__) { \
+            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+              "code %d (errno: %d): expected %d\n", \
+               __LINE__, __my_ret__, __my_errno__, __my_expected__); \
+            return -1; \
+        } \
+    } while (0);
+
+/**
+ * Evaluate expr repeatedly for as long as it fails with EINTR.
+ * On exit, ret is 0 if expr evaluated to zero, and -errno otherwise.
+ */
+#define RETRY_ON_EINTR_GET_ERRNO(ret, expr) do { \
+    (ret) = (expr); \
+    if (!(ret)) \
+        break; \
+    (ret) = -errno; \
+    } while ((ret) == -EINTR);
+
#endif
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c?rev=1371231&r1=1371230&r2=1371231&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c
Thu Aug 9 14:44:05 2012
@@ -156,7 +156,7 @@ int nmdWaitClusterUp(struct NativeMiniDf
return 0;
}
-int nmdGetNameNodePort(struct NativeMiniDfsCluster *cl)
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl)
{
JNIEnv *env = getJNIEnv();
jvalue jVal;
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h?rev=1371231&r1=1371230&r2=1371231&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
(original)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h
Thu Aug 9 14:44:05 2012
@@ -76,6 +76,5 @@ void nmdFree(struct NativeMiniDfsCluster
*
* @return the port, or a negative error code
*/
-int nmdGetNameNodePort(struct NativeMiniDfsCluster *cl);
-
+int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
#endif
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c?rev=1371231&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.c
Thu Aug 9 14:44:05 2012
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "util/posix_util.h"
+
+#include <dirent.h>
+#include <errno.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+static pthread_mutex_t gTempdirLock = PTHREAD_MUTEX_INITIALIZER;
+
+static int gTempdirNonce = 0;
+
+/*
+ * Delete everything inside a directory, but not the directory itself.
+ * Each entry other than "." and ".." is passed to recursiveDelete; the
+ * first failure stops the iteration.  A closedir failure takes precedence
+ * over any earlier error.
+ */
+int recursiveDeleteContents(const char *path)
+{
+  int status = 0;
+  DIR *dir;
+  struct dirent *entry;
+  char child[PATH_MAX];
+
+  dir = opendir(path);
+  if (!dir) {
+    status = -errno;
+    fprintf(stderr, "recursiveDelete(%s) failed with error %d\n", path, status);
+    return status;
+  }
+  while ((entry = readdir(dir)) != NULL) {
+    if (!strcmp(entry->d_name, ".") || !strcmp(entry->d_name, ".."))
+      continue;
+    snprintf(child, sizeof(child), "%s/%s", path, entry->d_name);
+    status = recursiveDelete(child);
+    if (status)
+      break;
+  }
+  if (closedir(dir)) {
+    status = -errno;
+    fprintf(stderr, "recursiveDelete(%s): closedir failed with "
+            "error %d\n", path, status);
+    return status;
+  }
+  return status;
+}
+
+/*
+ * Simple recursive delete implementation.
+ * It could be optimized, but there is no need at the moment.
+ * TODO: use fstat, etc.
+ *
+ * Directories are emptied with recursiveDeleteContents and then removed
+ * with rmdir; anything else is removed with unlink.
+ * Returns 0 on success and a negative errno value on every failure path.
+ */
+int recursiveDelete(const char *path)
+{
+  int ret;
+  struct stat stBuf;
+
+  ret = stat(path, &stBuf);
+  if (ret != 0) {
+    ret = -errno;
+    fprintf(stderr, "recursiveDelete(%s): stat failed with "
+            "error %d\n", path, ret);
+    return ret;
+  }
+  if (S_ISDIR(stBuf.st_mode)) {
+    ret = recursiveDeleteContents(path);
+    if (ret)
+      return ret;
+    ret = rmdir(path);
+    if (ret) {
+      // Negated like every other failure path in this function
+      ret = -errno;
+      fprintf(stderr, "recursiveDelete(%s): rmdir failed with error %d\n",
+              path, ret);
+      return ret;
+    }
+  } else {
+    ret = unlink(path);
+    if (ret) {
+      ret = -errno;
+      fprintf(stderr, "recursiveDelete(%s): unlink failed with "
+              "error %d\n", path, ret);
+      return ret;
+    }
+  }
+  return 0;
+}
+
+/*
+ * Create a new directory named <base>/temp.<pid>.<nonce>, where base is
+ * $TMPDIR (made absolute with realpath if it is relative) or /tmp when
+ * TMPDIR is unset, and nonce is a per-process counter.
+ *
+ * Returns 0 on success; -ENAMETOOLONG if the name does not fit into
+ * tempDir; a negative errno if realpath or mkdir fails.
+ */
+int createTempDir(char *tempDir, int nameMax, int mode)
+{
+  char tmp[PATH_MAX];
+  int pid, nonce, len;
+  const char *base = getenv("TMPDIR");
+  if (!base)
+    base = "/tmp";
+  if (base[0] != '/') {
+    // canonicalize non-absolute TMPDIR
+    if (realpath(base, tmp) == NULL) {
+      return -errno;
+    }
+    base = tmp;
+  }
+  pid = getpid();
+  // The nonce keeps names unique when several threads create directories
+  pthread_mutex_lock(&gTempdirLock);
+  nonce = gTempdirNonce++;
+  pthread_mutex_unlock(&gTempdirLock);
+  len = snprintf(tempDir, nameMax, "%s/temp.%08d.%08d", base, pid, nonce);
+  if ((len < 0) || (len >= nameMax)) {
+    // Don't create (and hand back) a directory with a truncated name
+    return -ENAMETOOLONG;
+  }
+  if (mkdir(tempDir, mode) == -1) {
+    int ret = errno;
+    return -ret;
+  }
+  return 0;
+}
+
+/*
+ * Sleep for the given number of seconds using nanosleep.  If the sleep is
+ * interrupted (EINTR), it is resumed for the remaining time.  Any other
+ * nanosleep error is logged to stderr.
+ */
+void sleepNoSig(int sec)
+{
+  struct timespec remaining = { .tv_sec = sec, .tv_nsec = 0 };
+  struct timespec request;
+  int err;
+
+  for (;;) {
+    request = remaining;
+    if (nanosleep(&request, &remaining) != -1)
+      return;
+    if (errno != EINTR)
+      break;
+  }
+  err = errno;
+  fprintf(stderr, "nanosleep error %d (%s)\n", err, strerror(err));
+}
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h?rev=1371231&view=auto
==============================================================================
---
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h
(added)
+++
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native/util/posix_util.h
Thu Aug 9 14:44:05 2012
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __POSIX_UTIL_H__
+#define __POSIX_UTIL_H__
+
+/**
+ * Recursively delete the contents of a directory
+ *
+ * @param path directory whose contents we should delete
+ *
+ * @return 0 on success; error code otherwise
+ */
+int recursiveDeleteContents(const char *path);
+
+/**
+ * Recursively delete a local path, using unlink or rmdir as appropriate.
+ *
+ * @param path path to delete
+ *
+ * @return 0 on success; error code otherwise
+ */
+int recursiveDelete(const char *path);
+
+/**
+ * Get a temporary directory
+ *
+ * @param tempDir (out param) path to the temporary directory
+ * @param nameMax Length of the tempDir buffer
+ * @param mode Mode to create with
+ *
+ * @return 0 on success; error code otherwise
+ */
+int createTempDir(char *tempDir, int nameMax, int mode);
+
+/**
+ * Sleep without using signals
+ *
+ * @param sec Number of seconds to sleep
+ */
+void sleepNoSig(int sec);
+
+#endif