PROTON-1537: [ruby] idle time-out conversion. Idiomatic Ruby expresses intervals in seconds, which can be Float or Rational for sub-second intervals.
Use seconds in the :idle_timeout connection options and Connection#idle_timeout. Transport remains in milliseconds but is no longer a user-facing API. Conversion is done using Rational to preserve millisecond accuracy, although the user is free to use Float if rounding is not a concern. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4678e74b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4678e74b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4678e74b Branch: refs/heads/go1 Commit: 4678e74bce48ea7b2457ba801128365c7be8b5bd Parents: cf4a3f6 Author: Alan Conway <[email protected]> Authored: Mon Dec 11 11:24:43 2017 -0500 Committer: Alan Conway <[email protected]> Committed: Wed Dec 13 13:16:48 2017 -0500 ---------------------------------------------------------------------- proton-c/bindings/ruby/lib/core/connection.rb | 16 +++++ .../bindings/ruby/lib/core/connection_driver.rb | 20 ++++--- proton-c/bindings/ruby/lib/core/transport.rb | 18 +++++- proton-c/bindings/ruby/tests/test_adapter.rb | 3 +- .../ruby/tests/test_connection_driver.rb | 62 ++++++++++---------- proton-c/bindings/ruby/tests/test_tools.rb | 33 +++++++++-- 6 files changed, 103 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/lib/core/connection.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb index ed82dc8..55873dd 100644 --- a/proton-c/bindings/ruby/lib/core/connection.rb +++ b/proton-c/bindings/ruby/lib/core/connection.rb @@ -133,6 +133,17 @@ module Qpid::Proton Codec::Data.from_object(Cproton.pn_connection_properties(@impl), opts[:properties]) end + # Idle-timeout advertised by the remote peer, in seconds. 
+ # Set by {Connection#open} with the +:idle_timeout+ option. + # @return [Numeric] Idle-timeout advertised by the remote peer, in seconds. + # @return [nil] if The peer does not advertise an idle time-out + # @option :idle_timeout (see {#open}) + def idle_timeout() + if transport && (t = transport.remote_idle_timeout) + Rational(t, 1000) # More precise than Float + end + end + # @private Generate a unique link name, internal use only. def link_name() @link_prefix + "/" + (@link_count += 1).to_s(16) @@ -259,6 +270,11 @@ module Qpid::Proton Cproton.pn_error_code(Cproton.pn_connection_error(@impl)) end + # @private Generate a unique link name, internal use only. + def link_name() + @link_prefix + "/" + (@link_count += 1).to_s(16) + end + protected def _local_condition http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/lib/core/connection_driver.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb index 29bd299..b796d4d 100644 --- a/proton-c/bindings/ruby/lib/core/connection_driver.rb +++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb @@ -97,7 +97,9 @@ module Qpid::Proton # Non-blocking write to {#io} # IO errors are returned as transport errors by {#event}, not raised def write - n = @io.write_nonblock(Cproton.pn_connection_driver_write_buffer(@impl)) + data = Cproton.pn_connection_driver_write_buffer(@impl) + return unless data && data.size > 0 + n = @io.write_nonblock(data) Cproton.pn_connection_driver_write_done(@impl, n) if n > 0 rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR # Try again later. 
@@ -175,8 +177,13 @@ module Qpid::Proton attr_reader :handler # Dispatch all events available from {#event} to {#handler} - def dispatch() each_event do |e| - e.dispatch self # See private on_transport_ methods below + def dispatch() + each_event do |e| + case e.method # Events that affect the driver + when :on_transport_tail_closed then close_read + when :on_transport_head_closed then close_write + when :on_transport_authenticated then connection.user = transport.user + end e.dispatch @adapter end end @@ -190,13 +197,8 @@ module Qpid::Proton next_tick = tick(now) dispatch # Generate data for write write - dispatch # Consume all events + dispatch # Consume events generated by write return next_tick end - - private - def on_transport_tail_closed(event) close_read; end - def on_transport_head_closed(event) close_write; end - def on_transport_authenticated(event) connection.user = transport.user; end end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/lib/core/transport.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/transport.rb b/proton-c/bindings/ruby/lib/core/transport.rb index c1593a0..f840f3f 100644 --- a/proton-c/bindings/ruby/lib/core/transport.rb +++ b/proton-c/bindings/ruby/lib/core/transport.rb @@ -103,11 +103,19 @@ module Qpid::Proton # @!attribute idle_timeout # - # @return [Integer] The idle timeout. + # @deprecated use {Connection#open} with the +:idle_timeout+ option to set + # the timeout, and {Connection#idle_timeout} to query the remote timeout. + # + # The Connection timeout values are in *seconds* and are automatically + # converted. + # + # @return [Integer] The idle timeout in *milliseconds*. # proton_set_get :idle_timeout - # @!attribute [r] remote_idle_timeout + # @!attribute [r] remote_idle_timeout in milliseconds + # + # @deprecated Use {Connection#idle_timeout} to query the remote timeout. 
# # @return [Integer] The idle timeout for the transport's remote peer. # @@ -397,7 +405,11 @@ module Qpid::Proton end self.channel_max= opts[:channel_max] if opts.include? :channel_max self.max_frame_size= opts[:max_frame_size] if opts.include? :max_frame_size - self.idle_timeout= opts[:idle_timeout] if opts.include? :idle_timeout + # NOTE: The idle_timeout option is in Numeric *seconds*, can be Integer, Float or Rational. + # This is consistent with idiomatic ruby. + # The transport #idle_timeout property is in *milliseconds* passed direct to C. + # Direct use of the transport is deprecated. + self.idle_timeout= (opts[:idle_timeout]*1000).round if opts.include? :idle_timeout end end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/tests/test_adapter.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_adapter.rb b/proton-c/bindings/ruby/tests/test_adapter.rb index 50f46c3..62ef109 100644 --- a/proton-c/bindings/ruby/tests/test_adapter.rb +++ b/proton-c/bindings/ruby/tests/test_adapter.rb @@ -77,7 +77,8 @@ class TestOldHandler < Minitest::Test assert_equal [:on_session_opening], @sh.names assert_equal [], @ch.names clear - @d.client.connection.close; @d.run + @d.client.connection.close; + 3.times { @d.process } assert_equal [:on_connection_closing], @sh.names assert_equal [], @ch.names @d.server.connection.close; @d.run http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/tests/test_connection_driver.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb index 3617dce..a9982b5 100644 --- a/proton-c/bindings/ruby/tests/test_connection_driver.rb +++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb @@ -21,10 +21,6 @@ include Qpid::Proton class HandlerDriverTest < 
Minitest::Test - def setup - @sockets = Socket.pair(:LOCAL, :STREAM, 0) - end - def test_send_recv send_class = Class.new(MessagingHandler) do attr_reader :accepted @@ -38,35 +34,39 @@ class HandlerDriverTest < Minitest::Test def on_message(event) @message = event.message; event.connection.close; end end - sender = HandlerDriver.new(@sockets[0], send_class.new) - sender.connection.open(:container_id => "sender"); - sender.connection.open_sender() - receiver = HandlerDriver.new(@sockets[1], recv_class.new) - drivers = [sender, receiver] - - until drivers.all? { |d| d.finished? } - rd = drivers.select {|d| d.can_read? } - wr = drivers.select {|d| d.can_write? } - IO.select(rd, wr) - drivers.each do |d| - d.process - end - end - assert_equal(receiver.handler.message.body, "foo") - assert(sender.handler.accepted) + d = DriverPair.new(send_class.new, recv_class.new) + d.client.connection.open(:container_id => "sender"); + d.client.connection.open_sender() + d.run + assert_equal(d.server.handler.message.body, "foo") + assert(d.client.handler.accepted) end def test_idle - drivers = [HandlerDriver.new(@sockets[0], nil), HandlerDriver.new(@sockets[1], nil)] - opts = {:idle_timeout=>10} - drivers[0].transport.apply(opts) - assert_equal 10, drivers[0].transport.idle_timeout - drivers[0].connection.open(opts) - drivers[1].transport.set_server - now = Time.now - drivers.each { |d| d.process(now) } until drivers[0].connection.open? 
- assert_equal(10, drivers[0].transport.idle_timeout) - assert_equal(5, drivers[1].transport.remote_idle_timeout) # proton changes the value - assert_in_delta(10, (drivers[0].tick(now) - now)*1000, 1) + + d = DriverPair.new(UnhandledHandler.new, UnhandledHandler.new) + ms = 444 + secs = Rational(ms, 1000) # Use rationals to keep it accurate + opts = {:idle_timeout => secs} + d.client.transport.apply(opts) + assert_equal(ms, d.client.transport.idle_timeout) # Transport converts to ms + d.server.transport.set_server + d.client.connection.open(opts) + + start = Time.at(1) # Dummy timeline + tick = d.run start # Process all IO events + assert_equal(secs/4, tick - start) + assert_equal [:on_connection_opened], d.client.handler.calls + assert_equal [:on_connection_opening, :on_connection_opened], d.server.handler.calls + assert_equal (ms), d.client.transport.idle_timeout + assert_equal (ms/2), d.server.transport.remote_idle_timeout # proton changes the value + assert_equal (secs/2), d.server.connection.idle_timeout + + # Now update the time till we get connections closing + d.each { |x| x.handler.calls.clear } + d.run(start + secs - 0.001) # Should nothing, timeout not reached + assert_equal [[],[]], d.collect { |x| x.handler.calls } + d.run(start + secs*2) # After 2x timeout, connections should close + assert_equal [[:on_transport_error, :on_transport_closed], [:on_connection_error, :on_connection_closed, :on_transport_closed]], d.collect { |x| x.handler.calls } end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4678e74b/proton-c/bindings/ruby/tests/test_tools.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb index bc9c1d9..ad4a7cb 100644 --- a/proton-c/bindings/ruby/tests/test_tools.rb +++ b/proton-c/bindings/ruby/tests/test_tools.rb @@ -108,13 +108,20 @@ class DriverPair < Array alias client first alias server last - # Run till 
there is nothing to do - def run - begin - each { |d| d.process } - end while (IO.select(self, [], [], 0) rescue nil) + # Process each driver once, return time of next timed event + def process(now = Time.now, max_time=nil) + t = collect { |d| d.process(now) }.compact.min + t = max_time if max_time && t > max_time + t end + # Run till there is no IO activity - does not handle waiting for timed events + # but does pass +now+ to process and returns the min returned timed event time + def run(now=Time.now) + t = process(now) # Generate initial IO activity and get initial next-time + t = process(now, t) while (IO.select(self, [], [], 0) rescue nil) + t = process(now, t) # Final gulp to finish off events + end end # Container that listens on a random port for a single connection @@ -129,3 +136,19 @@ class TestContainer < Container def port() @server.addr[1]; end def url() "amqp://:#{port}"; end end + +# Raw handler to record on_xxx calls via on_unhandled. +# Handy as a base for raw test handlers +class UnhandledHandler + def initialize() @calls =[]; end + def on_unhandled(name, args) @calls << name; end + attr_reader :calls + + # Ruby mechanics to capture on_xxx calls + + def method_missing(name, *args) + if respond_to_missing?(name) then on_unhandled(name, *args) else super end; + end + def respond_to_missing?(name, private=false); (/^on_/ =~ name); end + def respond_to?(name, all=false) super || respond_to_missing?(name); end # For ruby < 1.9.2 +end --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
