Default to LocalFileSystemCodeDistributor; move ICodeDistributor and the core code distributors into their own package, backtype.storm.codedistributor.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/547ed491 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/547ed491 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/547ed491 Branch: refs/heads/nimbus-ha-branch Commit: 547ed4916761508040f056c6812e047e04e8db3a Parents: e5a96f4 Author: Parth Brahmbhatt <[email protected]> Authored: Thu Dec 18 16:39:25 2014 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Thu Dec 18 16:39:25 2014 -0800 ---------------------------------------------------------------------- conf/defaults.yaml | 2 +- .../ha/codedistributor/HDFSCodeDistributor.java | 2 +- .../src/clj/backtype/storm/daemon/nimbus.clj | 2 + storm-core/src/jvm/backtype/storm/Config.java | 2 +- .../BitTorrentCodeDistributor.java | 191 +++++++++++++++++++ .../storm/codedistributor/ICodeDistributor.java | 57 ++++++ .../LocalFileSystemCodeDistributor.java | 105 ++++++++++ .../backtype/storm/nimbus/ICodeDistributor.java | 57 ------ .../nimbus/LocalFileSystemCodeDistributor.java | 106 ---------- .../torrent/BitTorrentCodeDistributor.java | 191 ------------------- 10 files changed, 358 insertions(+), 357 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 7757b24..07230dc 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -48,7 +48,7 @@ storm.auth.simple-acl.users: [] storm.auth.simple-acl.users.commands: [] storm.auth.simple-acl.admins: [] storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate" -storm.codedistributor.class: "backtype.storm.torrent.BitTorrentCodeDistributor" +storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor" ### bittorrent configuration 
bittorrent.port: 6969 http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java index 9935ae7..f6736bd 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/ha/codedistributor/HDFSCodeDistributor.java @@ -1,6 +1,6 @@ package org.apache.storm.hdfs.ha.codedistributor; -import backtype.storm.nimbus.ICodeDistributor; +import backtype.storm.codedistributor.ICodeDistributor; import com.google.common.collect.Lists; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.Validate; http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/clj/backtype/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index fbe8e77..5a3fa12 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -65,6 +65,8 @@ (defmulti sync-code cluster-mode) ;;TODO we should try genclass for zkLeaderElector and just set NIMBUS-LEADER-ELECTOR-CLASS in defaults.yaml +;;TODO we need to pass acls, looks like not posible as leader-latch does not work with ACLS +;;TODO we need to call .preapare or just get rid of the interface all together. 
(defn mk-leader-elector [conf] (if (conf NIMBUS-LEADER-ELECTOR-CLASS) (do (log-message "Using custom Leade elector: " (conf NIMBUS-LEADER-ELECTOR-CLASS)) http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 16216ea..81046b3 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -1257,7 +1257,7 @@ public class Config extends HashMap<String, Object> { public static final Object TOPOLOGY_HDFS_URI_SCHEMA = String.class; /** - * Which implementation of {@link backtype.storm.nimbus.ICodeDistributor} should be used by storm for code + * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code * distribution. */ public static final String STORM_CODE_DISTRIBUTOR_CLASS = "storm.codedistributor.class"; http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java new file mode 100644 index 0000000..190cc5f --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java @@ -0,0 +1,191 @@ +package backtype.storm.codedistributor; + +import backtype.storm.Config; +import com.google.common.collect.Lists; +import com.google.common.primitives.Shorts; +import com.turn.ttorrent.client.Client; +import com.turn.ttorrent.client.SharedTorrent; +import com.turn.ttorrent.common.Torrent; +import com.turn.ttorrent.tracker.TrackedTorrent; +import com.turn.ttorrent.tracker.Tracker; +import 
org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.*; + +public class BitTorrentCodeDistributor implements ICodeDistributor { + private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class); + private Tracker tracker; + private String hostName; + private InetSocketAddress address; + private Integer port; + protected HashMap<String, Client> clients = new HashMap<String, Client>(); + protected Double maxDownload; + protected Double maxUpload; + private Integer seedDuration; + + @Override + public void prepare(Map conf) throws Exception { + this.hostName = InetAddress.getLocalHost().getCanonicalHostName(); + this.port = (Integer) conf.get(Config.BITTORRENT_PORT); + this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE); + this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE); + this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); + + LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload)); + + LOG.info("Starting bt tracker bound to hostname '{}'", hostName); + //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces. 
+ this.address = new InetSocketAddress("0.0.0.0", port); + + this.tracker = new Tracker(address); + LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl()); + this.tracker.start(); + } + + @Override + public File upload(String dirPath, String topologyId) throws Exception { + File destDir = new File(dirPath); + LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath()); + + URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); + LOG.info("Creating torrent with announce URL: {}", uri); + + //TODO: why does listing the directory not work? + ArrayList<File> files = new ArrayList<File>(); + files.add(new File(destDir, "stormjar.jar")); + files.add(new File(destDir, "stormconf.ser")); + files.add(new File(destDir, "stormcode.ser")); + + Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus"); + File torrentFile = new File(destDir, "storm-code-distributor.meta"); + torrent.save(new FileOutputStream(torrentFile)); + LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath()); + this.tracker.announce(new TrackedTorrent(torrent)); + + Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true)); + this.clients.put(topologyId, client); + rebalanceRates(); + client.share(); + LOG.info("Seeding torrent..."); + + /** + * + * TODO: Every time on prepare we need to call tracker.announce for all torrents that + * exists in the file system, other wise the tracker will reject any peer request + * with unknown torrents. You need to bootstrap trackers. 
+ */ + return torrentFile; + } + + @Override + public List<File> download(String topologyId, File torrentFile) throws Exception { + LOG.info("Initiating BitTorrent download."); + + File destDir = torrentFile.getParentFile(); + LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); + LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); + SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); + Client client = new Client(getInetAddress(), st); + this.clients.put(topologyId, client); + rebalanceRates(); + client.share(this.seedDuration); + + //TODO: Should have a timeout after which we just fail the supervisor. + if (this.seedDuration == 0) { + client.waitForCompletion(); + } else { + LOG.info("Waiting for seeding to begin..."); + while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + LOG.info("BitTorrent download complete."); + + /** + * This should not be needed. currently the bittorrent library uses the torrent name (which is topologyId) + * as the folder name and downloads all the files under that folder. so we need to either download + * the torrent files under /storm-local/supervisor/stormdist or nimbus/stormdist/ to ensure stormdist becomes + * the parent of all torrent files and the actual code will be downloaded under stormdist/topologyId/. + * Ideally we should be able to specify that the downloaded files must be downloaded under + * given folder only and no extra folder needs to be created. 
+ */ + + File srcDir = new File(destDir, topologyId); + for (File file : srcDir.listFiles()) { + FileUtils.copyFileToDirectory(file, destDir); + file.delete(); + } + srcDir.delete(); + + return Lists.newArrayList(destDir.listFiles()); + } + + private InetAddress getInetAddress() throws UnknownHostException { + for (InetAddress addr : InetAddress.getAllByName(this.hostName)) { + if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) { + return addr; + } + } + + throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration."); + } + + @Override + public short getReplicationCount(String topologyId) { + Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents(); + for (final TrackedTorrent trackedTorrent : trackedTorrents) { + if (trackedTorrent.getName().equals(topologyId)) { + return Shorts.checkedCast(trackedTorrent.seeders()); + } + } + + LOG.warn("No torrent found in tracker for topologyId = " + topologyId); + return 0; + } + + @Override + public void cleanup(String topologyId) { + LOG.info("Stop seeding/tracking for topology {}", topologyId); + Client client = this.clients.remove(topologyId); + if (client != null) { + Torrent torrent = client.getTorrent(); + client.stop(); + this.tracker.remove(torrent); + } + rebalanceRates(); + } + + @Override + public void close(Map conf) { + this.tracker.stop(); + } + + private synchronized void rebalanceRates() { + int clientCount = this.clients.size(); + if (clientCount > 0) { + double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; + double maxUl = this.maxUpload <= 0.0 ? 
this.maxUpload : this.maxUpload / clientCount; + LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); + LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); + for (Client client : this.clients.values()) { + client.setMaxDownloadRate(maxDl); + client.setMaxUploadRate(maxUl); + } + } + } + + private static String format(double val) { + return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java new file mode 100644 index 0000000..f536a2a --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/codedistributor/ICodeDistributor.java @@ -0,0 +1,57 @@ +package backtype.storm.codedistributor; + + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; + +/** + * Interface responsible to distribute code in the cluster. + */ +public interface ICodeDistributor { + /** + * Prepare this code distributor. + * @param conf + */ + void prepare(Map conf) throws Exception; + + /** + * This API will perform the actual upload of the code to the distribution implementation. + * The API should return a Meta file which should have enough information for downloader + * so it can download the code e.g. for bittorrent it will be a torrent file, in case of something like HDFS or s3 + * it might have the actual directory where all the code is put. + * @param dirPath directory where all the code to be distributed exists. + * @param topologyId the topologyId for which the meta file needs to be created. 
+ * @return metaFile + */ + File upload(String dirPath, String topologyId) throws Exception; + + /** + * Given the topologyId and metafile, download the actual code and return the downloaded file's list. + * @param topologyid + * @param metafile + * @return + */ + List<File> download(String topologyid, File metafile) throws Exception; + + /** + * returns number of nodes to which the code is already replicated for the topology. + * @param topologyId + * @return + */ + short getReplicationCount(String topologyId) throws Exception; + + /** + * Performs the cleanup. + * @param topologyid + */ + void cleanup(String topologyid) throws IOException; + + /** + * Close this distributor. + * @param conf + */ + void close(Map conf); +} http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java new file mode 100644 index 0000000..96422e2 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/codedistributor/LocalFileSystemCodeDistributor.java @@ -0,0 +1,105 @@ +package backtype.storm.codedistributor; + +import backtype.storm.utils.ZookeeperAuthInfo; +import com.google.common.collect.Lists; +import org.apache.commons.io.FileUtils; +import org.apache.curator.framework.CuratorFramework; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static backtype.storm.Config.*; +import static backtype.storm.utils.Utils.downloadFromHost; +import static backtype.storm.utils.Utils.newCurator; + + +public class LocalFileSystemCodeDistributor implements ICodeDistributor { + private static final Logger LOG = 
LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class); + private CuratorFramework zkClient; + private Map conf; + + @Override + public void prepare(Map conf) throws Exception { + this.conf = conf; + List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS); + int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT); + ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf); + zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo); + zkClient.start(); + } + + @Override + public File upload(String dirPath, String topologyId) throws Exception { + ArrayList<File> files = new ArrayList<File>(); + File destDir = new File(dirPath); + File[] localFiles = destDir.listFiles(); + + List<String> filePaths = new ArrayList<String>(3); + for (File file : localFiles) { + filePaths.add(file.getAbsolutePath()); + } + + File metaFile = new File(destDir, "storm-code-distributor.meta"); + boolean isCreated = metaFile.createNewFile(); + if (isCreated) { + FileUtils.writeLines(metaFile, filePaths); + } else { + LOG.warn("metafile " + metaFile.getAbsolutePath() + " already exists."); + } + + LOG.info("Created meta file " + metaFile.getAbsolutePath() + " upload successful."); + + return metaFile; + } + + @Override + public List<File> download(String topologyid, File metafile) throws Exception { + List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid); + File destDir = metafile.getParentFile(); + List<File> downloadedFiles = Lists.newArrayList(); + for (String absoluteFilePath : FileUtils.readLines(metafile)) { + + File localFile = new File(destDir, new File(absoluteFilePath).getName()); + + boolean isSuccess = false; + for (String hostAndPort : hostInfos) { + String host = hostAndPort.split(":")[0]; + int port = Integer.parseInt(hostAndPort.split(":")[1]); + try { + downloadFromHost(conf, absoluteFilePath, localFile.getAbsolutePath(), host, port); + downloadedFiles.add(localFile); + 
isSuccess = true; + break; + } catch (Exception e) { + LOG.error("download failed from {}:{}, will try another endpoint ", host, port, e); + } + } + + if(!isSuccess) { + throw new RuntimeException("File " + absoluteFilePath +" could not be downloaded from any endpoint"); + } + } + + return downloadedFiles; + } + + @Override + public short getReplicationCount(String topologyId) throws Exception { + return (short) zkClient.getChildren().forPath("/code-distributor/" + topologyId).size(); + } + + @Override + public void cleanup(String topologyid) throws IOException { + //no op. + } + + @Override + public void close(Map conf) { + zkClient.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java b/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java deleted file mode 100644 index 8189179..0000000 --- a/storm-core/src/jvm/backtype/storm/nimbus/ICodeDistributor.java +++ /dev/null @@ -1,57 +0,0 @@ -package backtype.storm.nimbus; - - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.util.List; -import java.util.Map; - -/** - * Interface responsible to distribute code in the cluster. - */ -public interface ICodeDistributor { - /** - * Prepare this code distributor. - * @param conf - */ - void prepare(Map conf) throws Exception; - - /** - * This API will perform the actual upload of the code to the distribution implementation. - * The API should return a Meta file which should have enough information for downloader - * so it can download the code e.g. for bittorrent it will be a torrent file, in case of something like HDFS or s3 - * it might have the actual directory where all the code is put. - * @param dirPath directory where all the code to be distributed exists. 
- * @param topologyId the topologyId for which the meta file needs to be created. - * @return metaFile - */ - File upload(String dirPath, String topologyId) throws Exception; - - /** - * Given the topologyId and metafile, download the actual code and return the downloaded file's list. - * @param topologyid - * @param metafile - * @return - */ - List<File> download(String topologyid, File metafile) throws Exception; - - /** - * returns number of nodes to which the code is already replicated for the topology. - * @param topologyId - * @return - */ - short getReplicationCount(String topologyId) throws Exception; - - /** - * Performs the cleanup. - * @param topologyid - */ - void cleanup(String topologyid) throws IOException; - - /** - * Close this distributor. - * @param conf - */ - void close(Map conf); -} http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java b/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java deleted file mode 100644 index bcf0167..0000000 --- a/storm-core/src/jvm/backtype/storm/nimbus/LocalFileSystemCodeDistributor.java +++ /dev/null @@ -1,106 +0,0 @@ -package backtype.storm.nimbus; - -import backtype.storm.Config; -import backtype.storm.utils.ZookeeperAuthInfo; -import com.google.common.collect.Lists; -import org.apache.commons.io.FileUtils; -import org.apache.curator.framework.CuratorFramework; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import static backtype.storm.Config.*; -import static backtype.storm.utils.Utils.downloadFromHost; -import static backtype.storm.utils.Utils.newCurator; - - -public class LocalFileSystemCodeDistributor 
implements ICodeDistributor { - private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemCodeDistributor.class); - private CuratorFramework zkClient; - private Map conf; - - @Override - public void prepare(Map conf) throws Exception { - this.conf = conf; - List<String> zkServers = (List<String>) conf.get(STORM_ZOOKEEPER_SERVERS); - int port = (Integer) conf.get(STORM_ZOOKEEPER_PORT); - ZookeeperAuthInfo zkAuthInfo = new ZookeeperAuthInfo(conf); - zkClient = newCurator(conf, zkServers, port, (String) conf.get(STORM_ZOOKEEPER_ROOT), zkAuthInfo); - zkClient.start(); - } - - @Override - public File upload(String dirPath, String topologyId) throws Exception { - ArrayList<File> files = new ArrayList<File>(); - File destDir = new File(dirPath); - File[] localFiles = destDir.listFiles(); - - List<String> filePaths = new ArrayList<String>(3); - for (File file : localFiles) { - filePaths.add(file.getAbsolutePath()); - } - - File metaFile = new File(destDir, "storm-code-distributor.meta"); - boolean isCreated = metaFile.createNewFile(); - if (isCreated) { - FileUtils.writeLines(metaFile, filePaths); - } else { - LOG.warn("metafile " + metaFile.getAbsolutePath() + " already exists."); - } - - LOG.info("Created meta file " + metaFile.getAbsolutePath() + " upload successful."); - - return metaFile; - } - - @Override - public List<File> download(String topologyid, File metafile) throws Exception { - List<String> hostInfos = zkClient.getChildren().forPath("/code-distributor/" + topologyid); - File destDir = metafile.getParentFile(); - List<File> downloadedFiles = Lists.newArrayList(); - for (String absoluteFilePath : FileUtils.readLines(metafile)) { - - File localFile = new File(destDir, new File(absoluteFilePath).getName()); - - boolean isSuccess = false; - for (String hostAndPort : hostInfos) { - String host = hostAndPort.split(":")[0]; - int port = Integer.parseInt(hostAndPort.split(":")[1]); - try { - downloadFromHost(conf, absoluteFilePath, 
localFile.getAbsolutePath(), host, port); - downloadedFiles.add(localFile); - isSuccess = true; - break; - } catch (Exception e) { - LOG.error("download failed from {}:{}, will try another endpoint ", host, port, e); - } - } - - if(!isSuccess) { - throw new RuntimeException("File " + absoluteFilePath +" could not be downloaded from any endpoint"); - } - } - - return downloadedFiles; - } - - @Override - public short getReplicationCount(String topologyId) throws Exception { - return (short) zkClient.getChildren().forPath("/code-distributor/" + topologyId).size(); - } - - @Override - public void cleanup(String topologyid) throws IOException { - //no op. - } - - @Override - public void close(Map conf) { - zkClient.close(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/547ed491/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java deleted file mode 100644 index a0d2fc8..0000000 --- a/storm-core/src/jvm/backtype/storm/torrent/BitTorrentCodeDistributor.java +++ /dev/null @@ -1,191 +0,0 @@ -package backtype.storm.torrent; - -import backtype.storm.Config; -import backtype.storm.nimbus.ICodeDistributor; -import com.google.common.collect.Lists; -import com.google.common.primitives.Shorts; -import com.turn.ttorrent.client.Client; -import com.turn.ttorrent.client.SharedTorrent; -import com.turn.ttorrent.common.Torrent; -import com.turn.ttorrent.tracker.TrackedTorrent; -import com.turn.ttorrent.tracker.Tracker; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileOutputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.UnknownHostException; -import java.util.*; - -public 
class BitTorrentCodeDistributor implements ICodeDistributor { - private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class); - private Tracker tracker; - private String hostName; - private InetSocketAddress address; - private Integer port; - protected HashMap<String, Client> clients = new HashMap<String, Client>(); - protected Double maxDownload; - protected Double maxUpload; - private Integer seedDuration; - - @Override - public void prepare(Map conf) throws Exception { - this.hostName = InetAddress.getLocalHost().getCanonicalHostName(); - this.port = (Integer) conf.get(Config.BITTORRENT_PORT); - this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE); - this.maxUpload = (Double) conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE); - this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); - - LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload)); - - LOG.info("Starting bt tracker bound to hostname '{}'", hostName); - //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces. - this.address = new InetSocketAddress("0.0.0.0", port); - - this.tracker = new Tracker(address); - LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl()); - this.tracker.start(); - } - - @Override - public File upload(String dirPath, String topologyId) throws Exception { - File destDir = new File(dirPath); - LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath()); - - URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); - LOG.info("Creating torrent with announce URL: {}", uri); - - //TODO: why does listing the directory not work? 
- ArrayList<File> files = new ArrayList<File>(); - files.add(new File(destDir, "stormjar.jar")); - files.add(new File(destDir, "stormconf.ser")); - files.add(new File(destDir, "stormcode.ser")); - - Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus"); - File torrentFile = new File(destDir, "storm-code-distributor.meta"); - torrent.save(new FileOutputStream(torrentFile)); - LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath()); - this.tracker.announce(new TrackedTorrent(torrent)); - - Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true)); - this.clients.put(topologyId, client); - rebalanceRates(); - client.share(); - LOG.info("Seeding torrent..."); - - /** - * Every time on prepare we need to call tracker.announce for all torrents that - * exists in the file system, other wise the tracker will reject any peer request - * with unknown torrents. You need to bootstrap trackers. - */ - return torrentFile; - } - - @Override - public List<File> download(String topologyId, File torrentFile) throws Exception { - LOG.info("Initiating BitTorrent download."); - - File destDir = torrentFile.getParentFile(); - LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); - LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); - SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); - Client client = new Client(getInetAddress(), st); - this.clients.put(topologyId, client); - rebalanceRates(); - client.share(this.seedDuration); - - //TODO: Should have a timeout after which we just fail the supervisor. 
- if (this.seedDuration == 0) { - client.waitForCompletion(); - } else { - LOG.info("Waiting for seeding to begin..."); - while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - } - } - LOG.info("BitTorrent download complete."); - - /** - * This should not be needed. currently the bittorrent library uses the torrent name (which is topologyId) - * as the folder name and downloads all the files under that folder. so we need to either download - * the torrent files under /storm-local/supervisor/stormdist or nimbus/stormdist/ to ensure stormdist becomes - * the parent of all torrent files and the actual code will be downloaded under stormdist/topologyId/. - * Ideally we should be able to specify that the downloaded files must be downloaded under - * given folder only and no extra folder needs to be created. - */ - - File srcDir = new File(destDir, topologyId); - for (File file : srcDir.listFiles()) { - FileUtils.copyFileToDirectory(file, destDir); - file.delete(); - } - srcDir.delete(); - - return Lists.newArrayList(destDir.listFiles()); - } - - private InetAddress getInetAddress() throws UnknownHostException { - for (InetAddress addr : InetAddress.getAllByName(this.hostName)) { - if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) { - return addr; - } - } - - throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration."); - } - - @Override - public short getReplicationCount(String topologyId) { - Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents(); - for (final TrackedTorrent trackedTorrent : trackedTorrents) { - if (trackedTorrent.getName().equals(topologyId)) { - return Shorts.checkedCast(trackedTorrent.seeders()); - } - } - - LOG.warn("No torrent found in tracker for topologyId = " + topologyId); - return 0; - } - - @Override - 
public void cleanup(String topologyId) { - LOG.info("Stop seeding/tracking for topology {}", topologyId); - Client client = this.clients.remove(topologyId); - if (client != null) { - Torrent torrent = client.getTorrent(); - client.stop(); - this.tracker.remove(torrent); - } - rebalanceRates(); - } - - @Override - public void close(Map conf) { - this.tracker.stop(); - } - - private synchronized void rebalanceRates() { - int clientCount = this.clients.size(); - if (clientCount > 0) { - double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; - double maxUl = this.maxUpload <= 0.0 ? this.maxUpload : this.maxUpload / clientCount; - LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); - LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); - for (Client client : this.clients.values()) { - client.setMaxDownloadRate(maxDl); - client.setMaxUploadRate(maxUl); - } - } - } - - private static String format(double val) { - return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); - } -}
