Hi everybody,
I have developed a Filesystem that supports SFTP, it uses JSch in order to
manage SFTP.
It is a little buggy, but it is just a starting point.
Íñigo Goiri
Ahora también puedes acceder a tu correo Terra desde el móvil.
Infórmate pinchando aquí.
package org.apache.hadoop.fs.sftp;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import com.jcraft.jsch.Channel;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
import com.jcraft.jsch.UserInfo;
import com.jcraft.jsch.ChannelSftp.LsEntry;
public class SFTPFileSystem extends FileSystem {
public static final Log LOG = LogFactory.getLog(SFTPFileSystem.class);
public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
private static final int DEFAULT_PORT = 22;
private URI uri;
@Override
public void initialize(URI uri, Configuration conf) throws IOException { // get
super.initialize(uri, conf);
// get host information from uri (overrides info in conf)
String host = uri.getHost();
host = (host == null) ? conf.get("fs.sftp.host", null) : host;
if (host == null) {
throw new IOException("Invalid host specified");
}
conf.set("fs.sftp.host", host);
// get port information from uri, (overrides info in conf)
int port = uri.getPort();
port = (port == -1) ? DEFAULT_PORT : port;
conf.setInt("fs.sftp.host.port", port);
// get user/password information from URI (overrides info in conf)
String userAndPassword = uri.getUserInfo();
if (userAndPassword == null) {
userAndPassword = (conf.get("fs.sftp.user." + host, null) + ":" + conf.get("fs.sftp.password." + host, null));
if (userAndPassword == null) {
throw new IOException("Invalid user/passsword specified");
}
}
String[] userPasswdInfo = userAndPassword.split(":");
conf.set("fs.sftp.user." + host, userPasswdInfo[0]);
if (userPasswdInfo.length > 1) {
conf.set("fs.sftp.password." + host, userPasswdInfo[1]);
} else {
conf.set("fs.sftp.password." + host, null);
}
setConf(conf);
this.uri = uri;
}
private ChannelSftp connect() throws IOException {
ChannelSftp c = null;
Configuration conf = getConf();
String host = conf.get("fs.sftp.host");
int port = conf.getInt("fs.sftp.host.port", DEFAULT_PORT);
String user = conf.get("fs.sftp.user." + host);
String password = conf.get("fs.sftp.password." + host);
JSch jsch = new JSch();
try {
Session session = jsch.getSession(user, host, port);
session.setConfig("StrictHostKeyChecking", "no");
// username and password will be given via UserInfo interface.
MyUserInfo ui = new MyUserInfo();
ui.setPassword(password);
session.setUserInfo(ui);
session.connect();
Channel channel= session.openChannel("sftp");
channel.connect();
c = (ChannelSftp) channel;
} catch (JSchException e) {
throw new IOException(e.getMessage());
}
return c;
/*FTPClient client = null;
Configuration conf = getConf();
String host = conf.get("fs.ftp.host");
int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
String user = conf.get("fs.ftp.user." + host);
String password = conf.get("fs.ftp.password." + host);
client = new FTPClient();
client.connect(host, port);
int reply = client.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
throw new IOException("Server - " + host
+ " refused connection on port - " + port);
} else if (client.login(user, password)) {
client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
client.setFileType(FTP.BINARY_FILE_TYPE);
client.setBufferSize(DEFAULT_BUFFER_SIZE);
} else {
throw new IOException("Login failed on server - " + host + ", port - "
+ port);
}
return client;*/
}
private void disconnect(ChannelSftp client) throws IOException {
if (client != null) {
if (!client.isConnected()) {
throw new IOException("Client not connected");
}
client.disconnect();
}
}
private Path makeAbsolute(Path workDir, Path path) {
if (path.isAbsolute()) {
return path;
}
return new Path(workDir, path);
}
@Override
public FSDataInputStream open(Path file, int bufferSize) throws IOException {
FSDataInputStream fis = null;
try {
ChannelSftp client = connect();
//Path workDir = new Path(client.printWorkingDirectory());
Path workDir = new Path(client.pwd());
Path absolute = makeAbsolute(workDir, file);
FileStatus fileStat = getFileStatus(client, absolute);
if (fileStat.isDir()) {
disconnect(client);
throw new IOException("Path " + file + " is a directory.");
}
//client.allocate(bufferSize);
Path parent = absolute.getParent();
// Change to parent directory on the
// server. Only then can we read the
// file
// on the server by opening up an InputStream. As a side effect the working
// directory on the server is changed to the parent directory of the file.
// The FTP client connection is closed when close() is called on the
// FSDataInputStream.
client.cd(parent.toUri().getPath());
//client.changeWorkingDirectory(parent.toUri().getPath());
InputStream is = client.get(file.getName());
//retrieveFileStream(file.getName());
fis = new FSDataInputStream(new SFTPInputStream(is, client, statistics));
/*if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
// The ftpClient is an inconsistent state. Must close the stream
// which in turn will logout and disconnect from FTP server
fis.close();
throw new IOException("Unable to open file: " + file + ", Aborting");
}*/
} catch (SftpException e) {
throw new IOException(e.getMessage());
}
return fis;
}
// This optional operation is not yet supported.
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new IOException("Not supported");
}
@Override
public FSDataOutputStream create(Path file, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
FSDataOutputStream fos = null;
try {
final ChannelSftp client = connect();
Path workDir = new Path(client.pwd());
Path absolute = makeAbsolute(workDir, file);
if (exists(client, file)) {
if (overwrite) {
delete(client, file);
} else {
disconnect(client);
throw new IOException("File already exists: " + file);
}
}
Path parent = absolute.getParent();
if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
parent = (parent == null) ? new Path("/") : parent;
disconnect(client);
throw new IOException("create(): Mkdirs failed to create: " + parent);
}
//client.allocate(bufferSize);
// Change to parent directory on the server. Only then can we write to the
// file on the server by opening up an OutputStream. As a side effect the
// working directory on the server is changed to the parent directory of the
// file. The FTP client connection is closed when close() is called on the
// FSDataOutputStream.
client.cd(parent.toUri().getPath());
fos = new FSDataOutputStream(client.put(file.getName()), statistics) {
@Override
public void close() throws IOException {
super.close();
if (!client.isConnected()) {
throw new SFTPException("Client not connected");
}
//boolean cmdCompleted = client.completePendingCommand();
disconnect(client);
/*if (!cmdCompleted) {
throw new SFTPException("Could not complete transfer, Reply Code - " + client.getReplyCode());
}*/
}
};
/*if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
// The ftpClient is an inconsistent state. Must close the stream
// which in turn will logout and disconnect from FTP server
fos.close();
throw new IOException("Unable to create file: " + file + ", Aborting");
}*/
} catch (SftpException e) {
throw new IOException(e.getMessage());
}
return fos;
}
private boolean exists(ChannelSftp client, Path file) {
try {
return getFileStatus(client, file) != null;
} catch (FileNotFoundException fnfe) {
return false;
} catch (IOException ioe) {
throw new SFTPException("Failed to get file status", ioe);
}
}
@Override
public boolean delete(Path file, boolean recursive) throws IOException {
ChannelSftp client = connect();
try {
boolean success = delete(client, file, recursive);
return success;
} finally {
disconnect(client);
}
}
@Override
public boolean delete(Path file) throws IOException {
return delete(file, true);
}
private boolean delete(ChannelSftp client, Path file) throws IOException {
return delete(client, file, true);
}
private boolean delete(ChannelSftp client, Path file, boolean recursive) throws IOException {
try {
Path workDir = new Path(client.pwd());
Path absolute = makeAbsolute(workDir, file);
String pathName = absolute.toUri().getPath();
FileStatus fileStat = getFileStatus(client, absolute);
if (!fileStat.isDir()) {
client.rm(pathName);
// TODO
return true;
//return client.deleteFile(pathName);
}
FileStatus[] dirEntries = listStatus(client, absolute);
if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
throw new IOException("Directory: " + file + " is not empty.");
}
if (dirEntries != null) {
for (int i = 0; i < dirEntries.length; i++) {
delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
}
}
client.rmdir(pathName);
} catch (SftpException e) {
throw new IOException(e.getMessage());
}
// TODO
return true;
}
@Override
public FileStatus getFileStatus(Path file) throws IOException {
ChannelSftp client = connect();
try {
FileStatus status = getFileStatus(client, file);
return status;
} finally {
disconnect(client);
}
}
private FileStatus getFileStatus(ChannelSftp client, Path file) throws IOException {
FileStatus fileStat = null;
try {
Path workDir = new Path(client.pwd());
Path absolute = makeAbsolute(workDir, file);
Path parentPath = absolute.getParent();
if (parentPath == null) { // root dir
long length = -1; // Length of root dir on server not known
boolean isDir = true;
int blockReplication = 1;
long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
long modTime = -1; // Modification time of root dir not known.
Path root = new Path("/");
return new FileStatus(length, isDir, blockReplication, blockSize, modTime, root.makeQualified(this));
}
String pathName = parentPath.toUri().getPath();
//FTPFile[] ftpFiles = client.listFiles(pathName);
Vector<LsEntry> sftpFiles = (Vector<LsEntry>) client.ls(pathName);
if (sftpFiles != null) {
for (LsEntry sftpFile : sftpFiles) {
if (sftpFile.getFilename().equals(file.getName())) { // file found in dir
fileStat = getFileStatus(sftpFile, parentPath);
break;
}
}
if (fileStat == null) {
throw new FileNotFoundException("File " + file + " does not exist.");
}
} else {
throw new FileNotFoundException("File " + file + " does not exist.");
}
} catch (SftpException e) {
throw new IOException(e.getMessage());
}
return fileStat;
}
private FileStatus getFileStatus(LsEntry sftpFile, Path parentPath) {
long length = sftpFile.getAttrs().getSize();
boolean isDir = sftpFile.getAttrs().isDir();
int blockReplication = 1;
// Using default block size since there is no way in FTP client to know of
// block sizes on server. The assumption could be less than ideal.
long blockSize = DEFAULT_BLOCK_SIZE;
//getTimestamp().getTimeInMillis();
long modTime = sftpFile.getAttrs().getATime()*1000;
long accessTime = 0;
FsPermission permission = new FsPermission((short) sftpFile.getAttrs().getPermissions());
String user = Integer.toString(sftpFile.getAttrs().getUId());
String group = Integer.toString(sftpFile.getAttrs().getGId());
Path filePath = new Path(parentPath, sftpFile.getFilename());
return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
accessTime, permission, user, group, filePath.makeQualified(this));
}
@Override
public URI getUri() {
return uri;
}
@Override
public Path getWorkingDirectory() {
// Return home directory always since we do not maintain state.
return getHomeDirectory();
}
@Override
public Path getHomeDirectory() {
ChannelSftp client = null;
try {
client = connect();
Path homeDir = new Path(client.pwd());
return homeDir;
} catch (IOException ioe) {
throw new SFTPException("Failed to get home directory", ioe);
} catch (SftpException e) {
throw new SFTPException("Failed to get home directory", e);
} finally {
try {
disconnect(client);
} catch (IOException ioe) {
throw new SFTPException("Failed to disconnect", ioe);
}
}
}
@Override
public FileStatus[] listStatus(Path file) throws IOException {
ChannelSftp client = connect();
try {
FileStatus[] stats = listStatus(client, file);
return stats;
} finally {
disconnect(client);
}
}
private FileStatus[] listStatus(ChannelSftp client, Path file) throws IOException {
FileStatus[] fileStats = null;
try {
Path workDir = new Path(client.pwd());
Path absolute = makeAbsolute(workDir, file);
FileStatus fileStat = getFileStatus(client, absolute);
if (!fileStat.isDir()) {
return new FileStatus[] { fileStat };
}
Vector<LsEntry> sftpFiles = (Vector<LsEntry>) client.ls(absolute.toUri().getPath());
fileStats = new FileStatus[sftpFiles.size()];
for (int i = 0; i < sftpFiles.size(); i++) {
fileStats[i] = getFileStatus(sftpFiles.get(i), absolute);
}
} catch (SftpException e) {
throw new IOException(e.getMessage());
}
return fileStats;
}
@Override
public boolean mkdirs(Path file, FsPermission permission) throws IOException {
ChannelSftp client = connect();
try {
boolean success = mkdirs(client, file, permission);
return success;
} finally {
disconnect(client);
}
}
private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission) throws IOException {
boolean created = true;
try {
Path workDir = new Path(client.pwd());
Path absolute = makeAbsolute(workDir, file);
String pathName = absolute.getName();
if (!exists(client, absolute)) {
Path parent = absolute.getParent();
created = (parent == null || mkdirs(client, parent, FsPermission.getDefault()));
if (created) {
String parentDir = parent.toUri().getPath();
client.cd(parentDir);
//created = created &
client.mkdir(pathName);
}
} else if (isFile(client, absolute)) {
throw new IOException(String.format("Can't make directory for path %s since it is a file.", absolute));
}
} catch (Exception e) {
throw new IOException(e.getMessage());
}
return created;
}
private boolean isFile(ChannelSftp client, Path file) {
try {
return !getFileStatus(client, file).isDir();
} catch (FileNotFoundException e) {
return false; // file does not exist
} catch (IOException ioe) {
throw new SFTPException("File check failed", ioe);
}
}
@Override
public boolean rename(Path src, Path dst) throws IOException {
ChannelSftp client = connect();
try {
boolean success = rename(client, src, dst);
return success;
} finally {
disconnect(client);
}
}
private boolean rename(ChannelSftp client, Path src, Path dst) throws IOException {
try {
Path workDir = new Path(client.pwd());
Path absoluteSrc = makeAbsolute(workDir, src);
Path absoluteDst = makeAbsolute(workDir, dst);
if (!exists(client, absoluteSrc)) {
throw new IOException("Source path " + src + " does not exist");
}
if (exists(client, absoluteDst)) {
throw new IOException("Destination path " + dst
+ " already exist, cannot rename!");
}
String parentSrc = absoluteSrc.getParent().toUri().toString();
String parentDst = absoluteDst.getParent().toUri().toString();
String from = src.getName();
String to = dst.getName();
if (!parentSrc.equals(parentDst)) {
throw new IOException("Cannot rename parent(source): " + parentSrc
+ ", parent(destination): " + parentDst);
}
client.cd(parentSrc);
client.rename(from, to);
} catch (SftpException e) {
throw new IOException(e.getMessage());
}
// TODO
return true;
}
@Override
public void setWorkingDirectory(Path newDir) {
// we do not maintain the working directory state
}
public static class MyUserInfo implements UserInfo {
private String password = null;
public void setPassword(String password) {
this.password = password;
}
@Override
public String getPassword() {
return this.password;
}
@Override
public String getPassphrase() {
return null;
}
@Override
public boolean promptPassphrase(String arg0) {
return false;
}
@Override
public boolean promptPassword(String arg0) {
return true;
}
@Override
public boolean promptYesNo(String arg0) {
return false;
}
@Override
public void showMessage(String arg0) {}
}
}
package org.apache.hadoop.fs.sftp;
public class SFTPException extends RuntimeException {
private static final long serialVersionUID = 1L;
public SFTPException(String message) {
super(message);
}
public SFTPException(Throwable t) {
super(t);
}
public SFTPException(String message, Throwable t) {
super(message, t);
}
}
package org.apache.hadoop.fs.sftp;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FileSystem;
import com.jcraft.jsch.ChannelSftp;
public class SFTPInputStream extends InputStream {
InputStream wrappedStream;
ChannelSftp client;
FileSystem.Statistics stats;
boolean closed;
long pos;
public SFTPInputStream(InputStream stream, ChannelSftp client, FileSystem.Statistics stats) {
if (stream == null) {
throw new IllegalArgumentException("Null InputStream");
}
if (client == null || !client.isConnected()) {
throw new IllegalArgumentException("FTP client null or not connected");
}
this.wrappedStream = stream;
this.client = client;
this.stats = stats;
this.pos = 0;
this.closed = false;
}
public long getPos() throws IOException {
return pos;
}
// We don't support seek.
public void seek(long pos) throws IOException {
throw new IOException("Seek not supported");
}
public boolean seekToNewSource(long targetPos) throws IOException {
throw new IOException("Seek not supported");
}
public synchronized int read() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
int byteRead = wrappedStream.read();
if (byteRead >= 0) {
pos++;
}
if (stats != null & byteRead >= 0) {
stats.incrementBytesRead(1);
}
return byteRead;
}
public synchronized int read(byte buf[], int off, int len) throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
int result = wrappedStream.read(buf, off, len);
if (result > 0) {
pos += result;
}
if (stats != null & result > 0) {
stats.incrementBytesRead(result);
}
return result;
}
public synchronized void close() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
super.close();
closed = true;
if (!client.isConnected()) {
throw new SFTPException("Client not connected");
}
client.disconnect();
}
// Not supported.
public boolean markSupported() {
return false;
}
public void mark(int readLimit) {
// Do nothing
}
public void reset() throws IOException {
throw new IOException("Mark not supported");
}
}