Index: test/configuration/connections_test.rb
===================================================================
--- test/configuration/connections_test.rb	(revision 8182)
+++ test/configuration/connections_test.rb	(working copy)
@@ -217,7 +217,6 @@
     @config.current_task = mock_task(:on_error => :continue)
     @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns(list)
     Capistrano::SSH.expects(:connect).times(2).raises(Exception).then.returns(:success)
-    @config.expects(:failed!).with(server("cap1"))
     @config.execute_on_servers do |servers|
       assert_equal %w(cap2), servers.map { |s| s.host }
     end
@@ -260,7 +259,7 @@
       assert_equal %w(cap2), servers.map { |s| s.host }
     end
   end
-
+  
   def test_connect_should_establish_connections_to_all_servers_in_scope
     assert @config.sessions.empty?
     @config.current_task = mock_task
@@ -269,7 +268,25 @@
     @config.connect!
     assert_equal %w(cap1 cap2 cap3), @config.sessions.keys.sort.map { |s| s.host }
   end
-
+  
+  def test_execute_on_servers_should_only_run_on_max_hosts_hosts_at_once
+    cap1 = server("cap1")
+    cap2 = server("cap2")
+    connection1 = mock()
+    connection2 = mock()
+    connection1.expects(:close)
+    connection2.expects(:close)
+    @config.current_task = mock_task
+    @config.expects(:find_servers_for_task).with(@config.current_task, {:max_hosts => 1}).returns([cap1, cap2])
+    Capistrano::SSH.expects(:connect).times(2).returns(connection1).then.returns(connection2)
+    block_called = 0
+    @config.execute_on_servers(:max_hosts => 1) do |servers|
+      block_called += 1
+      assert_equal 1, servers.size
+    end
+    assert_equal 2, block_called
+  end
+  
   def test_connect_should_honor_once_option
     assert @config.sessions.empty?
     @config.current_task = mock_task
Index: lib/capistrano/configuration/connections.rb
===================================================================
--- lib/capistrano/configuration/connections.rb	(revision 8182)
+++ lib/capistrano/configuration/connections.rb	(working copy)
@@ -1,3 +1,6 @@
+
+require 'ruby-debug'
+require 'enumerator'
 require 'capistrano/gateway'
 require 'capistrano/ssh'
 
@@ -120,22 +123,34 @@
         servers = [servers.first] if options[:once]
         logger.trace "servers: #{servers.map { |s| s.host }.inspect}"
 
-        # establish connections to those servers, as necessary
-        begin
-          establish_connections_to(servers)
-        rescue ConnectionError => error
-          raise error unless task && task.continue_on_error?
-          error.hosts.each do |h|
-            servers.delete(h)
-            failed!(h)
+        # establish connections to those servers in groups of max_hosts, as necessary
+        max_hosts = options.fetch(:max_hosts, servers.size)
+        servers.each_slice(max_hosts) do |servers_slice|
+          begin
+            establish_connections_to(servers_slice)
+          rescue ConnectionError => error
+            raise error unless task && task.continue_on_error?
+            error.hosts.each do |h|
+              servers_slice.delete(h)
+              failed!(h)
+            end
           end
-        end
 
-        begin
-          yield servers
-        rescue RemoteError => error
-          raise error unless task && task.continue_on_error?
-          error.hosts.each { |h| failed!(h) }
+          begin
+            yield servers_slice
+          rescue RemoteError => error
+            raise error unless task && task.continue_on_error?
+            error.hosts.each { |h| failed!(h) }
+          end
+
+          if max_hosts < servers.size
+            # Nasty hack? Force the sessions closed so that we don't tie up more
+            # than max_hosts outbound connections
+            servers_slice.each do |server|
+              @sessions[server].close
+              @sessions.delete(server)
+            end
+          end
         end
       end
 
