Index: test/upload_test.rb
===================================================================
--- test/upload_test.rb	(revision 7136)
+++ test/upload_test.rb	(working copy)
@@ -51,7 +51,23 @@
     assert_equal 1, upload.failed
     assert_equal 1, upload.completed
   end
-
+  
+  def test_upload_error_should_include_accessor_with_host_array
+    sftp = mock_sftp
+    sftp.expects(:open).with("test.txt", @mode, 0660).yields(mock("status1", :code => Net::SFTP::Session::FX_OK), :file_handle)
+    sftp.expects(:write).with(:file_handle, "data").yields(mock("status2", :code => "bad status", :message => "bad status"))
+    session = mock("session", :sftp => sftp, :xserver => server("capistrano"))
+    upload = Capistrano::Upload.new([session], "test.txt", :data => "data", :logger => stub_everything)
+    
+    begin
+      upload.process!
+      flunk "expected an exception to be raised"
+    rescue Capistrano::UploadError => e
+      assert e.respond_to?(:hosts)
+      assert_equal %w(capistrano), e.hosts.map { |h| h.to_s }
+    end
+  end
+  
   def test_process_when_sftp_succeeds_should_raise_nothing
     sftp = mock_sftp
     sftp.expects(:open).with("test.txt", @mode, 0660).yields(mock("status1", :code => Net::SFTP::Session::FX_OK), :file_handle)
Index: test/gateway_test.rb
===================================================================
--- test/gateway_test.rb	(revision 7136)
+++ test/gateway_test.rb	(working copy)
@@ -77,6 +77,20 @@
     gateway.expects(:warn).times(2)
     assert_raises(Capistrano::ConnectionError) { gateway.connect_to(server("app1")) }
   end
+  
+  def test_connection_error_should_include_accessor_with_host_array
+    gateway = new_gateway
+    expect_connect_to(:host => "127.0.0.1").raises(RuntimeError)
+    gateway.expects(:warn).times(2)
+  
+    begin
+      gateway.connect_to(server("app1"))
+      flunk "expected an exception to be raised"
+    rescue Capistrano::ConnectionError => e
+      assert e.respond_to?(:hosts)
+      assert_equal %w(app1), e.hosts.map { |h| h.to_s }
+    end
+  end
 
   private
 
Index: test/configuration/connections_test.rb
===================================================================
--- test/configuration/connections_test.rb	(revision 7136)
+++ test/configuration/connections_test.rb	(working copy)
@@ -92,7 +92,37 @@
     @config.establish_connections_to(%w(cap1 cap2 cap3).map { |s| server(s) })
     assert %w(cap1 cap2 cap3), @config.sessions.keys.sort.map { |s| s.host }
   end
+  
+  def test_establish_connections_to_should_raise_one_connection_error_on_failure
+    Capistrano::SSH.expects(:connect).times(2).raises(Exception)
+    assert_raises(Capistrano::ConnectionError) {
+      @config.establish_connections_to(%w(cap1 cap2)).map { |s| servers(s) }
+    }
+  end
 
+  def test_connection_error_should_include_accessor_with_host_array
+    Capistrano::SSH.expects(:connect).times(2).raises(Exception)
+
+    begin
+      @config.establish_connections_to(%w(cap1 cap2)).map { |s| servers(s) }      
+      flunk "expected an exception to be raised"
+    rescue Capistrano::ConnectionError => e
+      assert e.respond_to?(:hosts)
+      assert_equal %w(cap1 cap2), e.hosts.map { |h| h.to_s }
+    end
+  end
+  
+  def test_connection_error_should_only_include_failed_hosts
+    Capistrano::SSH.expects(:connect).times(2).raises(Exception).then.returns(:success)
+
+    begin
+      @config.establish_connections_to(%w(cap1 cap2)).map { |s| servers(s) }      
+      flunk "expected an exception to be raised"
+    rescue Capistrano::ConnectionError => e
+      assert_equal %w(cap1), e.hosts.map { |h| h.to_s }
+    end
+  end
+
   def test_execute_on_servers_should_require_a_block
     assert_raises(ArgumentError) { @config.execute_on_servers }
   end
@@ -111,7 +141,7 @@
     assert_raises(ScriptError) { @config.execute_on_servers(:a => :b, :c => :d) { |list| } }
   end
 
-  def test_execute_on_servers_should_raise_an_error_if_the_current_task_has_no_matching_servers
+  def test_execute_on_servers_should_raise_an_error_if_the_current_task_has_no_matching_servers_by_default
     @config.current_task = stub("task", :fully_qualified_name => "name", :options => {})
     @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([])
     assert_raises(ScriptError) do
@@ -120,10 +150,20 @@
       end
     end
   end
