[
https://issues.apache.org/jira/browse/STORM-150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13969825#comment-13969825
]
ASF GitHub Bot commented on STORM-150:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/71#discussion_r11648026
--- Diff: storm-core/src/jvm/backtype/storm/torrent/SupervisorPeer.java ---
@@ -0,0 +1,67 @@
+package backtype.storm.torrent;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.NoSuchAlgorithmException;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+
+import com.turn.ttorrent.client.Client;
+import com.turn.ttorrent.client.Client.ClientState;
+import com.turn.ttorrent.client.SharedTorrent;
+
+public class SupervisorPeer extends BasePeer{
+ private static final Logger LOG = LoggerFactory.getLogger(SupervisorPeer.class);
+
+ private Integer seedDuration;
+
+ public SupervisorPeer(Map conf){
+ LOG.info("Creating supervisor bt tracker.");
+ this.maxDownload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_DOWNLOAD_RATE);
+ this.maxUpload = (Double)conf.get(Config.SUPERVISOR_BITTORRENT_MAX_UPLOAD_RATE);
+ this.seedDuration = (Integer)conf.get(Config.SUPERVISOR_BITTORRENT_SEED_DURATION);
+ LOG.info("Download rates [U/D]: {}/{} kB/sec", format(this.maxDownload), format(this.maxDownload));
+ }
+
+ public void stop(String topologyId){
+ LOG.info("Stopping bt client for topology {}", topologyId);
+ Client client = this.clients.remove(topologyId);
+ if(client != null){
+ client.stop();
+ }
+ rebalanceRates();
+ }
+
+ public void download(String torrentPath, String topologyId) throws IOException, NoSuchAlgorithmException{
+ LOG.info("Initiating BitTorrent download.");
+ InetAddress netAddr = InetAddress.getLocalHost();
+ File torrentFile = new File(torrentPath);
+ File destDir = torrentFile.getParentFile();
+ LOG.info("Downloading with torrent file: {}", torrentFile.getAbsolutePath());
+ LOG.info("Saving files to directory: {}", destDir.getAbsolutePath());
+ SharedTorrent st = SharedTorrent.fromFile(torrentFile, destDir);
+
+ Client client = new Client(netAddr, st);
+ this.clients.put(topologyId, client);
+ rebalanceRates();
+ client.share(this.seedDuration);
+ if(this.seedDuration == 0){
+ client.waitForCompletion();
+ } else {
+ LOG.info("Waiting for seeding to begin...");
+ while(client.getState() != ClientState.SEEDING && client.getState() != ClientState.ERROR){
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
--- End diff --
Do we really want to swallow the InterruptedException? It means someone
interrupted this thread and wants it to stop, but we just keep looping. Could
we rethrow it as a RuntimeException instead?
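To illustrate the suggestion, here is a minimal sketch of a polling wait that does not discard the interrupt. The class name `InterruptAwareWait`, the method `waitUntil`, and the use of `BooleanSupplier` (Java 8+) are hypothetical and not part of the patch; in `SupervisorPeer` the condition would be the `client.getState()` check from the diff. Restoring the interrupt flag before rethrowing is an extra step beyond what the comment asks for, added so callers further up the stack can still observe the interruption.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the pull request.
public final class InterruptAwareWait {
    private InterruptAwareWait() {}

    /**
     * Polls until done reports true. If the thread is interrupted while
     * sleeping, the interrupt flag is restored and a RuntimeException
     * wrapping the InterruptedException is thrown instead of continuing.
     */
    public static void waitUntil(BooleanSupplier done, long pollMillis) {
        while (!done.getAsBoolean()) {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Thread.sleep clears the flag when it throws; set it again
                // so code further up the stack can see the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting", e);
            }
        }
    }
}
```

In the loop under review, the call would look roughly like `InterruptAwareWait.waitUntil(() -> client.getState() == ClientState.SEEDING || client.getState() == ClientState.ERROR, 10);`, assuming a Java version with lambdas.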
> Replace jar distribution strategy with bittorent
> ------------------------------------------------
>
> Key: STORM-150
> URL: https://issues.apache.org/jira/browse/STORM-150
> Project: Apache Storm (Incubating)
> Issue Type: Improvement
> Reporter: James Xu
> Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/435
> Consider using http://turn.github.com/ttorrent/
> ----------
> ptgoetz: I've been looking into implementing this, but have a design question
> that boils down to this:
> Should the client (storm jar command) be the initial seeder, or should we
> wait and let nimbus do the seeding?
> The benefit of doing the seeding client-side is that we would only have to
> transfer a small .torrent through the nimbus thrift API. But I can imagine
> situations where the network environment would prevent BitTorrent clients
> from connecting back to the machine that's submitting the topology. This
> would create an indefinitely "stalled submission" since none of the cluster
> nodes would be able to connect to the seeder.
> The alternative would be to use the current technique of uploading the jar to
> nimbus, and have nimbus generate and distribute the .torrent file, and
> provide the initial seed. If the cluster is properly configured, we're pretty
> much guaranteed connectivity between nimbus and supervisor nodes.
> I'm leaning toward the latter approach, but would be interested in others'
> opinions.
> ----------
> nathanmarz: @ptgoetz I think Nimbus should do the seeding. That ensures that
> when the client finishes submitting, it can disconnect/go away without having
> to worry about making the topology unlaunchable.
> ----------
> jasonjckn: @nathanmarz How does this solve Nimbus's dependency on
> reliable local disk state (as you talked about in person)?
> What happens when zookeeper is offline for 1 hour? All the workers will die,
> and nimbus will be continually restarting. The onus is still on nimbus to
> store topology jars on local disk, so that when the workers and supervisors
> reboot it can seed all this again.
> You -can- solve the local disk persistence problem with replicated state to
> the non-elected nimbuses, but that's orthogonal to a distribution strategy.
> Yes there is some replication going on in bittorrent, but it's not really a
> protocol that delivers reliable persistence of state.
> I think it's still a good feature if it gives us performant topology submit
> times even with 500 workers, which take 3 minutes for us.
> Particularly with the worker heartbeat start-up timeout of 120s, you want to
> be able to start 500 workers within 120s, or even 1500 workers within 120s.
> The current distribution strategy does not scale that way.
> ----------
> nathanmarz: @jasonjckn On top of the bittorrent stuff we can ensure that a
> topology is considered submitted only when the artifacts exist on at least N
> nodes. Nimbus would only be the initial seed for topology files. Also, it
> wouldn't have to be only Nimbus that acts as a seed; that work could be
> shared by the supervisors. That's less relevant in the storm-mesos world, but
> you could still fairly easily run multiple Nimbus instances to get replication.
> ----------
> jasonjckn: This PR might be aided by "topology packages" #557, as it bundles
> all the state that needs to be replicated.