Hi everybody,
I have developed a FileSystem implementation that supports SFTP; it uses JSch to
manage the SFTP connection.

It is a little buggy, but it is just a starting point.


Íñigo Goiri





Ahora también puedes acceder a tu correo Terra desde el móvil.

Infórmate pinchando aquí.





package org.apache.hadoop.fs.sftp;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
import com.jcraft.jsch.UserInfo;
import com.jcraft.jsch.ChannelSftp.LsEntry;


public class SFTPFileSystem extends FileSystem {
	/** Logger for this file system implementation. */
	public static final Log LOG = LogFactory.getLog(SFTPFileSystem.class);

	/**
	 * Buffer size constant (1 MiB). Within this class it is referenced only by
	 * the commented-out FTP snippet in connect(), not by live code.
	 */
	public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;

	/**
	 * Block size (4 KiB) reported in every FileStatus this class builds; the
	 * server's real block size is not known.
	 */
	public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;

	/** Port used when the file system URI does not specify one. */
	private static final int DEFAULT_PORT = 22;

	/** URI this file system was initialized with; returned by getUri(). */
	private URI uri;

	/**
	 * Initializes this file system from the given URI and configuration.
	 *
	 * <p>Host, port, user and password are taken from the URI when present and
	 * written back into {@code conf} under the {@code fs.sftp.*} keys so that
	 * {@code connect()} can read them later. When the URI carries no user info,
	 * the user and password already stored in {@code conf} for the host are used.
	 *
	 * @param uri  URI of the form {@code sftp://user:password@host:port/}
	 * @param conf configuration to read defaults from and store the resolved
	 *             connection settings in
	 * @throws IOException if no host or no user can be determined
	 */
	@Override
	public void initialize(URI uri, Configuration conf) throws IOException {
		super.initialize(uri, conf);
		// get host information from uri (overrides info in conf)
		String host = uri.getHost();
		host = (host == null) ? conf.get("fs.sftp.host", null) : host;
		if (host == null) {
			throw new IOException("Invalid host specified");
		}
		conf.set("fs.sftp.host", host);

		// get port information from uri; the default port is used when the URI has none
		int port = uri.getPort();
		port = (port == -1) ? DEFAULT_PORT : port;
		conf.setInt("fs.sftp.host.port", port);

		// get user/password information from URI (overrides info in conf)
		String userKey = "fs.sftp.user." + host;
		String passwordKey = "fs.sftp.password." + host;
		String user;
		String password;
		String userInfo = uri.getUserInfo();
		if (userInfo != null) {
			// Split on the FIRST ':' only, so a password may itself contain ':'.
			int separator = userInfo.indexOf(':');
			if (separator < 0) {
				user = userInfo;
				password = null;
			} else {
				user = userInfo.substring(0, separator);
				password = userInfo.substring(separator + 1);
			}
		} else {
			user = conf.get(userKey);
			password = conf.get(passwordKey);
		}
		if (user == null) {
			throw new IOException("Invalid user/passsword specified");
		}
		conf.set(userKey, user);
		// Never store a null value: Configuration rejects it. When no password
		// is supplied, whatever is already configured for this host is left as is.
		if (password != null) {
			conf.set(passwordKey, password);
		}
		setConf(conf);
		this.uri = uri;
	}

	/**
	 * Opens a new SSH session to the configured host and returns a connected
	 * SFTP channel on it.
	 *
	 * <p>Connection settings are read from the {@code fs.sftp.*} configuration
	 * keys. If anything fails after the session object has been created, the
	 * session is disconnected again so that no half-open connection is left
	 * behind.
	 *
	 * @return a connected SFTP channel
	 * @throws IOException if the session or the channel cannot be established;
	 *                     the underlying JSch exception is attached as the cause
	 */
	private ChannelSftp connect() throws IOException {
		Configuration conf = getConf();
		String host = conf.get("fs.sftp.host");
		int port = conf.getInt("fs.sftp.host.port", DEFAULT_PORT);
		String user = conf.get("fs.sftp.user." + host);
		String password = conf.get("fs.sftp.password." + host);

		Session session = null;
		try {
			session = new JSch().getSession(user, host, port);
			// NOTE(review): host key verification is disabled here, so the server's
			// identity is never checked. Kept as in the original; consider making
			// this configurable.
			session.setConfig("StrictHostKeyChecking", "no");
			// The password is handed to JSch through the UserInfo callback interface.
			MyUserInfo ui = new MyUserInfo();
			ui.setPassword(password);
			session.setUserInfo(ui);

			session.connect();

			Channel channel = session.openChannel("sftp");
			channel.connect();
			return (ChannelSftp) channel;
		} catch (JSchException e) {
			// Release the session if it was created but the channel could not be opened.
			if (session != null) {
				session.disconnect();
			}
			throw new IOException(e.getMessage(), e);
		}
	}

