Author: cutting Date: Wed Oct 3 14:17:00 2007 New Revision: 581725 URL: http://svn.apache.org/viewvc?rev=581725&view=rev Log: HADOOP-1963. Add a FileSystem implementation for the Kosmos Filesystem (KFS). Contributed by Sriram Rao.
Added: lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt lucene/hadoop/trunk/lib/kfs-0.1.jar (with props) lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/conf/hadoop-default.xml Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581725&r1=581724&r2=581725&view=diff ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 3 14:17:00 2007 @@ -74,6 +74,8 @@ and codec, independent of the final output's compression parameters. (Arun C Murthy via cutting) + HADOOP-1963. Add a FileSystem implementation for the Kosmos + Filesystem (KFS). 
(Sriram Rao via cutting) OPTIMIZATIONS Modified: lucene/hadoop/trunk/conf/hadoop-default.xml URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=581725&r1=581724&r2=581725&view=diff ============================================================================== --- lucene/hadoop/trunk/conf/hadoop-default.xml (original) +++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Oct 3 14:17:00 2007 @@ -145,6 +145,12 @@ </property> <property> + <name>fs.kfs.impl</name> + <value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value> + <description>The FileSystem for kfs: uris.</description> +</property> + +<property> <name>fs.hftp.impl</name> <value>org.apache.hadoop.dfs.HftpFileSystem</value> </property> Added: lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt?rev=581725&view=auto ============================================================================== --- lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt (added) +++ lucene/hadoop/trunk/lib/kfs-0.1.LICENSE.txt Wed Oct 3 14:17:00 2007 @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. Added: lucene/hadoop/trunk/lib/kfs-0.1.jar URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/lib/kfs-0.1.jar?rev=581725&view=auto ============================================================================== Binary file - no diff available. Propchange: lucene/hadoop/trunk/lib/kfs-0.1.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java?rev=581725&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/IFSImpl.java Wed Oct 3 14:17:00 2007 @@ -0,0 +1,55 @@ +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
/**
 *
 * Licensed under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 * @author: Sriram Rao (Kosmix Corp.)
 *
 * We need to provide the ability to the code in fs/kfs without really
 * having a KFS deployment. In particular, the glue code that wraps
 * around calls to KfsAccess object. This is accomplished by defining a
 * filesystem implementation interface:
 * -- for testing purposes, a dummy implementation of this interface
 *    will suffice; as long as the dummy implementation is close enough
 *    to doing what KFS does, we are good.
 * -- for deployment purposes with KFS, this interface is
 *    implemented by the KfsImpl object.
 */

package org.apache.hadoop.fs.kfs;

import java.io.*;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;

/**
 * Minimal filesystem-operations interface over a KFS-like backend.
 * All paths are KFS-side string paths (not Hadoop Path objects).
 * The int-returning mutators return a status code; callers in
 * KosmosFileSystem treat 0 as success and non-zero as failure.
 */
interface IFSImpl {
    // Existence / type probes.
    public boolean exists(String path) throws IOException;
    public boolean isDirectory(String path) throws IOException;
    public boolean isFile(String path) throws IOException;
    // Directory listing; NOTE(review): the KFS implementation includes
    // "." and ".." entries — callers are expected to filter them.
    public String[] readdir(String path) throws IOException;

    // Namespace mutation; 0 indicates success.
    public int mkdirs(String path) throws IOException;
    public int rename(String source, String dest) throws IOException;

    // rmdir removes a directory; remove deletes a file. 0 == success.
    public int rmdir(String path) throws IOException;
    public int remove(String path) throws IOException;
    // File metadata.
    public long filesize(String path) throws IOException;
    public short getReplication(String path) throws IOException;
    public short setReplication(String path, short replication) throws IOException;
    // Block-location hints: host lists per chunk of [start, start+len).
    public String[][] getDataLocation(String path, long start, long len) throws IOException;

