Author: jmhodges
Date: Fri Jun 4 05:31:46 2010
New Revision: 951273
URL: http://svn.apache.org/viewvc?rev=951273&view=rev
Log:
AVRO-450. HTTP IPC for ruby. merge from trunk
Added:
avro/branches/branch-1.3/lang/ruby/test/sample_ipc_http_client.rb
- copied unchanged from r931026, hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_client.rb
avro/branches/branch-1.3/lang/ruby/test/sample_ipc_http_server.rb
- copied unchanged from r931026, hadoop/avro/trunk/lang/ruby/test/sample_ipc_http_server.rb
Modified:
avro/branches/branch-1.3/ (props changed)
avro/branches/branch-1.3/CHANGES.txt
avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb
Propchange: avro/branches/branch-1.3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 4 05:31:46 2010
@@ -1,2 +1,2 @@
/avro/trunk:944035,944049
-/hadoop/avro/trunk:930458-930459,930461-930462,930599,935526,938347
+/hadoop/avro/trunk:930458-930459,930461-930462,930599,931026,935526,938347
Modified: avro/branches/branch-1.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.3/CHANGES.txt?rev=951273&r1=951272&r2=951273&view=diff
==============================================================================
--- avro/branches/branch-1.3/CHANGES.txt (original)
+++ avro/branches/branch-1.3/CHANGES.txt Fri Jun 4 05:31:46 2010
@@ -17,6 +17,8 @@ Avro 1.3.3 (Unreleased)
AVRO-491. Doing doubles and floats better in the ruby impl. (jmhodges)
+ AVRO-450. HTTP IPC for ruby. (jmhodges)
+
BUG FIXES
AVRO-461. Skipping primitives in the ruby side (jmhodges)
Modified: avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb?rev=951273&r1=951272&r2=951273&view=diff
==============================================================================
--- avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb (original)
+++ avro/branches/branch-1.3/lang/ruby/lib/avro/ipc.rb Fri Jun 4 05:31:46 2010
@@ -168,13 +168,13 @@ module Avro::IPC
true
when 'CLIENT'
raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol
- self.remote_protocol = handshake_response['serverProtocol']
+ self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
self.remote_hash = handshake_response['serverHash']
self.send_protocol = false
false
when 'NONE'
raise AvroError.new('Handshake failure. match == NONE') if send_protocol
- self.remote_protocol = handshake_response['serverProtocol']
+ self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
self.remote_hash = handshake_response['serverHash']
self.send_protocol = true
false
@@ -236,11 +236,10 @@ module Avro::IPC
protocol_cache[local_hash] = local_protocol
end
- def respond(transport)
- # Called by a server to deserialize a request, compute and serialize
- # a response or error. Compare to 'handle()' in Thrift.
-
- call_request = transport.read_framed_message
+ # Called by a server to deserialize a request, compute and serialize
+ # a response or error. Compare to 'handle()' in Thrift.
+ def respond(call_request)
+ buffer_reader = StringIO.new(call_request)
buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
buffer_writer = StringIO.new('', 'w+')
buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
@@ -248,7 +247,7 @@ module Avro::IPC
response_metadata = {}
begin
- remote_protocol = process_handshake(transport, buffer_decoder, buffer_encoder)
+ remote_protocol = process_handshake(buffer_decoder, buffer_encoder)
# handshake failure
unless remote_protocol
return buffer_writer.string
@@ -300,7 +299,7 @@ module Avro::IPC
buffer_writer.string
end
- def process_handshake(transport, decoder, encoder)
+ def process_handshake(decoder, encoder)
handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
handshake_response = {}
@@ -308,6 +307,7 @@ module Avro::IPC
client_hash = handshake_request['clientHash']
client_protocol = handshake_request['clientProtocol']
remote_protocol = protocol_cache[client_hash]
+
if !remote_protocol && client_protocol
remote_protocol = protocol.parse(client_protocol)
protocol_cache[client_hash] = remote_protocol
@@ -440,4 +440,95 @@ module Avro::IPC
sock.close
end
end
+
+ class ConnectionClosedError < StandardError; end
+
+ class FramedWriter
+ attr_reader :writer
+ def initialize(writer)
+ @writer = writer
+ end
+
+ def write_framed_message(message)
+ message_size = message.size
+ total_bytes_sent = 0
+ while message_size - total_bytes_sent > 0
+ if message_size - total_bytes_sent > BUFFER_SIZE
+ buffer_size = BUFFER_SIZE
+ else
+ buffer_size = message_size - total_bytes_sent
+ end
+ write_buffer(message[total_bytes_sent, buffer_size])
+ total_bytes_sent += buffer_size
+ end
+ write_buffer_size(0)
+ end
+
+ def to_s; writer.string; end
+
+ private
+ def write_buffer(chunk)
+ buffer_size = chunk.size
+ write_buffer_size(buffer_size)
+ writer << chunk
+ end
+
+ def write_buffer_size(n)
+ writer.write([n].pack('N'))
+ end
+ end
+
+ class FramedReader
+ attr_reader :reader
+
+ def initialize(reader)
+ @reader = reader
+ end
+
+ def read_framed_message
+ message = []
+ loop do
+ buffer = ""
+ buffer_size = read_buffer_size
+
+ return message.join if buffer_size == 0
+
+ while buffer.size < buffer_size
+ chunk = reader.read(buffer_size - buffer.size)
+ chunk_error?(chunk)
+ buffer << chunk
+ end
+ message << buffer
+ end
+ end
+
+ private
+ def read_buffer_size
+ header = reader.read(BUFFER_HEADER_LENGTH)
+ chunk_error?(header)
+ header.unpack('N')[0]
+ end
+
+ def chunk_error?(chunk)
+ raise ConnectionClosedError.new("Reader read 0 bytes") if chunk == ''
+ end
+ end
+
+ # Only works for clients. Sigh.
+ class HTTPTransceiver
+ attr_reader :remote_name, :host, :port
+ def initialize(host, port)
+ @host, @port = host, port
+ @remote_name = "#{host}:#{port}"
+ end
+
+ def transceive(message)
+ writer = FramedWriter.new(StringIO.new)
+ writer.write_framed_message(message)
+ resp = Net::HTTP.start(host, port) do |http|
+ http.post('/', writer.to_s, {'Content-Type' => 'avro/binary'})
+ end
+ FramedReader.new(StringIO.new(resp.body)).read_framed_message
+ end
+ end
end