	/**
	 * Closes the given SFTP channel together with the SSH session it runs on.
	 *
	 * <p>Disconnecting only the channel would leave the session open; the
	 * session is therefore looked up first and disconnected in all cases.
	 *
	 * @param client the channel to close; {@code null} is ignored
	 * @throws IOException if the channel is not connected
	 */
	private void disconnect(ChannelSftp client) throws IOException {
		if (client == null) {
			return;
		}
		// Look the session up before touching the channel.
		Session session;
		try {
			session = client.getSession();
		} catch (JSchException e) {
			// No session is attached to the channel, so there is nothing more to release.
			session = null;
		}
		try {
			if (!client.isConnected()) {
				throw new IOException("Client not connected");
			}
			client.disconnect();
		} finally {
			if (session != null) {
				session.disconnect();
			}
		}
	}

	/**
	 * Resolves {@code path} against {@code workDir} unless it is already absolute.
	 *
	 * @param workDir directory that relative paths are resolved against
	 * @param path    path to resolve
	 * @return {@code path} itself when absolute, otherwise {@code workDir/path}
	 */
	private Path makeAbsolute(Path workDir, Path path) {
		return path.isAbsolute() ? path : new Path(workDir, path);
	}

	/**
	 * Opens the given file for reading over a dedicated SFTP connection.
	 *
	 * <p>The connection is handed over to the returned stream. If opening fails
	 * at any point, the connection is closed here so it is not leaked.
	 *
	 * @param file       file to open
	 * @param bufferSize ignored by this implementation
	 * @return a stream reading the remote file
	 * @throws IOException if the path is a directory, does not exist, or the
	 *                     server reports an error
	 */
	@Override
	public FSDataInputStream open(Path file, int bufferSize) throws IOException {
		ChannelSftp client = connect();
		boolean handedOff = false;
		try {
			Path workDir = new Path(client.pwd());
			Path absolute = makeAbsolute(workDir, file);
			FileStatus fileStat = getFileStatus(client, absolute);
			if (fileStat.isDir()) {
				throw new IOException("Path " + file + " is a directory.");
			}

			// Change to the parent directory on the server and fetch the file by
			// name. As a side effect the channel's working directory becomes the
			// parent directory of the file.
			Path parent = absolute.getParent();
			client.cd(parent.toUri().getPath());
			InputStream is = client.get(file.getName());
			FSDataInputStream fis = new FSDataInputStream(new SFTPInputStream(is, client, statistics));
			handedOff = true;
			return fis;
		} catch (SftpException e) {
			throw new IOException(e.getMessage(), e);
		} finally {
			if (!handedOff) {
				try {
					disconnect(client);
				} catch (IOException ignored) {
					// Best-effort cleanup; the failure that brought us here is the one to report.
				}
			}
		}
	}

	/**
	 * Appending is an optional operation that this file system does not provide.
	 *
	 * @throws IOException always
	 */
	public FSDataOutputStream append(Path f, int bufferSize,
			Progressable progress) throws IOException {
		final String reason = "Not supported";
		throw new IOException(reason);
	}