    // Millis since epoch; presumably 0 when the backend cannot supply it
    // (see KFSImpl) — confirm once the KFS client API exposes mtime.
    public long getModificationTime(String path) throws IOException;
    // Stream factories wrapping the backend's channels.
    public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException;
    public FSDataInputStream open(String path, int bufferSize) throws IOException;

};
http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java?rev=581725&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java Wed Oct 3 14:17:00 2007 @@ -0,0 +1,104 @@ +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * @author: Sriram Rao (Kosmix Corp.) + * + * Provide the implementation of KFS which turn into calls to KfsAccess. 
+ */ + +package org.apache.hadoop.fs.kfs; + +import java.io.*; +import java.net.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; + +import org.kosmix.kosmosfs.access.KfsAccess; + +class KFSImpl implements IFSImpl { + private KfsAccess kfsAccess = null; + + public KFSImpl(String metaServerHost, int metaServerPort) throws IOException { + kfsAccess = new KfsAccess(metaServerHost, metaServerPort); + } + + public boolean exists(String path) throws IOException { + return kfsAccess.kfs_exists(path); + } + + public boolean isDirectory(String path) throws IOException { + return kfsAccess.kfs_isDirectory(path); + } + + public boolean isFile(String path) throws IOException { + return kfsAccess.kfs_isFile(path); + } + + public String[] readdir(String path) throws IOException { + return kfsAccess.kfs_readdir(path); + } + + public int mkdirs(String path) throws IOException { + return kfsAccess.kfs_mkdirs(path); + } + + public int rename(String source, String dest) throws IOException { + return kfsAccess.kfs_rename(source, dest); + } + + public int rmdir(String path) throws IOException { + return kfsAccess.kfs_rmdir(path); + } + + public int remove(String path) throws IOException { + return kfsAccess.kfs_remove(path); + } + + public long filesize(String path) throws IOException { + return kfsAccess.kfs_filesize(path); + } + + public short getReplication(String path) throws IOException { + return kfsAccess.kfs_getReplication(path); + } + + public short setReplication(String path, short replication) throws IOException { + return kfsAccess.kfs_setReplication(path, replication); + } + + public String[][] getDataLocation(String path, long start, long len) throws IOException { + return kfsAccess.kfs_getDataLocation(path, start, 
len); + } + + public long getModificationTime(String path) throws IOException { + // Supporting this API requires changes to the Java-side of + // the KFS client API. For now, return 0; in the next rev of + // KFS, we'll update the Java API. + return 0; + } + + public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException { + return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication)); + } + + public FSDataInputStream open(String path, int bufferSize) throws IOException { + return new FSDataInputStream(new KFSInputStream(kfsAccess, path)); + } +}; Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java?rev=581725&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java Wed Oct 3 14:17:00 2007 @@ -0,0 +1,123 @@ +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * @author: Sriram Rao (Kosmix Corp.) + * + * Implements the Hadoop FSInputStream interfaces to allow applications to read + * files in Kosmos File System (KFS). 
+ */ + +package org.apache.hadoop.fs.kfs; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.util.Progressable; + +import org.kosmix.kosmosfs.access.KfsAccess; +import org.kosmix.kosmosfs.access.KfsInputChannel; + +class KFSInputStream extends FSInputStream { + + private String path; + private KfsInputChannel kfsChannel; + + private long fsize; + + public KFSInputStream(KfsAccess kfsAccess, String path) { + this.path = path; + + this.kfsChannel = kfsAccess.kfs_open(path); + if (this.kfsChannel != null) + this.fsize = kfsAccess.kfs_filesize(path); + else + this.fsize = 0; + } + + public long getPos() throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + return kfsChannel.tell(); + } + + public synchronized int available() throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + return (int) (this.fsize - getPos()); + } + + public synchronized void seek(long targetPos) throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + kfsChannel.seek(targetPos); + } + + public synchronized boolean seekToNewSource(long targetPos) throws IOException { + return false; + } + + public synchronized int read() throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + byte b[] = new byte[4]; + int res = read(b, 0, 4); + if (res == 4) + return (b[0] + (b[1] << 8) + (b[2] << 16) + (b[3] << 24)); + return -1; + } + + public synchronized int read(byte b[], int off, int len) throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + int res; + + res = kfsChannel.read(ByteBuffer.wrap(b, off, len)); + // Use -1 to signify EOF + if (res == 0) + return -1; + return res; + } + + public synchronized void close() throws 
IOException { + if (kfsChannel == null) { + return; + } + + kfsChannel.close(); + kfsChannel = null; + } + + public boolean markSupported() { + return false; + } + + public void mark(int readLimit) { + // Do nothing + } + + public void reset() throws IOException { + throw new IOException("Mark not supported"); + } + +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java?rev=581725&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java Wed Oct 3 14:17:00 2007 @@ -0,0 +1,90 @@ +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * @author: Sriram Rao (Kosmix Corp.) + * + * Implements the Hadoop FSOutputStream interfaces to allow applications to write to + * files in Kosmos File System (KFS). 
+ */ + +package org.apache.hadoop.fs.kfs; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.util.Progressable; + +import org.kosmix.kosmosfs.access.KfsAccess; +import org.kosmix.kosmosfs.access.KfsOutputChannel; + +class KFSOutputStream extends OutputStream { + + private String path; + private KfsOutputChannel kfsChannel; + + public KFSOutputStream(KfsAccess kfsAccess, String path, short replication) { + this.path = path; + + this.kfsChannel = kfsAccess.kfs_create(path, replication); + } + + public long getPos() throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + return kfsChannel.tell(); + } + + public void write(int v) throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + byte[] b = new byte[4]; + + b[0] = (byte) (v & 0xFF); + b[1] = (byte) ((v >> 8) & 0xFF); + b[1] = (byte) ((v >> 16) & 0xFF); + b[1] = (byte) ((v >> 24) & 0xFF); + write(b, 0, 4); + } + + public void write(byte b[], int off, int len) throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + + kfsChannel.write(ByteBuffer.wrap(b, off, len)); + } + + public void flush() throws IOException { + if (kfsChannel == null) { + throw new IOException("File closed"); + } + kfsChannel.sync(); + } + + public synchronized void close() throws IOException { + if (kfsChannel == null) { + return; + } + flush(); + kfsChannel.close(); + kfsChannel = null; + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=581725&view=auto ============================================================================== --- 
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Wed Oct 3 14:17:00 2007 @@ -0,0 +1,383 @@ +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * @author: Sriram Rao (Kosmix Corp.) + * + * Implements the Hadoop FS interfaces to allow applications to store + *files in Kosmos File System (KFS). + */ + +package org.apache.hadoop.fs.kfs; + +import java.io.*; +import java.net.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; + +/** + * A FileSystem backed by KFS. 
+ * + */ + +public class KosmosFileSystem extends FileSystem { + + private FileSystem localFs; + private IFSImpl kfsImpl = null; + private URI uri; + private Path workingDir = new Path("/"); + + public KosmosFileSystem() { + + } + + KosmosFileSystem(IFSImpl fsimpl) { + this.kfsImpl = fsimpl; + } + + public URI getUri() { + return uri; + } + + public void initialize(URI uri, Configuration conf) throws IOException { + + try { + if (kfsImpl == null) { + kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""), + conf.getInt("fs.kfs.metaServerPort", -1)); + } + this.localFs = FileSystem.getLocal(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Unable to initialize KFS"); + System.exit(-1); + } + } + + @Deprecated + public String getName() { + return getUri().toString(); + } + + public Path getWorkingDirectory() { + return workingDir; + } + + public void setWorkingDirectory(Path dir) { + workingDir = makeAbsolute(dir); + } + + private Path makeAbsolute(Path path) { + if (path.isAbsolute()) { + return path; + } + return new Path(workingDir, path); + } + + public boolean exists(Path path) throws IOException { + // stat the path to make sure it exists + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + return kfsImpl.exists(srep); + } + + public boolean mkdirs(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + int res; + + // System.out.println("Calling mkdirs on: " + srep); + + res = kfsImpl.mkdirs(srep); + + return res == 0; + } + + @Deprecated + public boolean isDirectory(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + // System.out.println("Calling isdir on: " + srep); + + return kfsImpl.isDirectory(srep); + } + + @Deprecated + public boolean isFile(Path path) throws IOException { + Path absolute = 
makeAbsolute(path); + String srep = absolute.toUri().getPath(); + return kfsImpl.isFile(srep); + } + + public long getContentLength(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + if (kfsImpl.isFile(srep)) + return kfsImpl.filesize(srep); + + String[] entries = kfsImpl.readdir(srep); + + if (entries == null) + return 0; + + // kfsreaddir() returns "." and ".."; strip them before + // passing back to hadoop fs. + long numEntries = 0; + for (int i = 0; i < entries.length; i++) { + if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0)) + continue; + numEntries++; + } + return numEntries; + } + + public FileStatus[] listStatus(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + if (kfsImpl.isFile(srep)) + return new FileStatus[] { getFileStatus(path) } ; + + String[] entries = kfsImpl.readdir(srep); + + if (entries == null) + return null; + + // kfsreaddir() returns "." and ".."; strip them before + // passing back to hadoop fs. 
+ int numEntries = 0; + for (int i = 0; i < entries.length; i++) { + if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0)) + continue; + numEntries++; + } + if (numEntries == 0) { + return null; + } + + // System.out.println("Calling listStatus on: " + path); + + FileStatus[] pathEntries = new FileStatus[numEntries]; + int j = 0; + for (int i = 0; i < entries.length; i++) { + if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0)) + continue; + + pathEntries[j] = getFileStatus(new Path(path, entries[i])); + j++; + } + return pathEntries; + } + + public FileStatus getFileStatus(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + if (kfsImpl.isDirectory(srep)) { + // System.out.println("Status of path: " + path + " is dir"); + return new FileStatus(0, true, 1, 0, 0, path); + } else { + // System.out.println("Status of path: " + path + " is file"); + return new FileStatus(kfsImpl.filesize(srep), false, + kfsImpl.getReplication(srep), + getDefaultBlockSize(), + kfsImpl.getModificationTime(srep), path); + } + } + + + public Path[] listPaths(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + if (kfsImpl.isFile(srep)) + return new Path[] { path } ; + + String[] entries = kfsImpl.readdir(srep); + + if (entries == null) + return null; + + // kfsreaddir() returns "." and ".."; strip them before + // passing back to hadoop fs. 
+ int numEntries = 0; + for (int i = 0; i < entries.length; i++) { + if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0)) + continue; + numEntries++; + } + if (numEntries == 0) { + return null; + } + Path[] pathEntries = new Path[numEntries]; + int j = 0; + for (int i = 0; i < entries.length; i++) { + if ((entries[i].compareTo(".") == 0) || (entries[i].compareTo("..") == 0)) + continue; + + pathEntries[j] = new Path(path, entries[i]); + j++; + } + return pathEntries; + + } + + public FSDataOutputStream create(Path file, boolean overwrite, int bufferSize, + short replication, long blockSize, Progressable progress) + throws IOException { + + if (exists(file)) { + if (overwrite) { + delete(file); + } else { + throw new IOException("File already exists: " + file); + } + } + + Path parent = file.getParent(); + if (parent != null && !mkdirs(parent)) { + throw new IOException("Mkdirs failed to create " + parent); + } + + Path absolute = makeAbsolute(file); + String srep = absolute.toUri().getPath(); + + return kfsImpl.create(srep, replication, bufferSize); + } + + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + if (!exists(path)) + throw new IOException("File does not exist: " + path); + + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + return kfsImpl.open(srep, bufferSize); + } + + public boolean rename(Path src, Path dst) throws IOException { + Path absoluteS = makeAbsolute(src); + String srepS = absoluteS.toUri().getPath(); + Path absoluteD = makeAbsolute(dst); + String srepD = absoluteD.toUri().getPath(); + + // System.out.println("Calling rename on: " + srepS + " -> " + srepD); + + return kfsImpl.rename(srepS, srepD) == 0; + } + + // recursively delete the directory and its contents + public boolean delete(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + if (kfsImpl.isFile(srep)) + return kfsImpl.remove(srep) 
== 0; + + Path[] dirEntries = listPaths(absolute); + if (dirEntries != null) { + for (int i = 0; i < dirEntries.length; i++) { + delete(new Path(absolute, dirEntries[i])); + } + } + return kfsImpl.rmdir(srep) == 0; + } + + @Deprecated + public long getLength(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + return kfsImpl.filesize(srep); + } + + @Deprecated + public short getReplication(Path path) throws IOException { + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + return kfsImpl.getReplication(srep); + } + + public short getDefaultReplication() { + return 3; + } + + public boolean setReplication(Path path, short replication) + throws IOException { + + Path absolute = makeAbsolute(path); + String srep = absolute.toUri().getPath(); + + int res = kfsImpl.setReplication(srep, replication); + return res >= 0; + } + + // 64MB is the KFS block size + + public long getDefaultBlockSize() { + return 1 << 26; + } + + @Deprecated + public void lock(Path path, boolean shared) throws IOException { + + } + + @Deprecated + public void release(Path path) throws IOException { + + } + + /** + * Return null if the file doesn't exist; otherwise, get the + * locations of the various chunks of the file from KFS. 
 + */ + public String[][] getFileCacheHints(Path f, long start, long len) + throws IOException { + if (!exists(f)) { + return null; + } + String srep = makeAbsolute(f).toUri().getPath(); + String[][] hints = kfsImpl.getDataLocation(srep, start, len); + return hints; + } + + public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException { + FileUtil.copy(localFs, src, this, dst, delSrc, getConf()); + } + + public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException { + FileUtil.copy(this, src, localFs, dst, delSrc, getConf()); + } + + public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) + throws IOException { + return tmpLocalFile; + } + + public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) + throws IOException { + moveFromLocalFile(tmpLocalFile, fsOutputFile); + } +} Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html?rev=581725&view=auto ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html (added) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/kfs/package.html Wed Oct 3 14:17:00 2007 @@ -0,0 +1,80 @@ +<html> +<head></head> +<body> +<h1>A client for the Kosmos filesystem (KFS)</h1> + +<h3>Introduction</h3> + +This page describes how to use Kosmos Filesystem +(<a href="http://kosmosfs.sourceforge.net"> KFS </a>) as a backing +store with Hadoop. This page assumes that you have downloaded the +KFS software and installed necessary binaries as outlined in the KFS +documentation. 
 + +<h3>Steps</h3> + + <ul> + <li>In the Hadoop conf directory edit hadoop-default.xml, + add the following: + <pre> +<property> + <name>fs.kfs.impl</name> + <value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value> + <description>The FileSystem for kfs: uris.</description> +</property> + </pre> + + <li>In the Hadoop conf directory edit hadoop-site.xml, + adding the following (with appropriate values for + <server> and <port>): + <pre> +<property> + <name>fs.default.name</name> + <value>kfs://<server:port></value> +</property> + +<property> + <name>fs.kfs.metaServerHost</name> + <value><server></value> + <description>The location of the KFS meta server.</description> +</property> + +<property> + <name>fs.kfs.metaServerPort</name> + <value><port></value> + <description>The location of the meta server's port.</description> +</property> + +</pre> + </li> + + <li>Copy KFS's <i> kfs-0.1.jar </i> to Hadoop's lib directory. This step + enables Hadoop to load the KFS specific modules. Note + that kfs-0.1.jar was built when you compiled KFS source + code. This jar file contains code that calls KFS's client + library code via JNI; the native code is in KFS's <i> + libkfsClient.so </i> library. + </li> + + <li> When the Hadoop map/reduce trackers start up, those +processes (on local as well as remote nodes) will now need to load +KFS's <i> libkfsClient.so </i> library. To simplify this process, it is advisable to +store libkfsClient.so in an NFS accessible directory (similar to where +Hadoop binaries/scripts are stored); then, modify Hadoop's +conf/hadoop-env.sh adding the following line and providing suitable +value for <path>: +<pre> +export LD_LIBRARY_PATH=<path> +</pre> + + + <li>Start only the map/reduce trackers + <br /> + example: execute Hadoop's bin/start-mapred.sh</li> + </ul> +<br/> + +If the map/reduce job trackers start up, all file-I/O is done to KFS. 
