STORM-166: deleting the BitTorrent code distributor, as the ttorrent library does not support distributed hash tables (DHT) for trackerless torrents.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc24e440 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc24e440 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc24e440 Branch: refs/heads/0.11.x-branch Commit: dc24e440fa98f62e232e7929fd075387e2e56e4e Parents: 27d6b4c Author: Parth Brahmbhatt <[email protected]> Authored: Fri Dec 19 14:37:38 2014 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Fri Dec 19 14:37:38 2014 -0800 ---------------------------------------------------------------------- conf/defaults.yaml | 6 - storm-core/src/jvm/backtype/storm/Config.java | 26 --- .../BitTorrentCodeDistributor.java | 191 ------------------- 3 files changed, 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/dc24e440/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index e189966..9fd9c32 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -50,11 +50,6 @@ storm.auth.simple-acl.admins: [] storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate" storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor" -### bittorrent configuration -bittorrent.port: 6969 -bittorrent.max.upload.rate: 0.0 -bittorrent.max.download.rate: 0.0 - ### nimbus.* configs are for the master nimbus.thrift.port: 6627 nimbus.thrift.threads: 64 @@ -129,7 +124,6 @@ supervisor.monitor.frequency.secs: 3 #how frequently the supervisor heartbeats to the cluster state (for nimbus) supervisor.heartbeat.frequency.secs: 5 supervisor.enable: true -supervisor.bittorrent.seed.duration: 0 supervisor.supervisors: [] supervisor.supervisors.commands: [] http://git-wip-us.apache.org/repos/asf/storm/blob/dc24e440/storm-core/src/jvm/backtype/storm/Config.java 
---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 9746565..4678177 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -1258,32 +1258,6 @@ public class Config extends HashMap<String, Object> { public static final Object STORM_CODE_DISTRIBUTOR_CLASS_SCHEMA = String.class; /** - * Which port the BitTorrent tracker should bind to. - */ - public static final String BITTORRENT_PORT = "bittorrent.port"; - public static final Object BITTORRENT_PORT_SCHEMA = Number.class; - - /** - * Max upload rate for topology torrents in kB/sec. 0.0 == unlimited. - */ - public static final String BITTORRENT_MAX_UPLOAD_RATE = "bittorrent.max.upload.rate"; - public static final Object BITTORRENT_MAX_UPLOAD_RATE_SCHEMA = Number.class; - - /** - * Max download rate for topology torrents in kB/sec. 0.0 == unlimited. - */ - public static final String BITTORRENT_MAX_DOWNLOAD_RATE = "bittorrent.max.download.rate"; - public static final Object BITTORRENT_MAX_DOWNLOAD_RATE_SCHEMA = Number.class; - - /** - * Time in seconds that a supervisor should seed after completing a topology torrent download. - * A value of 0 will disable seeding (download only). A value of -1 indicates that the supervisor - * should seed indefinitely (until the topology is killed). - */ - public static final String SUPERVISOR_BITTORRENT_SEED_DURATION = "supervisor.bittorrent.seed.duration"; - public static final Object SUPERVISOR_BITTORRENT_SEED_DURATION_SCHEMA = Number.class; - - /** * Minimum number of nimbus hosts where the code must be replicated before leader nimbus * is allowed to perform topology activation tasks like setting up heartbeats/assignments * and marking the topology as active. default is 0. 
http://git-wip-us.apache.org/repos/asf/storm/blob/dc24e440/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java b/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java deleted file mode 100644 index 190cc5f..0000000 --- a/storm-core/src/jvm/backtype/storm/codedistributor/BitTorrentCodeDistributor.java +++ /dev/null @@ -1,191 +0,0 @@ -package backtype.storm.codedistributor; - -import backtype.storm.Config; -import com.google.common.collect.Lists; -import com.google.common.primitives.Shorts; -import com.turn.ttorrent.client.Client; -import com.turn.ttorrent.client.SharedTorrent; -import com.turn.ttorrent.common.Torrent; -import com.turn.ttorrent.tracker.TrackedTorrent; -import com.turn.ttorrent.tracker.Tracker; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileOutputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.UnknownHostException; -import java.util.*; - -public class BitTorrentCodeDistributor implements ICodeDistributor { - private static final Logger LOG = LoggerFactory.getLogger(BitTorrentCodeDistributor.class); - private Tracker tracker; - private String hostName; - private InetSocketAddress address; - private Integer port; - protected HashMap<String, Client> clients = new HashMap<String, Client>(); - protected Double maxDownload; - protected Double maxUpload; - private Integer seedDuration; - - @Override - public void prepare(Map conf) throws Exception { - this.hostName = InetAddress.getLocalHost().getCanonicalHostName(); - this.port = (Integer) conf.get(Config.BITTORRENT_PORT); - this.maxDownload = (Double) conf.get(Config.BITTORRENT_MAX_DOWNLOAD_RATE); - this.maxUpload = (Double) 
conf.get(Config.BITTORRENT_MAX_UPLOAD_RATE); - this.seedDuration = (Integer) conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION); - - LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxUpload), format(this.maxDownload)); - - LOG.info("Starting bt tracker bound to hostname '{}'", hostName); - //using "0.0.0.0" to ensure we bind to all IPV4 network interfaces. - this.address = new InetSocketAddress("0.0.0.0", port); - - this.tracker = new Tracker(address); - LOG.info("Announce URL: {}", this.tracker.getAnnounceUrl()); - this.tracker.start(); - } - - @Override - public File upload(String dirPath, String topologyId) throws Exception { - File destDir = new File(dirPath); - LOG.info("Generating torrent for directory: {}", destDir.getAbsolutePath()); - - URI uri = URI.create("http://" + this.hostName + ":" + this.port + "/announce"); - LOG.info("Creating torrent with announce URL: {}", uri); - - //TODO: why does listing the directory not work? - ArrayList<File> files = new ArrayList<File>(); - files.add(new File(destDir, "stormjar.jar")); - files.add(new File(destDir, "stormconf.ser")); - files.add(new File(destDir, "stormcode.ser")); - - Torrent torrent = Torrent.create(destDir, files, uri, "storm-nimbus"); - File torrentFile = new File(destDir, "storm-code-distributor.meta"); - torrent.save(new FileOutputStream(torrentFile)); - LOG.info("Saved torrent: {}", torrentFile.getAbsolutePath()); - this.tracker.announce(new TrackedTorrent(torrent)); - - Client client = new Client(getInetAddress(), new SharedTorrent(torrent, destDir.getParentFile(), true)); - this.clients.put(topologyId, client); - rebalanceRates(); - client.share(); - LOG.info("Seeding torrent..."); - - /** - * - * TODO: Every time on prepare we need to call tracker.announce for all torrents that - * exists in the file system, other wise the tracker will reject any peer request - * with unknown torrents. You need to bootstrap trackers. 
- */ - return torrentFile; - } - - @Override - public List<File> download(String topologyId, File torrentFile) throws Exception { - LOG.info("Initiating BitTorrent download."); - - File destDir = torrentFile.getParentFile(); - LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath()); - LOG.info("Saving files to directory: {}", destDir.getAbsolutePath()); - SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir); - Client client = new Client(getInetAddress(), st); - this.clients.put(topologyId, client); - rebalanceRates(); - client.share(this.seedDuration); - - //TODO: Should have a timeout after which we just fail the supervisor. - if (this.seedDuration == 0) { - client.waitForCompletion(); - } else { - LOG.info("Waiting for seeding to begin..."); - while (client.getState() != Client.ClientState.SEEDING && client.getState() != Client.ClientState.ERROR) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - } - } - } - LOG.info("BitTorrent download complete."); - - /** - * This should not be needed. currently the bittorrent library uses the torrent name (which is topologyId) - * as the folder name and downloads all the files under that folder. so we need to either download - * the torrent files under /storm-local/supervisor/stormdist or nimbus/stormdist/ to ensure stormdist becomes - * the parent of all torrent files and the actual code will be downloaded under stormdist/topologyId/. - * Ideally we should be able to specify that the downloaded files must be downloaded under - * given folder only and no extra folder needs to be created. 
- */ - - File srcDir = new File(destDir, topologyId); - for (File file : srcDir.listFiles()) { - FileUtils.copyFileToDirectory(file, destDir); - file.delete(); - } - srcDir.delete(); - - return Lists.newArrayList(destDir.listFiles()); - } - - private InetAddress getInetAddress() throws UnknownHostException { - for (InetAddress addr : InetAddress.getAllByName(this.hostName)) { - if (!addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && !addr.isMulticastAddress()) { - return addr; - } - } - - throw new RuntimeException("No valid InetAddress could be obtained, something really wrong with network configuration."); - } - - @Override - public short getReplicationCount(String topologyId) { - Collection<TrackedTorrent> trackedTorrents = tracker.getTrackedTorrents(); - for (final TrackedTorrent trackedTorrent : trackedTorrents) { - if (trackedTorrent.getName().equals(topologyId)) { - return Shorts.checkedCast(trackedTorrent.seeders()); - } - } - - LOG.warn("No torrent found in tracker for topologyId = " + topologyId); - return 0; - } - - @Override - public void cleanup(String topologyId) { - LOG.info("Stop seeding/tracking for topology {}", topologyId); - Client client = this.clients.remove(topologyId); - if (client != null) { - Torrent torrent = client.getTorrent(); - client.stop(); - this.tracker.remove(torrent); - } - rebalanceRates(); - } - - @Override - public void close(Map conf) { - this.tracker.stop(); - } - - private synchronized void rebalanceRates() { - int clientCount = this.clients.size(); - if (clientCount > 0) { - double maxDl = this.maxDownload <= 0.0 ? this.maxDownload : this.maxDownload / clientCount; - double maxUl = this.maxUpload <= 0.0 ? 
this.maxUpload : this.maxUpload / clientCount; - LOG.info("Rebalancing bandwidth allocation based on {} topology torrents.", clientCount); - LOG.info("Per-torrent allocation [D/U]: {}/{} kB/sec.", format(maxDl), format(maxUl)); - for (Client client : this.clients.values()) { - client.setMaxDownloadRate(maxDl); - client.setMaxUploadRate(maxUl); - } - } - } - - private static String format(double val) { - return val <= 0.0 ? "UNLIMITED" : String.format("%.2f", val); - } -}