	/**
	 * Creates a file on the server and returns a stream for writing to it.
	 *
	 * <p>Missing parent directories are created first. The SFTP connection is
	 * handed over to the returned stream and closed when the stream is closed.
	 * If creation fails at any point, the connection is closed here so it is
	 * not leaked.
	 *
	 * @param file        file to create
	 * @param permission  ignored by this implementation
	 * @param overwrite   whether an existing file may be replaced
	 * @param bufferSize  ignored by this implementation
	 * @param replication ignored by this implementation
	 * @param blockSize   ignored by this implementation
	 * @param progress    ignored by this implementation
	 * @return a stream writing to the remote file
	 * @throws IOException if the file exists and {@code overwrite} is false, the
	 *                     parent directories cannot be created, or the server
	 *                     reports an error
	 */
	@Override
	public FSDataOutputStream create(Path file, FsPermission permission,
			boolean overwrite, int bufferSize, short replication, long blockSize,
			Progressable progress) throws IOException {
		final ChannelSftp client = connect();
		boolean handedOff = false;
		try {
			Path workDir = new Path(client.pwd());
			Path absolute = makeAbsolute(workDir, file);
			if (exists(client, absolute)) {
				if (!overwrite) {
					throw new IOException("File already exists: " + file);
				}
				delete(client, absolute);
			}
			Path parent = absolute.getParent();
			if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
				parent = (parent == null) ? new Path("/") : parent;
				throw new IOException("create(): Mkdirs failed to create: " + parent);
			}
			// Change to the parent directory on the server and open the file by
			// name. As a side effect the channel's working directory becomes the
			// parent directory of the file.
			client.cd(parent.toUri().getPath());
			FSDataOutputStream fos = new FSDataOutputStream(client.put(file.getName()), statistics) {
				@Override
				public void close() throws IOException {
					try {
						super.close();
					} finally {
						// Release the connection even if flushing failed; the
						// isConnected() guard keeps a repeated close() harmless.
						if (client.isConnected()) {
							disconnect(client);
						}
					}
				}
			};
			handedOff = true;
			return fos;
		} catch (SftpException e) {
			throw new IOException(e.getMessage(), e);
		} finally {
			if (!handedOff) {
				try {
					disconnect(client);
				} catch (IOException ignored) {
					// Best-effort cleanup; the failure that brought us here is the one to report.
				}
			}
		}
	}

	/**
	 * Tells whether {@code file} exists on the server, using the given connection.
	 *
	 * @return {@code true} if a status could be obtained, {@code false} if the
	 *         lookup reported the file as not found
	 * @throws SFTPException if the lookup fails for any other reason
	 */
	private boolean exists(ChannelSftp client, Path file) {
		final FileStatus status;
		try {
			status = getFileStatus(client, file);
		} catch (FileNotFoundException fnfe) {
			return false;
		} catch (IOException ioe) {
			throw new SFTPException("Failed to get file status", ioe);
		}
		return status != null;
	}


	/**
	 * Deletes {@code file} over a connection that is opened for this call and
	 * closed again afterwards.
	 *
	 * @param file      path to delete
	 * @param recursive whether a non-empty directory may be deleted
	 * @return the result of the connection-based delete
	 * @throws IOException if connecting, deleting or disconnecting fails
	 */
	@Override
	public boolean delete(Path file, boolean recursive) throws IOException {
		final ChannelSftp channel = connect();
		try {
			return delete(channel, file, recursive);
		} finally {
			disconnect(channel);
		}
	}

	/**
	 * Deletes {@code file}, recursively if it is a directory.
	 *
	 * @throws IOException if the delete fails
	 */
	@Override
	public boolean delete(Path file) throws IOException {
		final boolean recursive = true;
		return delete(file, recursive);
	}

	/**
	 * Recursively deletes {@code file} using an already open connection.
	 *
	 * @throws IOException if the delete fails
	 */
	private boolean delete(ChannelSftp client, Path file) throws IOException {
		final boolean recursive = true;
		return delete(client, file, recursive);
	}

	/**
	 * Deletes a file or directory using an already open connection.
	 *
	 * <p>Only real children of a directory are considered. A server listing
	 * may contain the "." and ".." pseudo-entries, which resolve to the
	 * directory itself and to its parent; treating them as children would make
	 * every directory look non-empty and would send a recursive delete into the
	 * directory itself and its parent.
	 *
	 * @param client    connected SFTP channel
	 * @param file      path to delete
	 * @param recursive whether a non-empty directory may be deleted
	 * @return {@code true} when the delete completed
	 * @throws IOException if the path does not exist, a non-empty directory is
	 *                     deleted non-recursively, or the server reports an error
	 */
	private boolean delete(ChannelSftp client, Path file, boolean recursive) throws IOException {
		try {
			Path workDir = new Path(client.pwd());
			Path absolute = makeAbsolute(workDir, file);
			String pathName = absolute.toUri().getPath();
			FileStatus fileStat = getFileStatus(client, absolute);
			if (!fileStat.isDir()) {
				client.rm(pathName);
				return true;
			}
			FileStatus[] dirEntries = listStatus(client, absolute);
			int childCount = 0;
			if (dirEntries != null) {
				for (int i = 0; i < dirEntries.length; i++) {
					if (isDirectChild(pathName, dirEntries[i].getPath())) {
						childCount++;
					}
				}
			}
			if (childCount > 0 && !recursive) {
				throw new IOException("Directory: " + file + " is not empty.");
			}
			if (dirEntries != null) {
				for (int i = 0; i < dirEntries.length; i++) {
					if (isDirectChild(pathName, dirEntries[i].getPath())) {
						delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
					}
				}
			}
			client.rmdir(pathName);
		} catch (SftpException e) {
			throw new IOException(e.getMessage(), e);
		}
		return true;
	}

	/** Tells whether {@code candidate} lies directly inside the directory whose server path is {@code dirPathName}. */
	private boolean isDirectChild(String dirPathName, Path candidate) {
		Path candidateParent = candidate.getParent();
		return candidateParent != null && dirPathName.equals(candidateParent.toUri().getPath());
	}

	/**
	 * Looks up the status of {@code file} over a connection that is opened for
	 * this call and closed again afterwards.
	 *
	 * @throws IOException if connecting, the lookup or disconnecting fails
	 */
	@Override
	public FileStatus getFileStatus(Path file) throws IOException {
		final ChannelSftp channel = connect();
		try {
			return getFileStatus(channel, file);
		} finally {
			disconnect(channel);
		}
	}

	/**
	 * Looks up the status of {@code file} using an already open connection.
	 *
	 * <p>The root directory is answered with a synthetic status. Any other path
	 * is found by listing its parent directory and matching the file name.
	 *
	 * @param client connected SFTP channel
	 * @param file   path to look up; relative paths are resolved against the
	 *               channel's current directory
	 * @return the status of the path
	 * @throws FileNotFoundException if the path, or the parent directory that
	 *                               has to be listed, does not exist
	 * @throws IOException           if the server reports any other error
	 */
	private FileStatus getFileStatus(ChannelSftp client, Path file) throws IOException {
		try {
			Path workDir = new Path(client.pwd());
			Path absolute = makeAbsolute(workDir, file);
			Path parentPath = absolute.getParent();
			if (parentPath == null) { // root dir
				long length = -1; // Length of root dir on server not known
				boolean isDir = true;
				int blockReplication = 1;
				long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
				long modTime = -1; // Modification time of root dir not known.
				Path root = new Path("/");
				return new FileStatus(length, isDir, blockReplication, blockSize, modTime, root.makeQualified(this));
			}
			String pathName = parentPath.toUri().getPath();
			@SuppressWarnings("unchecked") // ls() returns a Vector of LsEntry
			Vector<LsEntry> sftpFiles = (Vector<LsEntry>) client.ls(pathName);
			if (sftpFiles != null) {
				String wantedName = file.getName();
				for (LsEntry sftpFile : sftpFiles) {
					if (sftpFile.getFilename().equals(wantedName)) { // file found in dir
						return getFileStatus(sftpFile, parentPath);
					}
				}
			}
			throw new FileNotFoundException("File " + file + " does not exist.");
		} catch (SftpException e) {
			// A missing parent directory means the file is missing too. Report it
			// as "not found" so that existence checks can treat it as such
			// instead of as a hard failure.
			if (e.id == ChannelSftp.SSH_FX_NO_SUCH_FILE) {
				FileNotFoundException notFound =
						new FileNotFoundException("File " + file + " does not exist.");
				notFound.initCause(e);
				throw notFound;
			}
			throw new IOException(e.getMessage(), e);
		}
	}

	/**
	 * Converts one SFTP directory entry into a Hadoop {@link FileStatus}.
	 *
	 * @param sftpFile   directory entry as returned by the server listing
	 * @param parentPath directory the entry was listed from
	 * @return status with length, type, times, permission, numeric owner and
	 *         group ids, and the qualified path of the entry
	 */
	private FileStatus getFileStatus(LsEntry sftpFile, Path parentPath) {
		long length = sftpFile.getAttrs().getSize();
		boolean isDir = sftpFile.getAttrs().isDir();
		int blockReplication = 1;
		// Using the default block size since the SFTP attributes carry no block
		// size for the server. The assumption could be less than ideal.
		long blockSize = DEFAULT_BLOCK_SIZE;
		// SFTP timestamps are int seconds; multiply as long so the conversion to
		// milliseconds cannot overflow.
		long modTime = sftpFile.getAttrs().getMTime() * 1000L;
		long accessTime = sftpFile.getAttrs().getATime() * 1000L;
		FsPermission permission = new FsPermission((short) sftpFile.getAttrs().getPermissions());
		String user = Integer.toString(sftpFile.getAttrs().getUId());
		String group = Integer.toString(sftpFile.getAttrs().getGId());
		Path filePath = new Path(parentPath, sftpFile.getFilename());
		return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
				accessTime, permission, user, group, filePath.makeQualified(this));
	}

	/** Returns the URI stored by {@code initialize}. */
	@Override
	public URI getUri() {
		return this.uri;
	}

	/**
	 * Returns the home directory: no working-directory state is kept between
	 * calls, so the home directory is always the answer.
	 */
	@Override
	public Path getWorkingDirectory() {
		final Path home = getHomeDirectory();
		return home;
	}

	/**
	 * Returns the directory a freshly opened SFTP channel starts in.
	 *
	 * @throws SFTPException if connecting, querying the directory or
	 *                       disconnecting fails
	 */
	@Override
	public Path getHomeDirectory() {
		ChannelSftp channel = null;
		try {
			channel = connect();
			return new Path(channel.pwd());
		} catch (SftpException e) {
			throw new SFTPException("Failed to get home directory", e);
		} catch (IOException ioe) {
			throw new SFTPException("Failed to get home directory", ioe);
		} finally {
			try {
				disconnect(channel);
			} catch (IOException ioe) {
				throw new SFTPException("Failed to disconnect", ioe);
			}
		}
	}

	/**
	 * Lists {@code file} over a connection that is opened for this call and
	 * closed again afterwards.
	 *
	 * @throws IOException if connecting, listing or disconnecting fails
	 */
	@Override
	public FileStatus[] listStatus(Path file) throws IOException {
		final ChannelSftp channel = connect();
		try {
			return listStatus(channel, file);
		} finally {
			disconnect(channel);
		}
	}

	/**
	 * Lists a path using an already open connection.
	 *
	 * <p>For a regular file the result is a single-element array holding the
	 * file's own status. For a directory the result holds one status per child;
	 * the "." and ".." pseudo-entries of the server listing are left out
	 * because they are not children of the directory.
	 *
	 * @param client connected SFTP channel
	 * @param file   path to list
	 * @return the statuses described above
	 * @throws IOException if the path does not exist or the server reports an error
	 */
	private FileStatus[] listStatus(ChannelSftp client, Path file) throws IOException {
		try {
			Path workDir = new Path(client.pwd());
			Path absolute = makeAbsolute(workDir, file);
			FileStatus fileStat = getFileStatus(client, absolute);
			if (!fileStat.isDir()) {
				return new FileStatus[] { fileStat };
			}
			@SuppressWarnings("unchecked") // ls() returns a Vector of LsEntry
			Vector<LsEntry> sftpFiles = (Vector<LsEntry>) client.ls(absolute.toUri().getPath());
			// Collect into an array sized for the worst case, then trim it to the
			// number of real children.
			FileStatus[] collected = new FileStatus[sftpFiles.size()];
			int count = 0;
			for (LsEntry entry : sftpFiles) {
				String name = entry.getFilename();
				if (".".equals(name) || "..".equals(name)) {
					continue;
				}
				collected[count++] = getFileStatus(entry, absolute);
			}
			FileStatus[] fileStats = new FileStatus[count];
			System.arraycopy(collected, 0, fileStats, 0, count);
			return fileStats;
		} catch (SftpException e) {
			throw new IOException(e.getMessage(), e);
		}
	}

	/**
	 * Creates {@code file} and any missing parents over a connection that is
	 * opened for this call and closed again afterwards.
	 *
	 * @throws IOException if connecting, creating or disconnecting fails
	 */
	@Override
	public boolean mkdirs(Path file, FsPermission permission) throws IOException {
		final ChannelSftp channel = connect();
		try {
			return mkdirs(channel, file, permission);
		} finally {
			disconnect(channel);
		}
	}

	/**
	 * Creates a directory and any missing parents using an already open connection.
	 *
	 * <p>Parents are created recursively first; the directory itself is then
	 * created by name from inside its parent, which changes the channel's
	 * current directory as a side effect.
	 *
	 * @param client     connected SFTP channel
	 * @param file       directory to create
	 * @param permission ignored by this implementation
	 * @return {@code true} if the directory already existed or was created,
	 *         {@code false} if a parent could not be created
	 * @throws IOException if the path exists as a regular file or the server
	 *                     reports an error; the original failure is attached as
	 *                     the cause
	 */
	private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission) throws IOException {
		boolean created = true;
		try {
			Path workDir = new Path(client.pwd());
			Path absolute = makeAbsolute(workDir, file);
			String pathName = absolute.getName();
			if (!exists(client, absolute)) {
				Path parent = absolute.getParent();
				if (parent == null) {
					// A path without a parent is the root, which cannot be created.
					throw new IOException("Cannot create the root directory");
				}
				created = mkdirs(client, parent, FsPermission.getDefault());
				if (created) {
					String parentDir = parent.toUri().getPath();
					client.cd(parentDir);
					client.mkdir(pathName);
				}
			} else if (isFile(client, absolute)) {
				throw new IOException(String.format("Can't make directory for path %s since it is a file.", absolute));
			}
		} catch (IOException e) {
			// Already the right type; re-wrapping would only hide the subtype.
			throw e;
		} catch (Exception e) {
			throw new IOException(e.getMessage(), e);
		}
		return created;
	}

	/**
	 * Tells whether {@code file} exists and is not a directory.
	 *
	 * @return {@code false} when the path is a directory or does not exist
	 * @throws SFTPException if the status lookup fails for any other reason
	 */
	private boolean isFile(ChannelSftp client, Path file) {
		final boolean directory;
		try {
			directory = getFileStatus(client, file).isDir();
		} catch (FileNotFoundException e) {
			return false; // file does not exist
		} catch (IOException ioe) {
			throw new SFTPException("File check failed", ioe);
		}
		return !directory;
	}

	/**
	 * Renames {@code src} to {@code dst} over a connection that is opened for
	 * this call and closed again afterwards.
	 *
	 * @throws IOException if connecting, renaming or disconnecting fails
	 */
	@Override
	public boolean rename(Path src, Path dst) throws IOException {
		final ChannelSftp channel = connect();
		try {
			return rename(channel, src, dst);
		} finally {
			disconnect(channel);
		}
	}


	/**
	 * Renames a file or directory within its parent directory, using an already
	 * open connection.
	 *
	 * <p>Source and destination must share the same parent directory. Parents
	 * are compared and entered by their server-side path (the path component of
	 * the URI), so paths qualified with scheme and authority work as well.
	 *
	 * @param client connected SFTP channel
	 * @param src    existing path to rename
	 * @param dst    new path; must not exist yet
	 * @return {@code true} when the rename completed
	 * @throws IOException if the source is missing, the destination exists, the
	 *                     parents differ, or the server reports an error
	 */
	private boolean rename(ChannelSftp client, Path src, Path dst) throws IOException {
		try {
			Path workDir = new Path(client.pwd());
			Path absoluteSrc = makeAbsolute(workDir, src);
			Path absoluteDst = makeAbsolute(workDir, dst);
			if (!exists(client, absoluteSrc)) {
				throw new IOException("Source path " + src + " does not exist");
			}
			if (exists(client, absoluteDst)) {
				throw new IOException("Destination path " + dst
						+ " already exist, cannot rename!");
			}
			// getPath() rather than toString(): cd() needs a plain server path,
			// not a URI that may carry scheme and authority.
			String parentSrc = absoluteSrc.getParent().toUri().getPath();
			String parentDst = absoluteDst.getParent().toUri().getPath();
			String from = src.getName();
			String to = dst.getName();
			if (!parentSrc.equals(parentDst)) {
				throw new IOException("Cannot rename parent(source): " + parentSrc
						+ ", parent(destination):  " + parentDst);
			}
			client.cd(parentSrc);
			client.rename(from, to);
		} catch (SftpException e) {
			throw new IOException(e.getMessage(), e);
		}
		return true;
	}

	/**
	 * Does nothing: this file system keeps no working-directory state, so the
	 * requested directory is ignored.
	 *
	 * @param newDir ignored
	 */
	@Override
	public void setWorkingDirectory(Path newDir) {
		// we do not maintain the working directory state
	}

	/**
	 * Non-interactive {@link UserInfo} that answers JSch's credential callbacks
	 * with a password set in advance. No passphrase is offered, yes/no prompts
	 * are answered with "no", and messages are discarded.
	 */
	public static class MyUserInfo implements UserInfo {
		/** Password returned to JSch; {@code null} until set. */
		private String password;

		/** Stores the password to return from {@link #getPassword()}. */
		public void setPassword(String password) {
			this.password = password;
		}

		/** Always agrees to the password prompt; the stored password is the answer. */
		@Override
		public boolean promptPassword(String arg0) {
			return true;
		}

		@Override
		public String getPassword() {
			return password;
		}

		/** Declines the passphrase prompt. */
		@Override
		public boolean promptPassphrase(String arg0) {
			return false;
		}

		/** No passphrase is available. */
		@Override
		public String getPassphrase() {
			return null;
		}

		/** Answers every yes/no question with "no". */
		@Override
		public boolean promptYesNo(String arg0) {
			return false;
		}

		/** Messages are ignored. */
		@Override
		public void showMessage(String arg0) {
			// intentionally empty
		}
	}
}
package org.apache.hadoop.fs.sftp;