+  
+  def test_execute_on_servers_should_not_raise_an_error_if_the_current_task_has_no_matching_servers_with_on_errors_continue
+    @config.current_task = stub("task", :fully_qualified_name => "name", :options => { :on_errors => :continue })
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([])
+    assert_nothing_raised do
+      @config.execute_on_servers do
+        flunk "should not get here"
+      end
+    end
+  end
 
   def test_execute_on_servers_should_determine_server_list_from_active_task
     assert @config.sessions.empty?
-    @config.current_task = stub("task")
+    @config.current_task = stub("task", :options => {})
     @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([server("cap1"), server("cap2"), server("cap3")])
     Capistrano::SSH.expects(:connect).times(3).returns(:success)
     @config.execute_on_servers {}
@@ -132,7 +172,7 @@
 
   def test_execute_on_servers_should_yield_server_list_to_block
     assert @config.sessions.empty?
-    @config.current_task = stub("task")
+    @config.current_task = stub("task", :options => {})
     @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([server("cap1"), server("cap2"), server("cap3")])
     Capistrano::SSH.expects(:connect).times(3).returns(:success)
     block_called = false
@@ -148,7 +188,7 @@
 
   def test_execute_on_servers_with_once_option_should_establish_connection_to_and_yield_only_the_first_server
     assert @config.sessions.empty?
-    @config.current_task = stub("task")
+    @config.current_task = stub("task", :options => {})
     @config.expects(:find_servers_for_task).with(@config.current_task, :once => true).returns([server("cap1"), server("cap2"), server("cap3")])
     Capistrano::SSH.expects(:connect).returns(:success)
     block_called = false
@@ -159,10 +199,81 @@
     assert block_called
     assert_equal %w(cap1), @config.sessions.keys.sort.map { |s| s.host }
   end
+  
+  def test_execute_servers_should_raise_connection_error_on_failure_by_default
+    @config.current_task = stub("task", :options => {})
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([server("cap1")])
+    Capistrano::SSH.expects(:connect).raises(Exception)
+    assert_raises(Capistrano::ConnectionError) {
+      @config.execute_on_servers do
+        flunk "expected an exception to be raised"
+      end
+    }
+  end
+  
+  def test_execute_servers_should_not_raise_connection_error_on_failure_with_on_errors_continue
+    @config.current_task = stub("task", :options => { :on_errors => :continue })
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([server("cap1"), server("cap2")])
+    Capistrano::SSH.expects(:connect).times(2).raises(Exception).then.returns(:success)
+    assert_nothing_raised {
+      @config.execute_on_servers do |servers|
+        assert_equal %w(cap2), servers.map { |s| s.host }
+      end
+    }
+  end
+  
+  def test_execute_on_servers_should_not_try_to_connect_to_hosts_with_connection_errors_with_on_errors_continue
+    list = [server("cap1"), server("cap2")]
+    @config.current_task = stub("task", :options => { :on_errors => :continue })
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns(list)
+    Capistrano::SSH.expects(:connect).times(2).raises(Exception).then.returns(:success)
+    @config.expects(:failed!).with(server("cap1"))
+    @config.execute_on_servers do |servers|
+      assert_equal %w(cap2), servers.map { |s| s.host }
+    end
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns(list)
+    @config.execute_on_servers do |servers|
+      assert_equal %w(cap2), servers.map { |s| s.host }
+    end
+  end
+  
+  def test_execute_on_servers_should_not_try_to_connect_to_hosts_with_command_errors_with_on_errors_continue
+    cap1 = server("cap1")
+    cap2 = server("cap2")
+    @config.current_task = stub("task", :options => { :on_errors => :continue })
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([cap1, cap2])
+    Capistrano::SSH.expects(:connect).times(2).returns(:success)
+    @config.execute_on_servers do |servers|
+      error = Capistrano::CommandError.new
+      error.hosts = [cap1]
+      raise error
+    end
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([cap1, cap2])
+    @config.execute_on_servers do |servers|
+      assert_equal %w(cap2), servers.map { |s| s.host }
+    end
+  end
+  
+  def test_execute_on_servers_should_not_try_to_connect_to_hosts_with_upload_errors_with_on_errors_continue
+    cap1 = server("cap1")
+    cap2 = server("cap2")
+    @config.current_task = stub("task", :options => { :on_errors => :continue })
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([cap1, cap2])
+    Capistrano::SSH.expects(:connect).times(2).returns(:success)
+    @config.execute_on_servers do |servers|
+      error = Capistrano::UploadError.new
+      error.hosts = [cap1]
+      raise error
+    end
+    @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([cap1, cap2])
+    @config.execute_on_servers do |servers|
+      assert_equal %w(cap2), servers.map { |s| s.host }
+    end
+  end
 
   def test_connect_should_establish_connections_to_all_servers_in_scope
     assert @config.sessions.empty?
