Author: cmccabe
Date: Thu Aug 21 23:10:49 2014
New Revision: 1619622

URL: http://svn.apache.org/r1619622
Log:
HADOOP-10985. native client: split ndfs.c into meta, file, util, and permission 
(cmccabe)

Added:
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.c
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.h
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.c
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.h
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ops.c
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.c
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.h
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.c
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.h
Removed:
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ndfs.c
Modified:
    hadoop/common/branches/HADOOP-10388/hadoop-native-core/CHANGES.txt
    
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/CMakeLists.txt

Modified: hadoop/common/branches/HADOOP-10388/hadoop-native-core/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/CHANGES.txt?rev=1619622&r1=1619621&r2=1619622&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-native-core/CHANGES.txt 
(original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-native-core/CHANGES.txt Thu Aug 
21 23:10:49 2014
@@ -1,5 +1,7 @@
 HADOOP-10388 Change Log
 
+    HADOOP-10985: native client: split ndfs.c into meta, file, util, and 
permission (cmccabe)
+
     HADOOP-10877. native client: implement hdfsMove and hdfsCopy (cmccabe)
 
     HADOOP-10725. Implement listStatus and getFileInfo in the native client 
(cmccabe)

Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/CMakeLists.txt?rev=1619622&r1=1619621&r2=1619622&view=diff
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/CMakeLists.txt
 (original)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/CMakeLists.txt
 Thu Aug 21 23:10:49 2014
@@ -113,7 +113,11 @@ set(FS_SRCS
     fs/common.c
     fs/copy.c
     fs/fs.c
-    ndfs/ndfs.c
+    ndfs/file.c
+    ndfs/meta.c
+    ndfs/ops.c
+    ndfs/permission.c
+    ndfs/util.c
     ${HDFS_PROTOBUF_SRCS}
 )
 

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.c
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.c?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.c
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.c
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "common/hconf.h"
+#include "common/net.h"
+#include "common/string.h"
+#include "common/uri.h"
+#include "fs/common.h"
+#include "fs/fs.h"
+#include "ndfs/meta.h"
+#include "ndfs/util.h"
+#include "rpc/messenger.h"
+#include "rpc/proxy.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <uv.h>
+
+/**
+ * Set if the file is read-only... otherwise, the file is assumed to be
+ * write-only.
+ */
+#define NDFS_FILE_FLAG_RO                       0x1
+
+/** This flag is for compatibility with some old test harnesses. */
+#define NDFS_FILE_FLAG_DISABLE_DIRECT_READ      0x2
+
+/** Base class for both read-only and write-only files. */
+struct native_file_base {
+    /** Fields common to all filesystems. */
+    struct hadoop_file_base base;
+
+    /** NDFS file flags. */
+    int flags;
+};
+
+/** A read-only file. */
+struct native_ro_file {
+    struct native_file_base base;
+    uint64_t bytes_read;
+};
+
+/** A write-only file. */
+struct native_wo_file {
+    struct native_file_base base;
+};
+
+int ndfs_file_is_open_for_read(hdfsFile bfile)
+{
+    struct native_file_base *file = (struct native_file_base *)bfile;
+    return !!(file->flags & NDFS_FILE_FLAG_RO);
+}
+
+int ndfs_file_is_open_for_write(hdfsFile bfile)
+{
+    struct native_file_base *file = (struct native_file_base *)bfile;
+    return !(file->flags & NDFS_FILE_FLAG_RO);
+}
+
+int ndfs_file_get_read_statistics(hdfsFile bfile,
+            struct hdfsReadStatistics **out)
+{
+    struct hdfsReadStatistics *stats;
+    struct native_ro_file *file = (struct native_ro_file *)bfile;
+
+    if (!(file->base.flags & NDFS_FILE_FLAG_RO)) {
+        errno = EINVAL;
+        return -1;
+    }
+    stats = calloc(1, sizeof(*stats));
+    if (!stats) {
+        errno = ENOMEM; 
+        return -1;
+    }
+    stats->totalBytesRead = file->bytes_read;
+    *out = stats;
+    return 0;
+}
+
+static struct hadoop_err *ndfs_open_file_for_read(
+        struct native_ro_file **out __attribute__((unused)),
+        struct native_fs *fs __attribute__((unused)),
+        const char *uri __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+static struct hadoop_err *ndfs_open_file_for_write(
+        struct native_ro_file **out __attribute__((unused)),
+        struct native_fs *fs __attribute__((unused)),
+        const char *uri __attribute__((unused)),
+        int buffer_size __attribute__((unused)),
+        short replication __attribute__((unused)),
+        tSize block_size __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+hdfsFile ndfs_open_file(hdfsFS bfs, const char* uri, int flags, 
+                      int buffer_size, short replication, tSize block_size)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct native_ro_file *file = NULL;
+    struct hadoop_err *err;
+    int accmode;
+    char *path = NULL;
+
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    accmode = flags & O_ACCMODE;
+    if (accmode == O_RDONLY) {
+        err = ndfs_open_file_for_read(&file, fs, path);
+    } else if (accmode == O_WRONLY) {
+        err = ndfs_open_file_for_write(&file, fs, path,
+                        buffer_size, replication, block_size);
+    } else {
+        err = hadoop_lerr_alloc(EINVAL, "cannot open a hadoop file in "
+                "mode 0x%x\n", accmode);
+    }
+done:
+    free(path);
+    return hadoopfs_errno_and_retptr(err, file);
+}
+
+int ndfs_close_file(hdfsFS fs __attribute__((unused)),
+                    hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+int ndfs_seek(hdfsFS bfs __attribute__((unused)),
+              hdfsFile bfile __attribute__((unused)),
+              tOffset desiredPos __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+tOffset ndfs_tell(hdfsFS bfs __attribute__((unused)),
+                  hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+tSize ndfs_read(hdfsFS bfs __attribute__((unused)),
+                       hdfsFile bfile __attribute__((unused)),
+                       void *buffer __attribute__((unused)),
+                       tSize length __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+tSize ndfs_pread(hdfsFS bfs __attribute__((unused)),
+            hdfsFile bfile __attribute__((unused)),
+            tOffset position __attribute__((unused)),
+            void* buffer __attribute__((unused)),
+            tSize length __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+tSize ndfs_write(hdfsFS bfs __attribute__((unused)),
+            hdfsFile bfile __attribute__((unused)),
+            const void* buffer __attribute__((unused)),
+            tSize length __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+int ndfs_flush(hdfsFS bfs __attribute__((unused)),
+               hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+int ndfs_hflush(hdfsFS bfs __attribute__((unused)),
+                hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+int ndfs_hsync(hdfsFS bfs __attribute__((unused)),
+               hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+int ndfs_available(hdfsFS bfs __attribute__((unused)),
+                   hdfsFile bfile __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return -1;
+}
+
+struct hadoopRzBuffer* ndfs_read_zero(
+            hdfsFile bfile __attribute__((unused)),
+            struct hadoopRzOptions *opts __attribute__((unused)),
+            int32_t maxLength __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+void ndfs_rz_buffer_free(hdfsFile bfile __attribute__((unused)),
+                struct hadoopRzBuffer *buffer __attribute__((unused)))
+{
+}
+
+int ndfs_file_uses_direct_read(hdfsFile bfile)
+{
+    // Set the 'disable direct reads' flag so that old test harnesses designed
+    // to test jniFS will run against NDFS.  The flag doesn't do anything,
+    // since all reads are always direct in NDFS.
+    struct native_file_base *file = (struct native_file_base *)bfile;
+    return (!(file->flags & NDFS_FILE_FLAG_DISABLE_DIRECT_READ));
+}
+
+void ndfs_file_disable_direct_read(hdfsFile bfile __attribute__((unused)))
+{
+    struct native_file_base *file = (struct native_file_base *)bfile;
+    file->flags |= NDFS_FILE_FLAG_DISABLE_DIRECT_READ;
+}
+
+char***
+ndfs_get_hosts(hdfsFS bfs __attribute__((unused)),
+               const char* path __attribute__((unused)),
+               tOffset start __attribute__((unused)),
+               tOffset length __attribute__((unused)))
+{
+    errno = ENOTSUP;
+    return NULL;
+}
+
+// vim: ts=4:sw=4:tw=79:et

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.h
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.h?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.h
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/file.h
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_NDFS_FILE_H
+#define HADOOP_CORE_NDFS_FILE_H
+
+#include "fs/hdfs.h"
+
+#include <stdint.h>
+
+int ndfs_file_is_open_for_read(hdfsFile bfile);
+int ndfs_file_is_open_for_write(hdfsFile bfile);
+int ndfs_file_get_read_statistics(hdfsFile bfile,
+            struct hdfsReadStatistics **out);
+hdfsFile ndfs_open_file(hdfsFS bfs, const char* uri, int flags, 
+                      int buffer_size, short replication, tSize block_size);
+int ndfs_close_file(hdfsFS fs, hdfsFile bfile);
+int ndfs_seek(hdfsFS bfs, hdfsFile bfile, tOffset desiredPos);
+int64_t ndfs_tell(hdfsFS bfs, hdfsFile bfile);
+tSize ndfs_read(hdfsFS bfs, hdfsFile bfile, void *buffer, tSize length);
+tSize ndfs_pread(hdfsFS bfs, hdfsFile bfile, tOffset position,
+                 void* buffer, tSize length);
+tSize ndfs_write(hdfsFS bfs, hdfsFile bfile, const void* buffer, tSize length);
+int ndfs_flush(hdfsFS bfs, hdfsFile bfile);
+int ndfs_hflush(hdfsFS bfs, hdfsFile bfile);
+int ndfs_hsync(hdfsFS bfs, hdfsFile bfile);
+int ndfs_available(hdfsFS bfs, hdfsFile bfile);
+struct hadoopRzBuffer* ndfs_read_zero(hdfsFile bfile,
+            struct hadoopRzOptions *opts, int32_t maxLength);
+void ndfs_rz_buffer_free(hdfsFile bfile, struct hadoopRzBuffer *buffer);
+int ndfs_file_uses_direct_read(hdfsFile bfile);
+void ndfs_file_disable_direct_read(hdfsFile bfile);
+char*** ndfs_get_hosts(hdfsFS bfs, const char* path,
+                       tOffset start, tOffset length);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.c
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.c?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.c
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.c
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,915 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "common/hconf.h"
+#include "common/net.h"
+#include "common/string.h"
+#include "common/uri.h"
+#include "fs/common.h"
+#include "fs/fs.h"
+#include "ndfs/meta.h"
+#include "ndfs/permission.h"
+#include "ndfs/util.h"
+#include "protobuf/ClientNamenodeProtocol.call.h"
+#include "protobuf/hdfs.pb-c.h.s"
+#include "rpc/messenger.h"
+#include "rpc/proxy.h"
+
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <uv.h>
+
+#define DEFAULT_NN_PORT 8020
+
+#define FS_PERMISSIONS_UMASK_KEY "fs.permissions.umask-mode"
+
+#define FS_PERMISSIONS_UMASK_DEFAULT "022"
+
+#define DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL \
+    "dfs.client.write.exclude.nodes.cache.expiry.interval.millis"
+
+#define DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT \
+    (10LL * 60LL * 1000LL)
+
+/** Whole-filesystem stats sent back from the NameNode. */
+struct hadoop_vfs_stats {
+    int64_t capacity;
+    int64_t used;
+    int64_t remaining;
+    int64_t under_replicated;
+    int64_t corrupt_blocks;
+    int64_t missing_blocks;
+};
+
+/** Server defaults sent back from the NameNode. */
+struct ndfs_server_defaults {
+    uint64_t blocksize;
+};
+
+static struct hadoop_err *ndfs_connect_setup_conf(struct native_fs *fs,
+                                                  struct hconf *hconf);
+
+static struct hadoop_err *populate_file_info(struct file_info *info,
+                    HdfsFileStatusProto *status, const char *prefix);
+
+static void ndfs_free(struct native_fs *fs);
+
+static struct hadoop_err *ndfs_get_server_defaults(struct native_fs *fs,
+            struct ndfs_server_defaults *defaults)
+{
+    struct hadoop_err *err = NULL;
+    GetServerDefaultsRequestProto req =
+        GET_SERVER_DEFAULTS_REQUEST_PROTO__INIT;
+    GetServerDefaultsResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = cnn_get_server_defaults(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    defaults->blocksize = resp->serverdefaults->blocksize;
+
+done:
+    if (resp) {
+        get_server_defaults_response_proto__free_unpacked(resp, NULL);
+    }
+    return err;
+}
+
+/**
+ * Parse an address in the form <hostname> or <hostname>:<port>.
+ *
+ * @param host          The hostname
+ * @param addr          (out param) The sockaddr.
+ * @param default_port  The default port to use, if one is not found in the
+ *                          string.
+ *
+ * @return              NULL on success; the error otherwise.
+ */
+static struct hadoop_err *parse_rpc_addr(const char *input,
+            struct sockaddr_in *out, int default_port)
+{
+    struct hadoop_err *err = NULL;
+    char *host, *colon;
+    uint32_t addr;
+    int port;
+
+    fprintf(stderr, "parse_rpc_addr(input=%s, default_port=%d)\n",
+            input, default_port);
+
+    // If the URI doesn't contain a port, we use a default.
+    // This may come either from the hdfsBuilder, or from the
+    // 'default default' for HDFS.
+    // It's kind of silly that hdfsBuilder even includes this field, since this
+    // information should just be included in the URI, but this is here for
+    // compatibility.
+    port = (default_port <= 0) ? DEFAULT_NN_PORT : default_port;
+    host = strdup(input);
+    if (!host) {
+        err = hadoop_lerr_alloc(ENOMEM, "parse_rpc_addr: OOM");
+        goto done;
+    }
+    colon = index(host, ':');
+    if (colon) {
+        // If the URI has a colon, we parse the next part as a port.
+        char *port_str = colon + 1;
+        *colon = '\0';
+        port = atoi(colon);
+        if ((port <= 0) || (port >= 65536)) {
+            err = hadoop_lerr_alloc(EINVAL, "parse_rpc_addr: invalid port "
+                                    "string %s", port_str);
+            goto done;
+        }
+    }
+    err = get_first_ipv4_addr(host, &addr);
+    if (err)
+        goto done;
+    out->sin_family = AF_INET;
+    out->sin_port = htons(port);
+    out->sin_addr.s_addr = htonl(addr);
+done:
+    free(host);
+    return err;
+}
+
+static struct hadoop_err *get_namenode_addr(const struct hadoop_uri *conn_uri,
+        const struct hdfsBuilder *hdfs_bld, struct sockaddr_in *nn_addr)
+{
+    const char *nameservice_id;
+    const char *rpc_addr;
+
+    nameservice_id = hconf_get(hdfs_bld->hconf, "dfs.nameservice.id");
+    if (nameservice_id) {
+        return hadoop_lerr_alloc(ENOTSUP, "get_namenode_addr: we "
+                "don't yet support HA or federated configurations.");
+    }
+    rpc_addr = hconf_get(hdfs_bld->hconf, "dfs.namenode.rpc-address");
+    if (rpc_addr) {
+        return parse_rpc_addr(rpc_addr, nn_addr, hdfs_bld->port);
+    }
+    return parse_rpc_addr(conn_uri->auth, nn_addr, hdfs_bld->port);
+}
+
+struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
+                                struct hdfs_internal **out)
+{
+    struct hadoop_err *err = NULL;
+    struct native_fs *fs = NULL;
+    struct hrpc_messenger_builder *msgr_bld;
+    struct ndfs_server_defaults defaults;
+    int used_port;
+    char *working_dir = NULL;
+
+    fs = calloc(1, sizeof(*fs));
+    if (!fs) {
+        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
+                                "for a native_fs structure.");
+        goto done;
+    }
+    fs->base.ty = HADOOP_FS_TY_NDFS;
+    fs->conn_uri = hdfs_bld->uri;
+    hdfs_bld->uri = NULL;
+    // Calculate our url_prefix.  We'll need this when spitting out URIs from
+    // listStatus and getFileInfo.  We don't include the port in this URL
+    // prefix unless it is non-standard.
+    used_port = ntohs(fs->nn_addr.sin_port);
+    if (used_port == DEFAULT_NN_PORT) {
+        err = dynprintf(&fs->url_prefix, "%s://%s",
+                     fs->conn_uri->scheme, fs->conn_uri->auth);
+        if (err)
+            goto done;
+    } else {
+        err = dynprintf(&fs->url_prefix, "%s://%s:%d",
+                     fs->conn_uri->scheme, fs->conn_uri->auth,
+                     used_port);
+        if (err)
+            goto done;
+    }
+    msgr_bld = hrpc_messenger_builder_alloc();
+    if (!msgr_bld) {
+        err = hadoop_lerr_alloc(ENOMEM, "failed to allocate space "
+                                "for a messenger builder.");
+        goto done;
+    }
+    err = get_namenode_addr(fs->conn_uri, hdfs_bld, &fs->nn_addr);
+    if (err)
+        goto done;
+    err = hrpc_messenger_create(msgr_bld, &fs->msgr);
+    if (err)
+        goto done;
+    // Get the default working directory
+    if (uv_mutex_init(&fs->working_uri_lock) < 0) {
+        err = hadoop_lerr_alloc(ENOMEM, "failed to create a mutex.");
+        goto done;
+    }
+    err = dynprintf(&working_dir, "%s:///user/%s/",
+                 fs->conn_uri->scheme, fs->conn_uri->user_info);
+    if (err) {
+        uv_mutex_destroy(&fs->working_uri_lock);
+        goto done;
+    }
+    err = hadoop_uri_parse(working_dir, NULL, &fs->working_uri,
+                           H_URI_PARSE_ALL | H_URI_APPEND_SLASH);
+    if (err) {
+        uv_mutex_destroy(&fs->working_uri_lock);
+        err = hadoop_err_prepend(err, 0, "ndfs_connect: error parsing "
+                                "working directory");
+        goto done;
+    }
+    err = ndfs_connect_setup_conf(fs, hdfs_bld->hconf);
+    if (err)
+        goto done;
+
+    // Ask the NameNode about our server defaults.  We'll use this information
+    // later in ndfs_get_default_block_size, and when writing new files.  Just
+    // as important, tghis validates that we can talk to the NameNode with our
+    // current configuration.
+    memset(&defaults, 0, sizeof(defaults));
+    err = ndfs_get_server_defaults(fs, &defaults);
+    if (err)
+        goto done;
+    fs->default_block_size = defaults.blocksize;
+    err = NULL;
+
+done:
+    free(working_dir);
+    if (err) {
+        ndfs_free(fs);
+        return err; 
+    }
+    *out = (struct hdfs_internal *)fs;
+    return NULL; 
+}
+
+/**
+ * Configure the native file system using the Hadoop configuration.
+ *
+ * @param fs            The filesystem to set configuration keys for.
+ * @param hconf         The configuration object to read from.
+ */
+static struct hadoop_err *ndfs_connect_setup_conf(struct native_fs *fs,
+                                                  struct hconf *hconf)
+{
+    struct hadoop_err *err = NULL;
+    const char *umask_str;
+    int64_t timeout_ms;
+
+    umask_str = hconf_get(hconf, FS_PERMISSIONS_UMASK_KEY);
+    if (!umask_str)
+        umask_str = FS_PERMISSIONS_UMASK_DEFAULT;
+    err = parse_permission(umask_str, &fs->umask);
+    if (err) {
+        return hadoop_err_prepend(err, 0, "ndfs_connect_setup_conf: "
+                "error handling %s", FS_PERMISSIONS_UMASK_DEFAULT);
+    }
+
+    timeout_ms = DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
+    hconf_get_int64(hconf,
+            DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL, &timeout_ms);
+    fs->dead_dn_timeout_ns = timeout_ms * 1000LL;
+    return NULL;
+}
+
+static void ndfs_free(struct native_fs *fs)
+{
+    if (fs->msgr) {
+        hrpc_messenger_shutdown(fs->msgr);
+        hrpc_messenger_free(fs->msgr);
+    }
+    free(fs->url_prefix);
+    hadoop_uri_free(fs->conn_uri);
+    if (fs->working_uri) {
+        hadoop_uri_free(fs->working_uri);
+        uv_mutex_destroy(&fs->working_uri_lock);
+    }
+    free(fs);
+}
+
+int ndfs_disconnect(hdfsFS bfs)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+
+    ndfs_free(fs);
+    return 0;
+}
+
+int ndfs_file_exists(hdfsFS bfs, const char *uri)
+{
+    static hdfsFileInfo *info;
+
+    info = ndfs_get_path_info(bfs, uri);
+    if (!info) {
+        // errno will be set
+        return -1;
+    }
+    hdfsFreeFileInfo(info, 1);
+    return 0;
+}
+
+int ndfs_unlink(struct hdfs_internal *bfs,
+                const char *uri, int recursive)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+    struct hadoop_err *err = NULL;
+    DeleteRequestProto req = DELETE_REQUEST_PROTO__INIT;
+    struct hrpc_proxy proxy;
+    DeleteResponseProto *resp = NULL;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.recursive = !!recursive;
+    err = cnn_delete(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    if (resp->result == 0) {
+        err = hadoop_lerr_alloc(ENOENT, "ndfs_unlink(%s, recursive=%d): "
+                    "deletion failed on the server", uri, recursive);
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        delete_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+int ndfs_rename(hdfsFS bfs, const char *src_uri, const char *dst_uri)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+    struct hadoop_err *err = NULL;
+    Rename2RequestProto req = RENAME2_REQUEST_PROTO__INIT;
+    Rename2ResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *src_path = NULL, *dst_path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, src_uri, &src_path);
+    if (err) {
+        goto done;
+    }
+    err = build_path(fs, dst_uri, &dst_path);
+    if (err) {
+        goto done;
+    }
+    req.src = src_path;
+    req.dst = dst_path;
+    req.overwritedest = 0; // TODO: support overwrite
+    err = cnn_rename2(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(src_path);
+    free(dst_path);
+    if (resp) {
+        rename2_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+char* ndfs_get_working_directory(hdfsFS bfs, char *buffer, size_t bufferSize)
+{
+    size_t len;
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+
+    uv_mutex_lock(&fs->working_uri_lock);
+    len = strlen(fs->conn_uri->path);
+    if (len + 1 > bufferSize) {
+        err = hadoop_lerr_alloc(ENAMETOOLONG, "ndfs_get_working_directory: "
+                "the buffer supplied was only %zd bytes, but we would need "
+                "%zd bytes to hold the working directory.",
+                bufferSize, len + 1);
+        goto done;
+    }
+    strcpy(buffer, fs->conn_uri->path);
+done:
+    uv_mutex_unlock(&fs->working_uri_lock);
+    return hadoopfs_errno_and_retptr(err, buffer);
+}
+
+int ndfs_set_working_directory(hdfsFS bfs, const char* uri_str)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    struct hadoop_uri *uri = NULL;
+
+    uv_mutex_lock(&fs->working_uri_lock);
+    err = hadoop_uri_parse(uri_str, fs->working_uri, &uri,
+            H_URI_PARSE_ALL | H_URI_APPEND_SLASH);
+    if (err) {
+        err = hadoop_err_prepend(err, 0, "ndfs_set_working_directory: ");
+        goto done;
+    }
+    hadoop_uri_free(fs->working_uri);
+    fs->working_uri = uri;
+    err = NULL;
+
+done:
+    uv_mutex_unlock(&fs->working_uri_lock);
+    return hadoopfs_errno_and_retcode(err);
+}
+
+int ndfs_mkdir(hdfsFS bfs, const char* uri)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    MkdirsRequestProto req = MKDIRS_REQUEST_PROTO__INIT;
+    MkdirsResponseProto *resp = NULL;
+    FsPermissionProto perm = FS_PERMISSION_PROTO__INIT;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    // TODO: a better libhdfs API would allow us to specify what mode to
+    // create a particular directory with.
+    perm.perm = 0777 & (~fs->umask);
+    req.masked = &perm;
+    req.createparent = 1; // TODO: add libhdfs API for non-recursive mkdir
+    err = cnn_mkdirs(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    if (!resp->result) {
+        err = hadoop_lerr_alloc(EEXIST, "ndfs_mkdir(%s): a path "
+                "component already exists as a non-directory.", path);
+        goto done;
+    }
+    err = NULL;
+
+done:
+    free(path);
+    if (resp) {
+        mkdirs_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+int ndfs_set_replication(hdfsFS bfs, const char* uri, int16_t replication)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    SetReplicationRequestProto req = SET_REPLICATION_REQUEST_PROTO__INIT;
+    SetReplicationResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.replication = replication;
+    err = cnn_set_replication(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    if (!resp->result) {
+        err = hadoop_lerr_alloc(EINVAL, "ndfs_set_replication(%s): path "
+                "does not exist or is not a regular file.", path);
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        set_replication_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+static struct hadoop_err *ndfs_list_partial(struct native_fs *fs,
+        const char *path, const char *prev, uint32_t *entries_len,
+        hdfsFileInfo **entries, uint32_t *remaining)
+{
+    struct hadoop_err *err = NULL;
+    GetListingRequestProto req = GET_LISTING_REQUEST_PROTO__INIT;
+    GetListingResponseProto *resp = NULL;
+    hdfsFileInfo *nentries;
+    struct hrpc_proxy proxy;
+    uint64_t nlen;
+    size_t i;
+    char *prefix = NULL;
+
+    err = dynprintf(&prefix, "%s%s/", fs->url_prefix, path);
+    if (err)
+        goto done;
+    ndfs_nn_proxy_init(fs, &proxy);
+    req.src = (char*)path;
+    req.startafter.data = (unsigned char*)prev;
+    req.startafter.len = strlen(prev);
+    req.needlocation = 0;
+    err = cnn_get_listing(&proxy, &req, &resp);
+    if (err)
+        goto done;
+    if (!resp->dirlist) {
+        err = hadoop_lerr_alloc(ENOENT, "ndfs_list_partial(path=%s, "
+                "prev=%s): No such directory.", path, prev);
+        goto done;
+    }
+    nlen = *entries_len;
+    nlen += resp->dirlist->n_partiallisting;
+    nentries = realloc(*entries, nlen * sizeof(hdfsFileInfo));
+    if (!nentries) {
+        err = hadoop_lerr_alloc(ENOENT, "ndfs_list_partial(path=%s, "
+                "prev=%s): failed to allocate space for %zd new entries.",
+                path, prev, resp->dirlist->n_partiallisting);
+        goto done;
+    }
+    memset(nentries + ((*entries_len) * sizeof(hdfsFileInfo)),
+           0, (resp->dirlist->n_partiallisting * sizeof(hdfsFileInfo)));
+    *entries = nentries;
+    *entries_len = nlen;
+    *remaining = resp->dirlist->remainingentries;
+    for (i = 0; i < resp->dirlist->n_partiallisting; i++) {
+        err = populate_file_info(&nentries[i],
+                    resp->dirlist->partiallisting[i], prefix);
+        if (err)
+            goto done;
+    }
+    err = NULL;
+
+done:
+    free(prefix);
+    if (resp) {
+        get_listing_response_proto__free_unpacked(resp, NULL);
+    }
+    return err;
+}
+
+static struct hadoop_err *populate_file_info(struct file_info *info,
+                    HdfsFileStatusProto *status, const char *prefix)
+{
+    if (status->filetype == IS_DIR) {
+        info->mKind = kObjectKindDirectory; 
+    } else {
+        // note: we don't support symlinks yet here.
+        info->mKind = kObjectKindFile; 
+    }
+    info->mName = malloc(strlen(prefix) + status->path.len + 1);
+    if (!info->mName)
+        goto oom;
+    strcpy(info->mName, prefix);
+    memcpy(info->mName + strlen(prefix), status->path.data, status->path.len);
+    info->mName[strlen(prefix) + status->path.len] = '\0';
+    info->mLastMod = status->modification_time / 1000LL;
+    info->mSize = status->length;
+    if (status->has_block_replication) {
+        info->mReplication = status->block_replication;
+    } else {
+        info->mReplication = 0;
+    }
+    if (status->has_blocksize) {
+        info->mBlockSize = status->blocksize;
+    } else {
+        info->mBlockSize = 0;
+    }
+    info->mOwner = strdup(status->owner);
+    if (!info->mOwner)
+        goto oom;
+    info->mGroup = strdup(status->group);
+    if (!info->mGroup)
+        goto oom;
+    info->mPermissions = status->permission->perm;
+    info->mLastAccess = status->access_time / 1000LL;
+    return NULL;
+oom:
+    return hadoop_lerr_alloc(ENOMEM, "populate_file_info(%s): OOM",
+                            info->mName);
+}
+
+hdfsFileInfo* ndfs_list_directory(hdfsFS bfs, const char* uri, int *numEntries)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    hdfsFileInfo *entries = NULL;
+    uint32_t entries_len = 0, remaining = 0;
+    char *prev, *path = NULL;
+
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    // We may need to make multiple RPCs to the Namenode to get all the
+    // entries in this directory.  We need to keep making RPCs as long as the
+    // 'remaining' value we get back is more than 0.  The actual value of
+    // 'remaining' isn't interesting, because it may have changed by the time
+    // we make the next RPC.
+    do {
+        if (entries_len > 0) {
+            prev = entries[entries_len - 1].mName;
+        } else {
+            prev = "";
+        }
+        err = ndfs_list_partial(fs, path, prev, &entries_len,
+                                &entries, &remaining);
+        if (err)
+            goto done;
+    } while (remaining != 0);
+    err = NULL;
+
+done:
+    free(path);
+    if (err) {
+        if (entries) {
+            hdfsFreeFileInfo(entries, entries_len);
+            entries = NULL;
+        }
+    } else {
+        *numEntries = entries_len;
+    }
+    return hadoopfs_errno_and_retptr(err, entries);
+}
+
+hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs, const char* uri)
+{
+    struct native_fs *fs = (struct native_fs*)bfs;
+    struct hadoop_err *err = NULL;
+    GetFileInfoRequestProto req = GET_FILE_INFO_REQUEST_PROTO__INIT;
+    GetFileInfoResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *prefix = NULL, *path = NULL;
+    hdfsFileInfo *info = NULL;
+
+    // The GetFileInfo RPC returns a blank 'path' field.
+    // To maintain 100% compatibility with the JNI client, we need to fill it
+    // in with a URI containing the absolute path to the file.
+    err = dynprintf(&prefix, "%s%s", fs->url_prefix, path);
+    if (err)
+        goto done;
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    err = cnn_get_file_info(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    if (!resp->fs) {
+        err = hadoop_lerr_alloc(ENOENT, "ndfs_get_path_info(%s): no such "
+                                "file or directory.", path);
+        goto done;
+    }
+    info = calloc(1, sizeof(*info));
+    if (!info) {
+        err = hadoop_lerr_alloc(ENOMEM, "ndfs_get_path_info(%s): OOM", path);
+        goto done;
+    }
+    err = populate_file_info(info, resp->fs, prefix);
+    if (err) {
+        err = hadoop_err_prepend(err, 0, "ndfs_get_path_info(%s)", path);
+        goto done;
+    }
+    err = NULL;
+
+done:
+    free(prefix);
+    free(path);
+    if (resp) {
+        get_file_info_response_proto__free_unpacked(resp, NULL);
+    }
+    if (err) {
+        if (info) {
+            hdfsFreeFileInfo(info, 1);
+            info = NULL;
+        }
+    }
+    return hadoopfs_errno_and_retptr(err, info);
+}
+
+tOffset ndfs_get_default_block_size(hdfsFS bfs)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    return fs->default_block_size;
+}
+
+tOffset ndfs_get_default_block_size_at_path(hdfsFS bfs, const char *uri)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    GetPreferredBlockSizeRequestProto req =
+        GET_PREFERRED_BLOCK_SIZE_REQUEST_PROTO__INIT;
+    GetPreferredBlockSizeResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    tOffset ret = 0;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.filename = path;
+    err = cnn_get_preferred_block_size(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    ret = resp->bsize;
+    err = NULL;
+
+done:
+    free(path);
+    if (resp) {
+        get_preferred_block_size_response_proto__free_unpacked(resp, NULL);
+    }
+    if (err)
+        return hadoopfs_errno_and_retcode(err);
+    return ret;
+}
+
+struct hadoop_err *ndfs_statvfs(struct hadoop_fs_base *hfs,
+        struct hadoop_vfs_stats *stats)
+{
+    struct native_fs *fs = (struct native_fs*)hfs;
+
+    GetFsStatusRequestProto req = GET_FS_STATUS_REQUEST_PROTO__INIT;
+    GetFsStatsResponseProto *resp = NULL;
+    struct hadoop_err *err = NULL;
+    struct hrpc_proxy proxy;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = cnn_get_fs_stats(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+    stats->capacity = resp->capacity;
+    stats->used = resp->used;
+    stats->remaining = resp->remaining;
+    stats->under_replicated = resp->under_replicated;
+    stats->corrupt_blocks = resp->corrupt_blocks;
+    stats->missing_blocks = resp->missing_blocks;
+
+done:
+    if (resp) {
+        get_fs_stats_response_proto__free_unpacked(resp, NULL);
+    }
+    return err;
+}
+
+tOffset ndfs_get_capacity(hdfsFS bfs)
+{
+    struct hadoop_err *err;
+    struct hadoop_vfs_stats stats;
+
+    err = ndfs_statvfs((struct hadoop_fs_base *)bfs, &stats);
+    if (err)
+        return hadoopfs_errno_and_retcode(err);
+    return stats.capacity;
+}
+
+tOffset ndfs_get_used(hdfsFS bfs)
+{
+    struct hadoop_err *err;
+    struct hadoop_vfs_stats stats;
+
+    err = ndfs_statvfs((struct hadoop_fs_base *)bfs, &stats);
+    if (err)
+        return hadoopfs_errno_and_retcode(err);
+    return stats.used;
+}
+
+int ndfs_chown(hdfsFS bfs, const char* uri,
+                      const char *user, const char *group)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    struct hadoop_err *err = NULL;
+    SetOwnerRequestProto req = SET_OWNER_REQUEST_PROTO__INIT;
+    SetOwnerResponseProto *resp = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.username = (char*)user;
+    req.groupname = (char*)group;
+    err = cnn_set_owner(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        set_owner_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+int ndfs_chmod(hdfsFS bfs, const char* uri, short mode)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    FsPermissionProto perm = FS_PERMISSION_PROTO__INIT;
+    SetPermissionRequestProto req = SET_PERMISSION_REQUEST_PROTO__INIT;
+    SetPermissionResponseProto *resp = NULL;
+    struct hadoop_err *err = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    req.permission = &perm;
+    perm.perm = mode;
+    err = cnn_set_permission(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        set_permission_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+int ndfs_utime(hdfsFS bfs, const char* uri, int64_t mtime, int64_t atime)
+{
+    struct native_fs *fs = (struct native_fs *)bfs;
+    SetTimesRequestProto req = SET_TIMES_REQUEST_PROTO__INIT ;
+    SetTimesResponseProto *resp = NULL;
+    struct hadoop_err *err = NULL;
+    struct hrpc_proxy proxy;
+    char *path = NULL;
+
+    ndfs_nn_proxy_init(fs, &proxy);
+    err = build_path(fs, uri, &path);
+    if (err) {
+        goto done;
+    }
+    req.src = path;
+    // If mtime or atime are -1, that means "no change."
+    // Otherwise, we need to multiply by 1000, to take into account the fact
+    // that libhdfs times are in seconds, and HDFS times are in milliseconds.
+    // It's unfortunate that libhdfs doesn't support the full millisecond
+    // precision.  We need to redo the API at some point.
+    if (mtime < 0) {
+        req.mtime = -1;
+    } else {
+        req.mtime = mtime;
+        req.mtime *= 1000;
+    }
+    if (atime < 0) {
+        req.atime = -1;
+    } else {
+        req.atime = atime;
+        req.atime *= 1000;
+    }
+    err = cnn_set_times(&proxy, &req, &resp);
+    if (err) {
+        goto done;
+    }
+
+done:
+    free(path);
+    if (resp) {
+        set_times_response_proto__free_unpacked(resp, NULL);
+    }
+    return hadoopfs_errno_and_retcode(err);
+}
+
+// vim: ts=4:sw=4:tw=79:et

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.h
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.h?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.h
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/meta.h
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_NDFS_META_H
+#define HADOOP_CORE_NDFS_META_H
+
+#include <netinet/in.h>
+#include <uv.h>
+
+struct hadoop_vfs_stats;
+
+struct native_fs {
+    /** Fields common to all filesystems. */
+    struct hadoop_fs_base base;
+
+    /**
+     * Address of the namenode.
+     * TODO: implement HA failover
+     * TODO: implement IPv6
+     */
+    struct sockaddr_in nn_addr;
+
+    /** The messenger used to perform RPCs. */
+    struct hrpc_messenger *msgr;
+
+    /** The default block size obtained from getServerDefaults. */
+    int64_t default_block_size;
+
+    /** Prefix to use when building URLs. */
+    char *url_prefix;
+
+    /** URI that was used when connecting. */
+    struct hadoop_uri *conn_uri;
+
+    /**
+     * A dynamically allocated working directory which will be prepended to
+     * all relative paths.
+     */
+    struct hadoop_uri *working_uri;
+
+    /** Lock which protects the working_uri. */
+    uv_mutex_t working_uri_lock;
+
+    /** Umask to use when creating files */
+    uint32_t umask;
+
+    /** How long to wait, in nanoseconds, before re-trying a dead datanode. */
+    uint64_t dead_dn_timeout_ns;
+};
+
+struct hadoop_err *ndfs_connect(struct hdfsBuilder *hdfs_bld,
+                                struct hdfs_internal **out);
+int ndfs_disconnect(hdfsFS bfs);
+int ndfs_file_exists(hdfsFS bfs, const char *uri);
+int ndfs_unlink(struct hdfs_internal *bfs,
+                const char *uri, int recursive);
+int ndfs_rename(hdfsFS bfs, const char *src_uri, const char *dst_uri);
+char* ndfs_get_working_directory(hdfsFS bfs, char *buffer, size_t bufferSize);
+int ndfs_set_working_directory(hdfsFS bfs, const char* uri_str);
+int ndfs_mkdir(hdfsFS bfs, const char* uri);
+int ndfs_set_replication(hdfsFS bfs, const char* uri, int16_t replication);
+hdfsFileInfo* ndfs_list_directory(hdfsFS bfs, const char* uri, int 
*numEntries);
+hdfsFileInfo *ndfs_get_path_info(hdfsFS bfs, const char* uri);
+tOffset ndfs_get_default_block_size(hdfsFS bfs);
+tOffset ndfs_get_default_block_size_at_path(hdfsFS bfs, const char *uri);
+struct hadoop_err *ndfs_statvfs(struct hadoop_fs_base *hfs,
+        struct hadoop_vfs_stats *stats);
+tOffset ndfs_get_capacity(hdfsFS bfs);
+tOffset ndfs_get_used(hdfsFS bfs);
+int ndfs_chown(hdfsFS bfs, const char* uri, const char *user,
+               const char *group);
+int ndfs_chmod(hdfsFS bfs, const char* uri, short mode);
+int ndfs_utime(hdfsFS bfs, const char* uri, int64_t mtime, int64_t atime);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ops.c
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ops.c?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ops.c
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/ops.c
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/fs.h"
+#include "ndfs/file.h"
+#include "ndfs/meta.h"
+
+const struct hadoop_fs_ops g_ndfs_ops = {
+    .name = "ndfs",
+    .file_is_open_for_read = ndfs_file_is_open_for_read,
+    .file_is_open_for_write = ndfs_file_is_open_for_write,
+    .get_read_statistics = ndfs_file_get_read_statistics,
+    .connect = ndfs_connect,
+    .disconnect = ndfs_disconnect,
+    .open = ndfs_open_file,
+    .close = ndfs_close_file,
+    .exists = ndfs_file_exists,
+    .seek = ndfs_seek,
+    .tell = ndfs_tell,
+    .read = ndfs_read,
+    .pread = ndfs_pread,
+    .write = ndfs_write,
+    .flush = ndfs_flush,
+    .hflush = ndfs_hflush,
+    .hsync = ndfs_hsync,
+    .available = ndfs_available,
+    .copy = NULL,
+    .move = NULL,
+    .unlink = ndfs_unlink,
+    .rename = ndfs_rename,
+    .get_working_directory = ndfs_get_working_directory,
+    .set_working_directory = ndfs_set_working_directory,
+    .mkdir = ndfs_mkdir,
+    .set_replication = ndfs_set_replication,
+    .list_directory = ndfs_list_directory,
+    .get_path_info = ndfs_get_path_info,
+    .get_hosts = ndfs_get_hosts,
+    .get_default_block_size = ndfs_get_default_block_size,
+    .get_default_block_size_at_path = ndfs_get_default_block_size_at_path,
+    .get_capacity = ndfs_get_capacity,
+    .get_used = ndfs_get_used,
+    .chown = ndfs_chown,
+    .chmod = ndfs_chmod,
+    .utime = ndfs_utime,
+    .read_zero = ndfs_read_zero,
+    .rz_buffer_free = ndfs_rz_buffer_free,
+
+    // test
+    .file_uses_direct_read = ndfs_file_uses_direct_read,
+    .file_disable_direct_read = ndfs_file_disable_direct_read,
+};
+
+// vim: ts=4:sw=4:tw=79:et

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.c
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.c?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.c
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.c
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "ndfs/permission.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct hadoop_err *parse_permission(const char *str, uint32_t *perm)
+{
+    if (strspn(str, " 01234567") != strlen(str)) {
+        // TODO: support permission strings as in PermissionParser.java (see
+        // HADOOP-10981)
+        return hadoop_lerr_alloc(ENOTSUP, "parse_permission(%s): "
+            "can't parse non-octal permissions (yet)", str);
+    }
+    errno = 0;
+    *perm = strtol(str, NULL, 8);
+    if (errno) {
+        int ret = errno;
+        return hadoop_lerr_alloc(EINVAL, "parse_permission(%s): "
+                "failed to parse this octal string: %s",
+                str, terror(ret));
+    }
+    return NULL;
+}
+
+// vim: ts=4:sw=4:tw=79:et

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.h
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.h?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.h
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/permission.h
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_NDFS_PERMISSION_H
+#define HADOOP_CORE_NDFS_PERMISSION_H
+
+#include <stdint.h>
+
+struct hadoop_err;
+
+/**
+ * Parse a Hadoop permission string into a numeric value.
+ *
+ * @param str       The string to parse.
+ * @param perm      (out param) the numeric value.
+ *
+ * @return          NULL on success; the error otherwise. 
+ */
+struct hadoop_err *parse_permission(const char *str, uint32_t *perm);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.c
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.c?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.c
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.c
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common/hadoop_err.h"
+#include "common/string.h"
+#include "common/uri.h"
+#include "fs/common.h"
+#include "fs/fs.h"
+#include "ndfs/meta.h"
+#include "protobuf/ClientNamenodeProtocol.call.h"
+#include "protobuf/hdfs.pb-c.h.s"
+#include "rpc/messenger.h"
+#include "rpc/proxy.h"
+
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <uv.h>
+
+#define CLIENT_NN_PROTOCOL "org.apache.hadoop.hdfs.protocol.ClientProtocol"
+
+void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy)
+{
+    hrpc_proxy_init(proxy, fs->msgr, &fs->nn_addr, CLIENT_NN_PROTOCOL,
+                    fs->conn_uri->user_info);
+}
+
+struct hadoop_err *build_path(struct native_fs *fs, const char *uri_str,
+                                     char **out)
+{
+    struct hadoop_err *err = NULL;
+    struct hadoop_uri *uri = NULL;
+
+    uv_mutex_lock(&fs->working_uri_lock);
+    err = hadoop_uri_parse(uri_str, fs->working_uri, &uri, H_URI_PARSE_PATH);
+    if (err)
+        goto done;
+    // TODO: check URI scheme and user against saved values?
+    if (uri->path[0]) {
+        *out = strdup(uri->path);
+    } else {
+        // As a special case, when the URI given has an empty path, we assume 
that
+        // we want the current working directory.  This is to allow things like
+        // hdfs://mynamenode to map to the current working directory, as they 
do in
+        // Hadoop.  Note that this is different than hdfs://mynamenode/ (note 
the
+        // trailing slash) which maps to the root directory.
+        *out = strdup(fs->working_uri->path);
+    }
+    if (!*out) {
+        err = hadoop_lerr_alloc(ENOMEM, "build_path: out of memory.");
+        goto done;
+    }
+    err = NULL;
+
+done:
+    uv_mutex_unlock(&fs->working_uri_lock);
+    hadoop_uri_free(uri);
+    return err;
+}
+
+// vim: ts=4:sw=4:tw=79:et

Added: 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.h
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.h?rev=1619622&view=auto
==============================================================================
--- 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.h
 (added)
+++ 
hadoop/common/branches/HADOOP-10388/hadoop-native-core/src/main/native/ndfs/util.h
 Thu Aug 21 23:10:49 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef HADOOP_CORE_NDFS_UTIL_H
+#define HADOOP_CORE_NDFS_UTIL_H
+
+struct hadoop_err;
+struct hrpc_proxy;
+struct native_fs;
+
+/**
+ * Initialize a namenode proxy.
+ *
+ * @param fs            The native filesystem.
+ * @param proxy         (out param) The proxy to initialize.
+ */
+void ndfs_nn_proxy_init(struct native_fs *fs, struct hrpc_proxy *proxy);
+
+/**
+ * Construct a canonical path from a URI.
+ *
+ * @param fs        The filesystem.
+ * @param uri       The URI.
+ * @param out       (out param) the canonical path.
+ *
+ * @return          NULL on success; the error otherwise.
+ */
+struct hadoop_err *build_path(struct native_fs *fs, const char *uri_str,
+                                     char **out);
+
+#endif
+
+// vim: ts=4:sw=4:tw=79:et


Reply via email to