/**
 * Unchecked exception used by the SFTP file system for failures in code paths
 * that cannot declare a checked exception.
 */
public class SFTPException extends RuntimeException {
	private static final long serialVersionUID = 1L;

	/**
	 * @param message description of the failure
	 * @param t       underlying cause
	 */
	public SFTPException(String message, Throwable t) {
		super(message, t);
	}

	/** @param t underlying cause */
	public SFTPException(Throwable t) {
		super(t);
	}

	/** @param message description of the failure */
	public SFTPException(String message) {
		super(message);
	}
}
package org.apache.hadoop.fs.sftp;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.fs.FileSystem;

import com.jcraft.jsch.ChannelSftp;


public class SFTPInputStream extends InputStream {
	InputStream wrappedStream;
	ChannelSftp client;
	FileSystem.Statistics stats;
	boolean closed;
	long pos;

	public SFTPInputStream(InputStream stream, ChannelSftp client, FileSystem.Statistics stats) {
		if (stream == null) {
			throw new IllegalArgumentException("Null InputStream");
		}
		if (client == null || !client.isConnected()) {
			throw new IllegalArgumentException("FTP client null or not connected");
		}
		this.wrappedStream = stream;
		this.client = client;
		this.stats = stats;
		this.pos = 0;
		this.closed = false;
	}