-    @config.current_task = stub("task")
+    @config.current_task = stub("task", :options => {})
     @config.expects(:find_servers_for_task).with(@config.current_task, {}).returns([server("cap1"), server("cap2"), server("cap3")])
     Capistrano::SSH.expects(:connect).times(3).returns(:success)
     @config.connect!
@@ -171,7 +282,7 @@
 
   def test_connect_should_honor_once_option
     assert @config.sessions.empty?
-    @config.current_task = stub("task")
+    @config.current_task = stub("task", :options => {})
     @config.expects(:find_servers_for_task).with(@config.current_task, :once => true).returns([server("cap1"), server("cap2"), server("cap3")])
     Capistrano::SSH.expects(:connect).returns(:success)
     @config.connect! :once => true
Index: lib/capistrano/gateway.rb
===================================================================
--- lib/capistrano/gateway.rb	(revision 7136)
+++ lib/capistrano/gateway.rb	(working copy)
@@ -104,7 +104,13 @@
       end
 
       thread.join
-      connection or raise ConnectionError, "could not establish connection to `#{server}'"
+      if connection.nil?
+        error = ConnectionError.new("could not establish connection to `#{server}'")
+        error.hosts = [server]
+        raise error
+      end
+    
+      connection
     end
 
     private
Index: lib/capistrano/configuration/connections.rb
===================================================================
--- lib/capistrano/configuration/connections.rb	(revision 7136)
+++ lib/capistrano/configuration/connections.rb	(working copy)
@@ -26,11 +26,20 @@
       # connections to those servers that have been the targets of one or more
       # executed tasks.
       attr_reader :sessions
-
+    
       def initialize_with_connections(*args) #:nodoc:
         initialize_without_connections(*args)
         @sessions = {}
+        @failed_sessions = []
       end
+      
+      def failed!(server)
+        @failed_sessions << server
+      end
+      
+      def has_failed?(server)
+        @failed_sessions.include?(server)
+      end
 
       # Used to force connections to be made to the current task's servers.
       # Connections are normally made lazily in Capistrano--you can use this
@@ -56,8 +65,22 @@
 
       # Ensures that there are active sessions for each server in the list.
       def establish_connections_to(servers)
-        threads = Array(servers).map { |server| establish_connection_to(server) }
+        failed_servers = []
+
+        threads = Array(servers).map { |server|
+          begin
+            establish_connection_to(server)
+          rescue Exception
+            failed_servers << server
+          end
+        }
         threads.each { |t| t.join }
+
+        if not failed_servers.empty?
+          error = ConnectionError.new("connection failed for: #{failed_servers.join(',')}")
+          error.hosts = failed_servers
+          raise error
+        end
       end
 
       # Determines the set of servers within the current task's scope and
@@ -65,24 +88,45 @@
       # servers.
       def execute_on_servers(options={})
         raise ArgumentError, "expected a block" unless block_given?
-
+        
         if task = current_task
           servers = find_servers_for_task(task, options)
 
+          servers.delete_if { |s| has_failed?(s) } if task.options[:on_errors] == :continue
+          
           if servers.empty?
+            return if task.options[:on_errors] == :continue
+            
             raise ScriptError, "`#{task.fully_qualified_name}' is only run for servers matching #{task.options.inspect}, but no servers matched"
           end
+          
         else
           servers = find_servers(options)
           raise ScriptError, "no servers found to match #{options.inspect}" if servers.empty?
         end
-
+        
         servers = [servers.first] if options[:once]
         logger.trace "servers: #{servers.map { |s| s.host }.inspect}"
 
         # establish connections to those servers, as necessary
-        establish_connections_to(servers)
-        yield servers
+        begin
+          establish_connections_to(servers)
+        rescue ConnectionError => error
+          raise error unless task.options[:on_errors] == :continue
+          error.hosts.each { |h|
+            servers.delete(h)
+            failed!(h)
+          }
+        end
+
+        begin
+          yield servers
+        rescue CommandError, UploadError => error
+          raise error unless task.options[:on_errors] == :continue
+          error.hosts.each { |h|
+            failed!(h)
+          }
+        end
       end
 
       private
Index: lib/capistrano/errors.rb
===================================================================
--- lib/capistrano/errors.rb	(revision 7136)
+++ lib/capistrano/errors.rb	(working copy)
@@ -2,13 +2,16 @@
   class Error < RuntimeError; end
 
   class CaptureError < Error; end
-  class ConnectionError < Error; end
   class NoSuchTaskError < Error; end
-
+  
+  class ConnectionError < Error
+    attr_accessor :hosts
+  end
+  
   class RemoteError < Error
     attr_accessor :hosts
   end
-
+  
   class UploadError < RemoteError; end
   class CommandError < RemoteError; end
 end