+ +</body> +</html> Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java?rev=581725&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java Wed Oct 3 14:17:00 2007 @@ -0,0 +1,145 @@ +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * @author: Sriram Rao (Kosmix Corp.) + * + * We need to provide the ability to the code in fs/kfs without really + * having a KFS deployment. For this purpose, use the LocalFileSystem + * as a way to "emulate" KFS. 
+ */ + +package org.apache.hadoop.fs.kfs; + +import java.io.*; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + + +public class KFSEmulationImpl implements IFSImpl { + FileSystem localFS; + + public KFSEmulationImpl(Configuration conf) throws IOException { + localFS = FileSystem.getLocal(conf); + } + + public boolean exists(String path) throws IOException { + return localFS.exists(new Path(path)); + } + public boolean isDirectory(String path) throws IOException { + return localFS.isDirectory(new Path(path)); + } + public boolean isFile(String path) throws IOException { + return localFS.isFile(new Path(path)); + } + + // as part of the emulation, KFS adds ./.. as directory entries + // when doing a directory listing. + public String[] readdir(String path) throws IOException { + Path[] p = localFS.listPaths(new Path(path)); + String[] entries = null; + + if (p == null) { + if (isDirectory(path)) { + // empty dirs have "." and ".." + entries = new String[2]; + entries[0] = new String("."); + entries[1] = new String(".."); + } + return entries; + } + + if (isDirectory(path)) { + // for dirs, add "."/".." 
as KFS does that + entries = new String[p.length + 2]; + entries[0] = new String("."); + entries[1] = new String(".."); + for (int i = 0; i < p.length; i++) + entries[i+2] = p[i].toString(); + } else { + entries = new String[p.length]; + for (int i = 0; i < p.length; i++) + entries[i] = p[i].toString(); + } + return entries; + } + + public int mkdirs(String path) throws IOException { + if (localFS.mkdirs(new Path(path))) + return 0; + + return -1; + } + + public int rename(String source, String dest) throws IOException { + if (localFS.rename(new Path(source), new Path(dest))) + return 0; + return -1; + } + + public int rmdir(String path) throws IOException { + if (isDirectory(path)) { + // the directory better be empty + String[] dirEntries = readdir(path); + if ((dirEntries.length <= 2) && (localFS.delete(new Path(path)))) + return 0; + } + return -1; + } + + public int remove(String path) throws IOException { + if (isFile(path) && (localFS.delete(new Path(path)))) + return 0; + return -1; + } + + public long filesize(String path) throws IOException { + return localFS.getLength(new Path(path)); + } + public short getReplication(String path) throws IOException { + return 1; + } + public short setReplication(String path, short replication) throws IOException { + return 1; + } + public String[][] getDataLocation(String path, long start, long len) throws IOException { + return localFS.getFileCacheHints(new Path(path), start, len); + } + + public long getModificationTime(String path) throws IOException { + FileStatus s = localFS.getFileStatus(new Path(path)); + if (s == null) + return 0; + + return s.getModificationTime(); + } + + public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException { + // besides path/overwrite, the other args don't matter for + // testing purposes. 
+ return localFS.create(new Path(path)); + } + + public FSDataInputStream open(String path, int bufferSize) throws IOException { + return localFS.open(new Path(path)); + } + + +}; Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java?rev=581725&view=auto ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java (added) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java Wed Oct 3 14:17:00 2007 @@ -0,0 +1,169 @@ +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * @author: Sriram Rao (Kosmix Corp.) + * + * Unit tests for testing the KosmosFileSystem API implementation. 
+ */ + +package org.apache.hadoop.fs.kfs; + +import java.io.*; +import java.net.*; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + +import org.apache.hadoop.fs.kfs.KosmosFileSystem; + +public class TestKosmosFileSystem extends TestCase { + + KosmosFileSystem kosmosFileSystem; + KFSEmulationImpl kfsEmul; + + @Override + protected void setUp() throws IOException { + Configuration conf = new Configuration(); + + kfsEmul = new KFSEmulationImpl(conf); + kosmosFileSystem = new KosmosFileSystem(kfsEmul); + // a dummy URI; we are not connecting to any setup here + kosmosFileSystem.initialize(URI.create("kfs:///"), conf); + } + + @Override + protected void tearDown() throws Exception { + + } + + // @Test + // Check all the directory API's in KFS + public void testDirs() throws Exception { + Path baseDir = new Path("/tmp/test/kfs-test"); + Path subDir1 = new Path("dir.1"); + + // make the dir + kosmosFileSystem.mkdirs(baseDir); + assertTrue(kosmosFileSystem.isDirectory(baseDir)); + kosmosFileSystem.setWorkingDirectory(baseDir); + + kosmosFileSystem.mkdirs(subDir1); + assertTrue(kosmosFileSystem.isDirectory(subDir1)); + + assertFalse(kosmosFileSystem.exists(new Path("test1"))); + assertFalse(kosmosFileSystem.isDirectory(new Path("test/dir.2"))); + + Path[] p = kosmosFileSystem.listPaths(baseDir); + assertEquals(p.length, 1); + + kosmosFileSystem.delete(baseDir); + assertFalse(kosmosFileSystem.exists(baseDir)); + } + + // @Test + // Check the file API's + public void testFiles() throws Exception { + Path baseDir = new Path("/tmp/test/kfs-test"); + Path subDir1 = new Path("dir.1"); + Path file1 = new Path("dir.1/foo.1"); + Path file2 = new Path("dir.1/foo.2"); + + kosmosFileSystem.mkdirs(baseDir); + 
assertTrue(kosmosFileSystem.isDirectory(baseDir)); + kosmosFileSystem.setWorkingDirectory(baseDir); + + kosmosFileSystem.mkdirs(subDir1); + + FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null); + FSDataOutputStream s2 = kosmosFileSystem.create(file2, true, 4096, (short) 1, (long) 4096, null); + + s1.close(); + s2.close(); + + Path[] p = kosmosFileSystem.listPaths(subDir1); + assertEquals(p.length, 2); + + kosmosFileSystem.delete(file1); + p = kosmosFileSystem.listPaths(subDir1); + assertEquals(p.length, 1); + + kosmosFileSystem.delete(file2); + p = kosmosFileSystem.listPaths(subDir1); + assertEquals(p, null); + + kosmosFileSystem.delete(baseDir); + assertFalse(kosmosFileSystem.exists(baseDir)); + } + + // @Test + // Check file/read write + public void testFileIO() throws Exception { + Path baseDir = new Path("/tmp/test/kfs-test"); + Path subDir1 = new Path("dir.1"); + Path file1 = new Path("dir.1/foo.1"); + + kosmosFileSystem.mkdirs(baseDir); + assertTrue(kosmosFileSystem.isDirectory(baseDir)); + kosmosFileSystem.setWorkingDirectory(baseDir); + + kosmosFileSystem.mkdirs(subDir1); + + FSDataOutputStream s1 = kosmosFileSystem.create(file1, true, 4096, (short) 1, (long) 4096, null); + + int bufsz = 4096; + byte[] data = new byte[bufsz]; + + for (int i = 0; i < data.length; i++) + data[i] = (byte) (i % 16); + + // write an integer + s1.write(32); + // write some data + s1.write(data, 0, data.length); + // flush out the changes + s1.close(); + + // Read the stuff back and verify it is correct + FSDataInputStream s2 = kosmosFileSystem.open(file1, 4096); + int v; + + v = s2.read(); + assertEquals(v, 32); + + assertEquals(s2.available(), data.length); + + byte[] buf = new byte[bufsz]; + s2.read(buf, 0, buf.length); + for (int i = 0; i < data.length; i++) + assertEquals(data[i], buf[i]); + + assertEquals(s2.available(), 0); + + s2.close(); + + kosmosFileSystem.delete(file1); + assertFalse(kosmosFileSystem.exists(file1)); + 
kosmosFileSystem.delete(subDir1); + assertFalse(kosmosFileSystem.exists(subDir1)); + kosmosFileSystem.delete(baseDir); + assertFalse(kosmosFileSystem.exists(baseDir)); + } + +}