	public long getPos() throws IOException {
		return pos;
	}

	// We don't support seek.
	public void seek(long pos) throws IOException {
		throw new IOException("Seek not supported");
	}

	public boolean seekToNewSource(long targetPos) throws IOException {
		throw new IOException("Seek not supported");
	}

	public synchronized int read() throws IOException {
		if (closed) {
			throw new IOException("Stream closed");
		}

		int byteRead = wrappedStream.read();
		if (byteRead >= 0) {
			pos++;
		}
		if (stats != null & byteRead >= 0) {
			stats.incrementBytesRead(1);
		}
		return byteRead;
	}

	public synchronized int read(byte buf[], int off, int len) throws IOException {
		if (closed) {
			throw new IOException("Stream closed");
		}

		int result = wrappedStream.read(buf, off, len);
		if (result > 0) {
			pos += result;
		}
		if (stats != null & result > 0) {
			stats.incrementBytesRead(result);
		}

		return result;
	}

	public synchronized void close() throws IOException {
		if (closed) {
			throw new IOException("Stream closed");
		}
		super.close();
		closed = true;
		if (!client.isConnected()) {
			throw new SFTPException("Client not connected");
		}

		client.disconnect();
	}

	// Not supported.
	public boolean markSupported() {
		return false;
	}

	public void mark(int readLimit) {
		// Do nothing
	}

	public void reset() throws IOException {
		throw new IOException("Mark not supported");
	}
}

Reply via email to