http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/protocol/mod.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/protocol/mod.rs b/lib/rs/src/protocol/mod.rs new file mode 100644 index 0000000..b230d63 --- /dev/null +++ b/lib/rs/src/protocol/mod.rs @@ -0,0 +1,709 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Types used to send and receive primitives between a Thrift client and server. +//! +//! # Examples +//! +//! Create and use a `TOutputProtocol`. +//! +//! ```no_run +//! use std::cell::RefCell; +//! use std::rc::Rc; +//! use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType}; +//! use thrift::transport::{TTcpTransport, TTransport}; +//! +//! // create the I/O channel +//! let mut transport = TTcpTransport::new(); +//! transport.open("127.0.0.1:9090").unwrap(); +//! let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>)); +//! +//! // create the protocol to encode types into bytes +//! let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true); +//! +//! // write types +//! o_prot.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap(); +//! o_prot.write_string("foo").unwrap(); +//! o_prot.write_field_end().unwrap(); +//! ``` +//! +//! Create and use a `TInputProtocol`. +//! +//! ```no_run +//! use std::cell::RefCell; +//! use std::rc::Rc; +//! use thrift::protocol::{TBinaryInputProtocol, TInputProtocol}; +//! use thrift::transport::{TTcpTransport, TTransport}; +//! +//! // create the I/O channel +//! let mut transport = TTcpTransport::new(); +//! transport.open("127.0.0.1:9090").unwrap(); +//! let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>)); +//! +//! // create the protocol to decode bytes into types +//! let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true); +//! +//! // read types from the wire +//! let field_identifier = i_prot.read_field_begin().unwrap(); +//! let field_contents = i_prot.read_string().unwrap(); +//! let field_end = i_prot.read_field_end().unwrap(); +//! ``` + +use std::cell::RefCell; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::convert::From; +use std::rc::Rc; +use try_from::TryFrom; + +use ::{ProtocolError, ProtocolErrorKind}; +use ::transport::TTransport; + +mod binary; +mod compact; +mod multiplexed; +mod stored; + +pub use self::binary::{TBinaryInputProtocol, TBinaryInputProtocolFactory, TBinaryOutputProtocol, + TBinaryOutputProtocolFactory}; +pub use self::compact::{TCompactInputProtocol, TCompactInputProtocolFactory, + TCompactOutputProtocol, TCompactOutputProtocolFactory}; +pub use self::multiplexed::TMultiplexedOutputProtocol; +pub use self::stored::TStoredInputProtocol; + +// Default maximum depth to which `TInputProtocol::skip` will skip a Thrift +// field. A default is necessary because Thrift structs or collections may +// contain nested structs and collections, which could result in indefinite +// recursion. +const MAXIMUM_SKIP_DEPTH: i8 = 64; + +/// Converts a stream of bytes into Thrift identifiers, primitives, +/// containers, or structs. +/// +/// This trait does not deal with higher-level Thrift concepts like structs or +/// exceptions - only with primitives and message or container boundaries. Once +/// bytes are read they are deserialized and an identifier (for example +/// `TMessageIdentifier`) or a primitive is returned. +/// +/// All methods return a `thrift::Result`. If an `Err` is returned the protocol +/// instance and its underlying transport should be terminated. +/// +/// # Examples +/// +/// Create and use a `TInputProtocol` +/// +/// ```no_run +/// use std::cell::RefCell; +/// use std::rc::Rc; +/// use thrift::protocol::{TBinaryInputProtocol, TInputProtocol}; +/// use thrift::transport::{TTcpTransport, TTransport}; +/// +/// let mut transport = TTcpTransport::new(); +/// transport.open("127.0.0.1:9090").unwrap(); +/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>)); +/// +/// let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true); +/// +/// let field_identifier = i_prot.read_field_begin().unwrap(); +/// let field_contents = i_prot.read_string().unwrap(); +/// let field_end = i_prot.read_field_end().unwrap(); +/// ``` +pub trait TInputProtocol { + /// Read the beginning of a Thrift message. + fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier>; + /// Read the end of a Thrift message. + fn read_message_end(&mut self) -> ::Result<()>; + /// Read the beginning of a Thrift struct. + fn read_struct_begin(&mut self) -> ::Result<Option<TStructIdentifier>>; + /// Read the end of a Thrift struct. + fn read_struct_end(&mut self) -> ::Result<()>; + /// Read the beginning of a Thrift struct field. + fn read_field_begin(&mut self) -> ::Result<TFieldIdentifier>; + /// Read the end of a Thrift struct field. + fn read_field_end(&mut self) -> ::Result<()>; + /// Read a bool. + fn read_bool(&mut self) -> ::Result<bool>; + /// Read a fixed-length byte array. + fn read_bytes(&mut self) -> ::Result<Vec<u8>>; + /// Read a word. + fn read_i8(&mut self) -> ::Result<i8>; + /// Read a 16-bit signed integer. + fn read_i16(&mut self) -> ::Result<i16>; + /// Read a 32-bit signed integer. + fn read_i32(&mut self) -> ::Result<i32>; + /// Read a 64-bit signed integer. + fn read_i64(&mut self) -> ::Result<i64>; + /// Read a 64-bit float. + fn read_double(&mut self) -> ::Result<f64>; + /// Read a fixed-length string (not null terminated). + fn read_string(&mut self) -> ::Result<String>; + /// Read the beginning of a list. + fn read_list_begin(&mut self) -> ::Result<TListIdentifier>; + /// Read the end of a list. + fn read_list_end(&mut self) -> ::Result<()>; + /// Read the beginning of a set. + fn read_set_begin(&mut self) -> ::Result<TSetIdentifier>; + /// Read the end of a set. + fn read_set_end(&mut self) -> ::Result<()>; + /// Read the beginning of a map. + fn read_map_begin(&mut self) -> ::Result<TMapIdentifier>; + /// Read the end of a map. + fn read_map_end(&mut self) -> ::Result<()>; + /// Skip a field with type `field_type` recursively until the default + /// maximum skip depth is reached. + fn skip(&mut self, field_type: TType) -> ::Result<()> { + self.skip_till_depth(field_type, MAXIMUM_SKIP_DEPTH) + } + /// Skip a field with type `field_type` recursively up to `depth` levels. + fn skip_till_depth(&mut self, field_type: TType, depth: i8) -> ::Result<()> { + if depth == 0 { + return Err(::Error::Protocol(ProtocolError { + kind: ProtocolErrorKind::DepthLimit, + message: format!("cannot parse past {:?}", field_type), + })); + } + + match field_type { + TType::Bool => self.read_bool().map(|_| ()), + TType::I08 => self.read_i8().map(|_| ()), + TType::I16 => self.read_i16().map(|_| ()), + TType::I32 => self.read_i32().map(|_| ()), + TType::I64 => self.read_i64().map(|_| ()), + TType::Double => self.read_double().map(|_| ()), + TType::String => self.read_string().map(|_| ()), + TType::Struct => { + self.read_struct_begin()?; + loop { + let field_ident = self.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + self.skip_till_depth(field_ident.field_type, depth - 1)?; + } + self.read_struct_end() + } + TType::List => { + let list_ident = self.read_list_begin()?; + for _ in 0..list_ident.size { + self.skip_till_depth(list_ident.element_type, depth - 1)?; + } + self.read_list_end() + } + TType::Set => { + let set_ident = self.read_set_begin()?; + for _ in 0..set_ident.size { + self.skip_till_depth(set_ident.element_type, depth - 1)?; + } + self.read_set_end() + } + TType::Map => { + let map_ident = self.read_map_begin()?; + for _ in 0..map_ident.size { + let key_type = map_ident.key_type + .expect("non-zero sized map should contain key type"); + let val_type = map_ident.value_type + .expect("non-zero sized map should contain value type"); + self.skip_till_depth(key_type, depth - 1)?; + self.skip_till_depth(val_type, depth - 1)?; + } + self.read_map_end() + } + u => { + Err(::Error::Protocol(ProtocolError { + kind: ProtocolErrorKind::Unknown, + message: format!("cannot skip field type {:?}", &u), + })) + } + } + } + + // utility (DO NOT USE IN GENERATED CODE!!!!) + // + + /// Read an unsigned byte. + /// + /// This method should **never** be used in generated code. + fn read_byte(&mut self) -> ::Result<u8>; +} + +/// Converts Thrift identifiers, primitives, containers or structs into a +/// stream of bytes. +/// +/// This trait does not deal with higher-level Thrift concepts like structs or +/// exceptions - only with primitives and message or container boundaries. +/// Write methods take an identifier (for example, `TMessageIdentifier`) or a +/// primitive. Any or all of the fields in an identifier may be omitted when +/// writing to the transport. Write methods may even be noops. All of this is +/// transparent to the caller; as long as a matching `TInputProtocol` +/// implementation is used, received messages will be decoded correctly. +/// +/// All methods return a `thrift::Result`. If an `Err` is returned the protocol +/// instance and its underlying transport should be terminated. +/// +/// # Examples +/// +/// Create and use a `TOutputProtocol` +/// +/// ```no_run +/// use std::cell::RefCell; +/// use std::rc::Rc; +/// use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType}; +/// use thrift::transport::{TTcpTransport, TTransport}; +/// +/// let mut transport = TTcpTransport::new(); +/// transport.open("127.0.0.1:9090").unwrap(); +/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>)); +/// +/// let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true); +/// +/// o_prot.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap(); +/// o_prot.write_string("foo").unwrap(); +/// o_prot.write_field_end().unwrap(); +/// ``` +pub trait TOutputProtocol { + /// Write the beginning of a Thrift message. + fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()>; + /// Write the end of a Thrift message. + fn write_message_end(&mut self) -> ::Result<()>; + /// Write the beginning of a Thrift struct. + fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()>; + /// Write the end of a Thrift struct. + fn write_struct_end(&mut self) -> ::Result<()>; + /// Write the beginning of a Thrift field. + fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()>; + /// Write the end of a Thrift field. + fn write_field_end(&mut self) -> ::Result<()>; + /// Write a STOP field indicating that all the fields in a struct have been + /// written. + fn write_field_stop(&mut self) -> ::Result<()>; + /// Write a bool. + fn write_bool(&mut self, b: bool) -> ::Result<()>; + /// Write a fixed-length byte array. + fn write_bytes(&mut self, b: &[u8]) -> ::Result<()>; + /// Write an 8-bit signed integer. + fn write_i8(&mut self, i: i8) -> ::Result<()>; + /// Write a 16-bit signed integer. + fn write_i16(&mut self, i: i16) -> ::Result<()>; + /// Write a 32-bit signed integer. + fn write_i32(&mut self, i: i32) -> ::Result<()>; + /// Write a 64-bit signed integer. + fn write_i64(&mut self, i: i64) -> ::Result<()>; + /// Write a 64-bit float. + fn write_double(&mut self, d: f64) -> ::Result<()>; + /// Write a fixed-length string. + fn write_string(&mut self, s: &str) -> ::Result<()>; + /// Write the beginning of a list. + fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()>; + /// Write the end of a list. + fn write_list_end(&mut self) -> ::Result<()>; + /// Write the beginning of a set. + fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()>; + /// Write the end of a set. + fn write_set_end(&mut self) -> ::Result<()>; + /// Write the beginning of a map. + fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()>; + /// Write the end of a map. + fn write_map_end(&mut self) -> ::Result<()>; + /// Flush buffered bytes to the underlying transport. + fn flush(&mut self) -> ::Result<()>; + + // utility (DO NOT USE IN GENERATED CODE!!!!) + // + + /// Write an unsigned byte. + /// + /// This method should **never** be used in generated code. + fn write_byte(&mut self, b: u8) -> ::Result<()>; // FIXME: REMOVE +} + +/// Helper type used by servers to create `TInputProtocol` instances for +/// accepted client connections. +/// +/// # Examples +/// +/// Create a `TInputProtocolFactory` and use it to create a `TInputProtocol`. +/// +/// ```no_run +/// use std::cell::RefCell; +/// use std::rc::Rc; +/// use thrift::protocol::{TBinaryInputProtocolFactory, TInputProtocolFactory}; +/// use thrift::transport::{TTcpTransport, TTransport}; +/// +/// let mut transport = TTcpTransport::new(); +/// transport.open("127.0.0.1:9090").unwrap(); +/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>)); +/// +/// let mut i_proto_factory = TBinaryInputProtocolFactory::new(); +/// let i_prot = i_proto_factory.create(transport); +/// ``` +pub trait TInputProtocolFactory { + /// Create a `TInputProtocol` that reads bytes from `transport`. + fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TInputProtocol>; +} + +/// Helper type used by servers to create `TOutputProtocol` instances for +/// accepted client connections. +/// +/// # Examples +/// +/// Create a `TOutputProtocolFactory` and use it to create a `TOutputProtocol`. +/// +/// ```no_run +/// use std::cell::RefCell; +/// use std::rc::Rc; +/// use thrift::protocol::{TBinaryOutputProtocolFactory, TOutputProtocolFactory}; +/// use thrift::transport::{TTcpTransport, TTransport}; +/// +/// let mut transport = TTcpTransport::new(); +/// transport.open("127.0.0.1:9090").unwrap(); +/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>)); +/// +/// let mut o_proto_factory = TBinaryOutputProtocolFactory::new(); +/// let o_prot = o_proto_factory.create(transport); +/// ``` +pub trait TOutputProtocolFactory { + /// Create a `TOutputProtocol` that writes bytes to `transport`. + fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TOutputProtocol>; +} + +/// Thrift message identifier. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TMessageIdentifier { + /// Service call the message is associated with. + pub name: String, + /// Message type. + pub message_type: TMessageType, + /// Ordered sequence number identifying the message. + pub sequence_number: i32, +} + +impl TMessageIdentifier { + /// Create a `TMessageIdentifier` for a Thrift service-call named `name` + /// with message type `message_type` and sequence number `sequence_number`. + pub fn new<S: Into<String>>(name: S, + message_type: TMessageType, + sequence_number: i32) + -> TMessageIdentifier { + TMessageIdentifier { + name: name.into(), + message_type: message_type, + sequence_number: sequence_number, + } + } +} + +/// Thrift struct identifier. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TStructIdentifier { + /// Name of the encoded Thrift struct. + pub name: String, +} + +impl TStructIdentifier { + /// Create a `TStructIdentifier` for a struct named `name`. + pub fn new<S: Into<String>>(name: S) -> TStructIdentifier { + TStructIdentifier { name: name.into() } + } +} + +/// Thrift field identifier. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TFieldIdentifier { + /// Name of the Thrift field. + /// + /// `None` if it's not sent over the wire. + pub name: Option<String>, + /// Field type. + /// + /// This may be a primitive, container, or a struct. + pub field_type: TType, + /// Thrift field id. + /// + /// `None` only if `field_type` is `TType::Stop`. + pub id: Option<i16>, +} + +impl TFieldIdentifier { + /// Create a `TFieldIdentifier` for a field named `name` with type + /// `field_type` and field id `id`. + /// + /// `id` should be `None` if `field_type` is `TType::Stop`. + pub fn new<N, S, I>(name: N, field_type: TType, id: I) -> TFieldIdentifier + where N: Into<Option<S>>, + S: Into<String>, + I: Into<Option<i16>> + { + TFieldIdentifier { + name: name.into().map(|n| n.into()), + field_type: field_type, + id: id.into(), + } + } +} + +/// Thrift list identifier. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TListIdentifier { + /// Type of the elements in the list. + pub element_type: TType, + /// Number of elements in the list. + pub size: i32, +} + +impl TListIdentifier { + /// Create a `TListIdentifier` for a list with `size` elements of type + /// `element_type`. + pub fn new(element_type: TType, size: i32) -> TListIdentifier { + TListIdentifier { + element_type: element_type, + size: size, + } + } +} + +/// Thrift set identifier. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TSetIdentifier { + /// Type of the elements in the set. + pub element_type: TType, + /// Number of elements in the set. + pub size: i32, +} + +impl TSetIdentifier { + /// Create a `TSetIdentifier` for a set with `size` elements of type + /// `element_type`. + pub fn new(element_type: TType, size: i32) -> TSetIdentifier { + TSetIdentifier { + element_type: element_type, + size: size, + } + } +} + +/// Thrift map identifier. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TMapIdentifier { + /// Map key type. + pub key_type: Option<TType>, + /// Map value type. + pub value_type: Option<TType>, + /// Number of entries in the map. + pub size: i32, +} + +impl TMapIdentifier { + /// Create a `TMapIdentifier` for a map with `size` entries of type + /// `key_type -> value_type`. + pub fn new<K, V>(key_type: K, value_type: V, size: i32) -> TMapIdentifier + where K: Into<Option<TType>>, + V: Into<Option<TType>> + { + TMapIdentifier { + key_type: key_type.into(), + value_type: value_type.into(), + size: size, + } + } +} + +/// Thrift message types. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum TMessageType { + /// Service-call request. + Call, + /// Service-call response. + Reply, + /// Unexpected error in the remote service. + Exception, + /// One-way service-call request (no response is expected). + OneWay, +} + +impl Display for TMessageType { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match *self { + TMessageType::Call => write!(f, "Call"), + TMessageType::Reply => write!(f, "Reply"), + TMessageType::Exception => write!(f, "Exception"), + TMessageType::OneWay => write!(f, "OneWay"), + } + } +} + +impl From<TMessageType> for u8 { + fn from(message_type: TMessageType) -> Self { + match message_type { + TMessageType::Call => 0x01, + TMessageType::Reply => 0x02, + TMessageType::Exception => 0x03, + TMessageType::OneWay => 0x04, + } + } +} + +impl TryFrom<u8> for TMessageType { + type Err = ::Error; + fn try_from(b: u8) -> ::Result<Self> { + match b { + 0x01 => Ok(TMessageType::Call), + 0x02 => Ok(TMessageType::Reply), + 0x03 => Ok(TMessageType::Exception), + 0x04 => Ok(TMessageType::OneWay), + unkn => { + Err(::Error::Protocol(ProtocolError { + kind: ProtocolErrorKind::InvalidData, + message: format!("cannot convert {} to TMessageType", unkn), + })) + } + } + } +} + +/// Thrift struct-field types. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum TType { + /// Indicates that there are no more serialized fields in this Thrift struct. + Stop, + /// Void (`()`) field. + Void, + /// Boolean. + Bool, + /// Signed 8-bit int. + I08, + /// Double-precision number. + Double, + /// Signed 16-bit int. + I16, + /// Signed 32-bit int. + I32, + /// Signed 64-bit int. + I64, + /// UTF-8 string. + String, + /// UTF-7 string. *Unsupported*. + Utf7, + /// Thrift struct. + Struct, + /// Map. + Map, + /// Set. + Set, + /// List. + List, + /// UTF-8 string. + Utf8, + /// UTF-16 string. *Unsupported*. + Utf16, +} + +impl Display for TType { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + match *self { + TType::Stop => write!(f, "STOP"), + TType::Void => write!(f, "void"), + TType::Bool => write!(f, "bool"), + TType::I08 => write!(f, "i08"), + TType::Double => write!(f, "double"), + TType::I16 => write!(f, "i16"), + TType::I32 => write!(f, "i32"), + TType::I64 => write!(f, "i64"), + TType::String => write!(f, "string"), + TType::Utf7 => write!(f, "UTF7"), + TType::Struct => write!(f, "struct"), + TType::Map => write!(f, "map"), + TType::Set => write!(f, "set"), + TType::List => write!(f, "list"), + TType::Utf8 => write!(f, "UTF8"), + TType::Utf16 => write!(f, "UTF16"), + } + } +} + +/// Compare the expected message sequence number `expected` with the received +/// message sequence number `actual`. +/// +/// Return `()` if `actual == expected`, `Err` otherwise. +pub fn verify_expected_sequence_number(expected: i32, actual: i32) -> ::Result<()> { + if expected == actual { + Ok(()) + } else { + Err(::Error::Application(::ApplicationError { + kind: ::ApplicationErrorKind::BadSequenceId, + message: format!("expected {} got {}", expected, actual), + })) + } +} + +/// Compare the expected service-call name `expected` with the received +/// service-call name `actual`. +/// +/// Return `()` if `actual == expected`, `Err` otherwise. +pub fn verify_expected_service_call(expected: &str, actual: &str) -> ::Result<()> { + if expected == actual { + Ok(()) + } else { + Err(::Error::Application(::ApplicationError { + kind: ::ApplicationErrorKind::WrongMethodName, + message: format!("expected {} got {}", expected, actual), + })) + } +} + +/// Compare the expected message type `expected` with the received message type +/// `actual`. +/// +/// Return `()` if `actual == expected`, `Err` otherwise. +pub fn verify_expected_message_type(expected: TMessageType, actual: TMessageType) -> ::Result<()> { + if expected == actual { + Ok(()) + } else { + Err(::Error::Application(::ApplicationError { + kind: ::ApplicationErrorKind::InvalidMessageType, + message: format!("expected {} got {}", expected, actual), + })) + } +} + +/// Check if a required Thrift struct field exists. +/// +/// Return `()` if it does, `Err` otherwise. +pub fn verify_required_field_exists<T>(field_name: &str, field: &Option<T>) -> ::Result<()> { + match *field { + Some(_) => Ok(()), + None => { + Err(::Error::Protocol(::ProtocolError { + kind: ::ProtocolErrorKind::Unknown, + message: format!("missing required field {}", field_name), + })) + } + } +} + +/// Extract the field id from a Thrift field identifier. +/// +/// `field_ident` must *not* have `TFieldIdentifier.field_type` of type `TType::Stop`. +/// +/// Return `TFieldIdentifier.id` if an id exists, `Err` otherwise. +pub fn field_id(field_ident: &TFieldIdentifier) -> ::Result<i16> { + field_ident.id.ok_or_else(|| { + ::Error::Protocol(::ProtocolError { + kind: ::ProtocolErrorKind::Unknown, + message: format!("missing field in in {:?}", field_ident), + }) + }) +}
http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/protocol/multiplexed.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/protocol/multiplexed.rs b/lib/rs/src/protocol/multiplexed.rs new file mode 100644 index 0000000..15fe608 --- /dev/null +++ b/lib/rs/src/protocol/multiplexed.rs @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, + TOutputProtocol, TSetIdentifier, TStructIdentifier}; + +/// `TOutputProtocol` that prefixes the service name to all outgoing Thrift +/// messages. +/// +/// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services +/// send messages over a single I/O channel. By prefixing service identifiers +/// to outgoing messages receivers are able to demux them and route them to the +/// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor` +/// to process incoming messages, while other languages must use their +/// corresponding multiplexed processor implementations. +/// +/// For example, given a service `TestService` and a service call `test_call`, +/// this implementation would identify messages as originating from +/// `TestService:test_call`. +/// +/// # Examples +/// +/// Create and use a `TMultiplexedOutputProtocol`. +/// +/// ```no_run +/// use std::cell::RefCell; +/// use std::rc::Rc; +/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol}; +/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol}; +/// use thrift::transport::{TTcpTransport, TTransport}; +/// +/// let mut transport = TTcpTransport::new(); +/// transport.open("localhost:9090").unwrap(); +/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>)); +/// +/// let o_prot = TBinaryOutputProtocol::new(transport, true); +/// let mut o_prot = TMultiplexedOutputProtocol::new("service_name", Box::new(o_prot)); +/// +/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1); +/// o_prot.write_message_begin(&ident).unwrap(); +/// ``` +pub struct TMultiplexedOutputProtocol { + service_name: String, + inner: Box<TOutputProtocol>, +} + +impl TMultiplexedOutputProtocol { + /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages + /// as originating from a service named `service_name` and sends them over + /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent + /// by `wrapped`, not by this instance. + pub fn new(service_name: &str, wrapped: Box<TOutputProtocol>) -> TMultiplexedOutputProtocol { + TMultiplexedOutputProtocol { + service_name: service_name.to_owned(), + inner: wrapped, + } + } +} + +// FIXME: avoid passthrough methods +impl TOutputProtocol for TMultiplexedOutputProtocol { + fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> { + match identifier.message_type { // FIXME: is there a better way to override identifier here? + TMessageType::Call | TMessageType::OneWay => { + let identifier = TMessageIdentifier { + name: format!("{}:{}", self.service_name, identifier.name), + ..*identifier + }; + self.inner.write_message_begin(&identifier) + } + _ => self.inner.write_message_begin(identifier), + } + } + + fn write_message_end(&mut self) -> ::Result<()> { + self.inner.write_message_end() + } + + fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()> { + self.inner.write_struct_begin(identifier) + } + + fn write_struct_end(&mut self) -> ::Result<()> { + self.inner.write_struct_end() + } + + fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> { + self.inner.write_field_begin(identifier) + } + + fn write_field_end(&mut self) -> ::Result<()> { + self.inner.write_field_end() + } + + fn write_field_stop(&mut self) -> ::Result<()> { + self.inner.write_field_stop() + } + + fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> { + self.inner.write_bytes(b) + } + + fn write_bool(&mut self, b: bool) -> ::Result<()> { + self.inner.write_bool(b) + } + + fn write_i8(&mut self, i: i8) -> ::Result<()> { + self.inner.write_i8(i) + } + + fn write_i16(&mut self, i: i16) -> ::Result<()> { + self.inner.write_i16(i) + } + + fn write_i32(&mut self, i: i32) -> ::Result<()> { + self.inner.write_i32(i) + } + + fn write_i64(&mut self, i: i64) -> ::Result<()> { + self.inner.write_i64(i) + } + + fn write_double(&mut self, d: f64) -> ::Result<()> { + self.inner.write_double(d) + } + + fn write_string(&mut self, s: &str) -> ::Result<()> { + self.inner.write_string(s) + } + + fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()> { + self.inner.write_list_begin(identifier) + } + + fn write_list_end(&mut self) -> ::Result<()> { + self.inner.write_list_end() + } + + fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()> { + self.inner.write_set_begin(identifier) + } + + fn write_set_end(&mut self) -> ::Result<()> { + self.inner.write_set_end() + } + + fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> { + self.inner.write_map_begin(identifier) + } + + fn write_map_end(&mut self) -> ::Result<()> { + self.inner.write_map_end() + } + + fn flush(&mut self) -> ::Result<()> { + self.inner.flush() + } + + // utility + // + + fn write_byte(&mut self, b: u8) -> ::Result<()> { + self.inner.write_byte(b) + } +} + +#[cfg(test)] +mod tests { + + use std::cell::RefCell; + use std::rc::Rc; + + use ::protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol}; + use ::transport::{TPassThruTransport, TTransport}; + use ::transport::mem::TBufferTransport; + + use super::*; + + #[test] + fn must_write_message_begin_with_prefixed_service_name() { + let (trans, mut o_prot) = test_objects(); + + let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2); + assert_success!(o_prot.write_message_begin(&ident)); + + let expected: [u8; 19] = + [0x80, 0x01 /* protocol identifier */, 0x00, 0x01 /* message type */, 0x00, + 0x00, 0x00, 0x07, 0x66, 0x6F, 0x6F /* "foo" */, 0x3A /* ":" */, 0x62, 0x61, + 0x72 /* "bar" */, 0x00, 0x00, 0x00, 0x02 /* sequence number */]; + + assert_eq!(&trans.borrow().write_buffer_to_vec(), &expected); + } + + fn test_objects() -> (Rc<RefCell<Box<TBufferTransport>>>, TMultiplexedOutputProtocol) { + let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity(40, 40)))); + + let inner: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() }); + let inner = Rc::new(RefCell::new(inner)); + + let o_prot = TBinaryOutputProtocol::new(inner.clone(), true); + let o_prot = TMultiplexedOutputProtocol::new("foo", Box::new(o_prot)); + + (mem, o_prot) + } +} http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/protocol/stored.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/protocol/stored.rs b/lib/rs/src/protocol/stored.rs new file mode 100644 index 0000000..6826c00 --- /dev/null +++ b/lib/rs/src/protocol/stored.rs @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::convert::Into; + +use ::ProtocolErrorKind; +use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TInputProtocol, + TSetIdentifier, TStructIdentifier}; + +/// `TInputProtocol` required to use a `TMultiplexedProcessor`. +/// +/// A `TMultiplexedProcessor` reads incoming message identifiers to determine to +/// which `TProcessor` requests should be forwarded. However, once read, those +/// message identifier bytes are no longer on the wire. Since downstream +/// processors expect to read message identifiers from the given input protocol +/// we need some way of supplying a `TMessageIdentifier` with the service-name +/// stripped. This implementation stores the received `TMessageIdentifier` +/// (without the service name) and passes it to the wrapped `TInputProtocol` +/// when `TInputProtocol::read_message_begin(...)` is called. It delegates all +/// other calls directly to the wrapped `TInputProtocol`. +/// +/// This type **should not** be used by application code. +/// +/// # Examples +/// +/// Create and use a `TStoredInputProtocol`. +/// +/// ```no_run +/// use std::cell::RefCell; +/// use std::rc::Rc; +/// use thrift; +/// use thrift::protocol::{TInputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol}; +/// use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, TStoredInputProtocol}; +/// use thrift::server::TProcessor; +/// use thrift::transport::{TTcpTransport, TTransport}; +/// +/// // sample processor +/// struct ActualProcessor; +/// impl TProcessor for ActualProcessor { +/// fn process( +/// &mut self, +/// _: &mut TInputProtocol, +/// _: &mut TOutputProtocol +/// ) -> thrift::Result<()> { +/// unimplemented!() +/// } +/// } +/// let mut processor = ActualProcessor {}; +/// +/// // construct the shared transport +/// let mut transport = TTcpTransport::new(); +/// transport.open("localhost:9090").unwrap(); +/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>)); +/// +/// // construct the actual input and output protocols +/// let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true); +/// let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true); +/// +/// // message identifier received from remote and modified to remove the service name +/// let new_msg_ident = TMessageIdentifier::new("service_call", TMessageType::Call, 1); +/// +/// // construct the proxy input protocol +/// let mut proxy_i_prot = TStoredInputProtocol::new(&mut i_prot, new_msg_ident); +/// let res = processor.process(&mut proxy_i_prot, &mut o_prot); +/// ``` +pub struct TStoredInputProtocol<'a> { + inner: &'a mut TInputProtocol, + message_ident: Option<TMessageIdentifier>, +} + +impl<'a> TStoredInputProtocol<'a> { + /// Create a `TStoredInputProtocol` that delegates all calls other than + /// `TInputProtocol::read_message_begin(...)` to a `wrapped` + /// `TInputProtocol`. `message_ident` is the modified message identifier - + /// with service name stripped - that will be passed to + /// `wrapped.read_message_begin(...)`. + pub fn new(wrapped: &mut TInputProtocol, + message_ident: TMessageIdentifier) + -> TStoredInputProtocol { + TStoredInputProtocol { + inner: wrapped, + message_ident: message_ident.into(), + } + } +} + +impl<'a> TInputProtocol for TStoredInputProtocol<'a> { + fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier> { + self.message_ident.take().ok_or_else(|| { + ::errors::new_protocol_error(ProtocolErrorKind::Unknown, + "message identifier already read") + }) + } + + fn read_message_end(&mut self) -> ::Result<()> { + self.inner.read_message_end() + } + + fn read_struct_begin(&mut self) -> ::Result<Option<TStructIdentifier>> { + self.inner.read_struct_begin() + } + + fn read_struct_end(&mut self) -> ::Result<()> { + self.inner.read_struct_end() + } + + fn read_field_begin(&mut self) -> ::Result<TFieldIdentifier> { + self.inner.read_field_begin() + } + + fn read_field_end(&mut self) -> ::Result<()> { + self.inner.read_field_end() + } + + fn read_bytes(&mut self) -> ::Result<Vec<u8>> { + self.inner.read_bytes() + } + + fn read_bool(&mut self) -> ::Result<bool> { + self.inner.read_bool() + } + + fn read_i8(&mut self) -> ::Result<i8> { + self.inner.read_i8() + } + + fn read_i16(&mut self) -> ::Result<i16> { + self.inner.read_i16() + } + + fn read_i32(&mut self) -> ::Result<i32> { + self.inner.read_i32() + } + + fn read_i64(&mut self) -> ::Result<i64> { + self.inner.read_i64() + } + + fn read_double(&mut self) -> ::Result<f64> { + self.inner.read_double() + } + + fn read_string(&mut self) -> ::Result<String> { + self.inner.read_string() + } + + fn read_list_begin(&mut self) -> ::Result<TListIdentifier> { + self.inner.read_list_begin() + } + + fn read_list_end(&mut self) -> ::Result<()> { + self.inner.read_list_end() + } + + fn read_set_begin(&mut self) -> ::Result<TSetIdentifier> { + self.inner.read_set_begin() + } + + fn read_set_end(&mut self) -> ::Result<()> { + self.inner.read_set_end() + } + + fn read_map_begin(&mut self) -> ::Result<TMapIdentifier> { + self.inner.read_map_begin() + } + + fn read_map_end(&mut self) -> ::Result<()> { + self.inner.read_map_end() + } + + // utility + // + + fn read_byte(&mut self) -> ::Result<u8> { + self.inner.read_byte() + } +} http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/server/mod.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs new file mode 100644 index 0000000..ceac18a --- /dev/null +++ b/lib/rs/src/server/mod.rs @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Types required to implement a Thrift server. + +use ::protocol::{TInputProtocol, TOutputProtocol}; + +mod simple; +mod multiplexed; + +pub use self::simple::TSimpleServer; +pub use self::multiplexed::TMultiplexedProcessor; + +/// Handles incoming Thrift messages and dispatches them to the user-defined +/// handler functions. +/// +/// An implementation is auto-generated for each Thrift service. When used by a +/// server (for example, a `TSimpleServer`), it will demux incoming service +/// calls and invoke the corresponding user-defined handler function. +/// +/// # Examples +/// +/// Create and start a server using the auto-generated `TProcessor` for +/// a Thrift service `SimpleService`. +/// +/// ```no_run +/// use thrift; +/// use thrift::protocol::{TInputProtocol, TOutputProtocol}; +/// use thrift::server::TProcessor; +/// +/// // +/// // auto-generated +/// // +/// +/// // processor for `SimpleService` +/// struct SimpleServiceSyncProcessor; +/// impl SimpleServiceSyncProcessor { +/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor { +/// unimplemented!(); +/// } +/// } +/// +/// // `TProcessor` implementation for `SimpleService` +/// impl TProcessor for SimpleServiceSyncProcessor { +/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> { +/// unimplemented!(); +/// } +/// } +/// +/// // service functions for SimpleService +/// trait SimpleServiceSyncHandler { +/// fn service_call(&mut self) -> thrift::Result<()>; +/// } +/// +/// // +/// // user-code follows +/// // +/// +/// // define a handler that will be invoked when `service_call` is received +/// struct SimpleServiceHandlerImpl; +/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl { +/// fn service_call(&mut self) -> thrift::Result<()> { +/// unimplemented!(); +/// } +/// } +/// +/// // instantiate the processor +/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {}); +/// +/// // at this point you can pass the processor to the server +/// // let server = TSimpleServer::new(..., processor); +/// ``` +pub trait TProcessor { + /// Process a Thrift service call. + /// + /// Reads arguments from `i`, executes the user's handler code, and writes + /// the response to `o`. + /// + /// Returns `()` if the handler was executed; `Err` otherwise. + fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> ::Result<()>; +} http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/server/multiplexed.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs new file mode 100644 index 0000000..d2314a1 --- /dev/null +++ b/lib/rs/src/server/multiplexed.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::convert::Into; + +use ::{new_application_error, ApplicationErrorKind}; +use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol}; + +use super::TProcessor; + +/// A `TProcessor` that can demux service calls to multiple underlying +/// Thrift services. +/// +/// Users register service-specific `TProcessor` instances with a +/// `TMultiplexedProcessor`, and then register that processor with a server +/// implementation. Following that, all incoming service calls are automatically +/// routed to the service-specific `TProcessor`. +/// +/// A `TMultiplexedProcessor` can only handle messages sent by a +/// `TMultiplexedOutputProtocol`. +pub struct TMultiplexedProcessor { + processors: HashMap<String, Box<TProcessor>>, +} + +impl TMultiplexedProcessor { + /// Register a service-specific `processor` for the service named + /// `service_name`. + /// + /// Return `true` if this is the first registration for `service_name`. + /// + /// Return `false` if a mapping previously existed (the previous mapping is + /// *not* overwritten). + #[cfg_attr(feature = "cargo-clippy", allow(map_entry))] + pub fn register_processor<S: Into<String>>(&mut self, + service_name: S, + processor: Box<TProcessor>) + -> bool { + let name = service_name.into(); + if self.processors.contains_key(&name) { + false + } else { + self.processors.insert(name, processor); + true + } + } +} + +impl TProcessor for TMultiplexedProcessor { + fn process(&mut self, + i_prot: &mut TInputProtocol, + o_prot: &mut TOutputProtocol) + -> ::Result<()> { + let msg_ident = i_prot.read_message_begin()?; + let sep_index = msg_ident.name + .find(':') + .ok_or_else(|| { + new_application_error(ApplicationErrorKind::Unknown, + "no service separator found in incoming message") + })?; + + let (svc_name, svc_call) = msg_ident.name.split_at(sep_index); + + match self.processors.get_mut(svc_name) { + Some(ref mut processor) => { + let new_msg_ident = TMessageIdentifier::new(svc_call, + msg_ident.message_type, + msg_ident.sequence_number); + let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident); + processor.process(&mut proxy_i_prot, o_prot) + } + None => { + Err(new_application_error(ApplicationErrorKind::Unknown, + format!("no processor found for service {}", svc_name))) + } + } + } +} http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/server/simple.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/server/simple.rs b/lib/rs/src/server/simple.rs new file mode 100644 index 0000000..89ed977 --- /dev/null +++ b/lib/rs/src/server/simple.rs @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cell::RefCell; +use std::net::{TcpListener, TcpStream}; +use std::rc::Rc; + +use ::{ApplicationError, ApplicationErrorKind}; +use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory}; +use ::transport::{TTcpTransport, TTransport, TTransportFactory}; + +use super::TProcessor; + +/// Single-threaded blocking Thrift socket server. +/// +/// A `TSimpleServer` listens on a given address and services accepted +/// connections *synchronously* and *sequentially* - i.e. in a blocking manner, +/// one at a time - on the main thread. Each accepted connection has an input +/// half and an output half, each of which uses a `TTransport` and `TProtocol` +/// to translate messages to and from byes. Any combination of `TProtocol` and +/// `TTransport` may be used. +/// +/// # Examples +/// +/// Creating and running a `TSimpleServer` using Thrift-compiler-generated +/// service code. +/// +/// ```no_run +/// use thrift; +/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory}; +/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory}; +/// use thrift::protocol::{TInputProtocol, TOutputProtocol}; +/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory}; +/// use thrift::server::{TProcessor, TSimpleServer}; +/// +/// // +/// // auto-generated +/// // +/// +/// // processor for `SimpleService` +/// struct SimpleServiceSyncProcessor; +/// impl SimpleServiceSyncProcessor { +/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor { +/// unimplemented!(); +/// } +/// } +/// +/// // `TProcessor` implementation for `SimpleService` +/// impl TProcessor for SimpleServiceSyncProcessor { +/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> { +/// unimplemented!(); +/// } +/// } +/// +/// // service functions for SimpleService +/// trait SimpleServiceSyncHandler { +/// fn service_call(&mut self) -> thrift::Result<()>; +/// } +/// +/// // +/// // user-code follows +/// // +/// +/// // define a handler that will be invoked when `service_call` is received +/// struct SimpleServiceHandlerImpl; +/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl { +/// fn service_call(&mut self) -> thrift::Result<()> { +/// unimplemented!(); +/// } +/// } +/// +/// // instantiate the processor +/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {}); +/// +/// // instantiate the server +/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new()); +/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new()); +/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new()); +/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new()); +/// +/// let mut server = TSimpleServer::new( +/// i_tr_fact, +/// i_pr_fact, +/// o_tr_fact, +/// o_pr_fact, +/// processor +/// ); +/// +/// // start listening for incoming connections +/// match server.listen("127.0.0.1:8080") { +/// Ok(_) => println!("listen completed"), +/// Err(e) => println!("listen failed with error {:?}", e), +/// } +/// ``` +pub struct TSimpleServer<PR: TProcessor> { + i_trans_factory: Box<TTransportFactory>, + i_proto_factory: Box<TInputProtocolFactory>, + o_trans_factory: Box<TTransportFactory>, + o_proto_factory: Box<TOutputProtocolFactory>, + processor: PR, +} + +impl<PR: TProcessor> TSimpleServer<PR> { + /// Create a `TSimpleServer`. + /// + /// Each accepted connection has an input and output half, each of which + /// requires a `TTransport` and `TProtocol`. `TSimpleServer` uses + /// `input_transport_factory` and `input_protocol_factory` to create + /// implementations for the input, and `output_transport_factory` and + /// `output_protocol_factory` to create implementations for the output. + pub fn new(input_transport_factory: Box<TTransportFactory>, + input_protocol_factory: Box<TInputProtocolFactory>, + output_transport_factory: Box<TTransportFactory>, + output_protocol_factory: Box<TOutputProtocolFactory>, + processor: PR) + -> TSimpleServer<PR> { + TSimpleServer { + i_trans_factory: input_transport_factory, + i_proto_factory: input_protocol_factory, + o_trans_factory: output_transport_factory, + o_proto_factory: output_protocol_factory, + processor: processor, + } + } + + /// Listen for incoming connections on `listen_address`. + /// + /// `listen_address` should be in the form `host:port`, + /// for example: `127.0.0.1:8080`. + /// + /// Return `()` if successful. + /// + /// Return `Err` when the server cannot bind to `listen_address` or there + /// is an unrecoverable error. + pub fn listen(&mut self, listen_address: &str) -> ::Result<()> { + let listener = TcpListener::bind(listen_address)?; + for stream in listener.incoming() { + match stream { + Ok(s) => self.handle_incoming_connection(s), + Err(e) => warn!("failed to accept remote connection with error {:?}", e), + } + } + + Err(::Error::Application(ApplicationError { + kind: ApplicationErrorKind::Unknown, + message: "aborted listen loop".into(), + })) + } + + fn handle_incoming_connection(&mut self, stream: TcpStream) { + // create the shared tcp stream + let stream = TTcpTransport::with_stream(stream); + let stream: Box<TTransport> = Box::new(stream); + let stream = Rc::new(RefCell::new(stream)); + + // input protocol and transport + let i_tran = self.i_trans_factory.create(stream.clone()); + let i_tran = Rc::new(RefCell::new(i_tran)); + let mut i_prot = self.i_proto_factory.create(i_tran); + + // output protocol and transport + let o_tran = self.o_trans_factory.create(stream.clone()); + let o_tran = Rc::new(RefCell::new(o_tran)); + let mut o_prot = self.o_proto_factory.create(o_tran); + + // process loop + loop { + let r = self.processor.process(&mut *i_prot, &mut *o_prot); + if let Err(e) = r { + warn!("processor failed with error: {:?}", e); + break; // FIXME: close here + } + } + } +} http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/transport/buffered.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/transport/buffered.rs b/lib/rs/src/transport/buffered.rs new file mode 100644 index 0000000..3f240d8 --- /dev/null +++ b/lib/rs/src/transport/buffered.rs @@ -0,0 +1,400 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cell::RefCell; +use std::cmp; +use std::io; +use std::io::{Read, Write}; +use std::rc::Rc; + +use super::{TTransport, TTransportFactory}; + +/// Default capacity of the read buffer in bytes. +const DEFAULT_RBUFFER_CAPACITY: usize = 4096; + +/// Default capacity of the write buffer in bytes.. +const DEFAULT_WBUFFER_CAPACITY: usize = 4096; + +/// Transport that communicates with endpoints using a byte stream. +/// +/// A `TBufferedTransport` maintains a fixed-size internal write buffer. All +/// writes are made to this buffer and are sent to the wrapped transport only +/// when `TTransport::flush()` is called. On a flush a fixed-length header with a +/// count of the buffered bytes is written, followed by the bytes themselves. +/// +/// A `TBufferedTransport` also maintains a fixed-size internal read buffer. +/// On a call to `TTransport::read(...)` one full message - both fixed-length +/// header and bytes - is read from the wrapped transport and buffered. +/// Subsequent read calls are serviced from the internal buffer until it is +/// exhausted, at which point the next full message is read from the wrapped +/// transport. +/// +/// # Examples +/// +/// Create and use a `TBufferedTransport`. +/// +/// ```no_run +/// use std::cell::RefCell; +/// use std::rc::Rc; +/// use std::io::{Read, Write}; +/// use thrift::transport::{TBufferedTransport, TTcpTransport, TTransport}; +/// +/// let mut t = TTcpTransport::new(); +/// t.open("localhost:9090").unwrap(); +/// +/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>)); +/// let mut t = TBufferedTransport::new(t); +/// +/// // read +/// t.read(&mut vec![0u8; 1]).unwrap(); +/// +/// // write +/// t.write(&[0x00]).unwrap(); +/// t.flush().unwrap(); +/// ``` +pub struct TBufferedTransport { + rbuf: Box<[u8]>, + rpos: usize, + rcap: usize, + wbuf: Vec<u8>, + inner: Rc<RefCell<Box<TTransport>>>, +} + +impl TBufferedTransport { + /// Create a `TBufferedTransport` with default-sized internal read and + /// write buffers that wraps an `inner` `TTransport`. + pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TBufferedTransport { + TBufferedTransport::with_capacity(DEFAULT_RBUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner) + } + + /// Create a `TBufferedTransport` with an internal read buffer of size + /// `read_buffer_capacity` and an internal write buffer of size + /// `write_buffer_capacity` that wraps an `inner` `TTransport`. + pub fn with_capacity(read_buffer_capacity: usize, + write_buffer_capacity: usize, + inner: Rc<RefCell<Box<TTransport>>>) + -> TBufferedTransport { + TBufferedTransport { + rbuf: vec![0; read_buffer_capacity].into_boxed_slice(), + rpos: 0, + rcap: 0, + wbuf: Vec::with_capacity(write_buffer_capacity), + inner: inner, + } + } + + fn get_bytes(&mut self) -> io::Result<&[u8]> { + if self.rcap - self.rpos == 0 { + self.rpos = 0; + self.rcap = self.inner.borrow_mut().read(&mut self.rbuf)?; + } + + Ok(&self.rbuf[self.rpos..self.rcap]) + } + + fn consume(&mut self, consumed: usize) { + // TODO: was a bug here += <-- test somehow + self.rpos = cmp::min(self.rcap, self.rpos + consumed); + } +} + +impl Read for TBufferedTransport { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + let mut bytes_read = 0; + + loop { + let nread = { + let avail_bytes = self.get_bytes()?; + let avail_space = buf.len() - bytes_read; + let nread = cmp::min(avail_space, avail_bytes.len()); + buf[bytes_read..(bytes_read + nread)].copy_from_slice(&avail_bytes[..nread]); + nread + }; + + self.consume(nread); + bytes_read += nread; + + if bytes_read == buf.len() || nread == 0 { + break; + } + } + + Ok(bytes_read) + } +} + +impl Write for TBufferedTransport { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + let avail_bytes = cmp::min(buf.len(), self.wbuf.capacity() - self.wbuf.len()); + self.wbuf.extend_from_slice(&buf[..avail_bytes]); + assert!(self.wbuf.len() <= self.wbuf.capacity(), + "copy overflowed buffer"); + Ok(avail_bytes) + } + + fn flush(&mut self) -> io::Result<()> { + self.inner.borrow_mut().write_all(&self.wbuf)?; + self.inner.borrow_mut().flush()?; + self.wbuf.clear(); + Ok(()) + } +} + +/// Factory for creating instances of `TBufferedTransport` +#[derive(Default)] +pub struct TBufferedTransportFactory; + +impl TBufferedTransportFactory { + /// Create a `TBufferedTransportFactory`. + pub fn new() -> TBufferedTransportFactory { + TBufferedTransportFactory {} + } +} + +impl TTransportFactory for TBufferedTransportFactory { + fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> { + Box::new(TBufferedTransport::new(inner)) as Box<TTransport> + } +} + +#[cfg(test)] +mod tests { + use std::cell::RefCell; + use std::io::{Read, Write}; + use std::rc::Rc; + + use super::*; + use ::transport::{TPassThruTransport, TTransport}; + use ::transport::mem::TBufferTransport; + + macro_rules! new_transports { + ($wbc:expr, $rbc:expr) => ( + { + let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity($wbc, $rbc)))); + let thru: Box<TTransport> = Box::new(TPassThruTransport { inner: mem.clone() }); + let thru = Rc::new(RefCell::new(thru)); + (mem, thru) + } + ); + } + + #[test] + fn must_return_zero_if_read_buffer_is_empty() { + let (_, thru) = new_transports!(10, 0); + let mut t = TBufferedTransport::with_capacity(10, 0, thru); + + let mut b = vec![0; 10]; + let read_result = t.read(&mut b); + + assert_eq!(read_result.unwrap(), 0); + } + + #[test] + fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() { + let (_, thru) = new_transports!(10, 0); + let mut t = TBufferedTransport::with_capacity(10, 0, thru); + + let read_result = t.read(&mut []); + + assert_eq!(read_result.unwrap(), 0); + } + + #[test] + fn must_return_zero_if_nothing_more_can_be_read() { + let (mem, thru) = new_transports!(4, 0); + let mut t = TBufferedTransport::with_capacity(4, 0, thru); + + mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]); + + // read buffer is exactly the same size as bytes available + let mut buf = vec![0u8; 4]; + let read_result = t.read(&mut buf); + + // we've read exactly 4 bytes + assert_eq!(read_result.unwrap(), 4); + assert_eq!(&buf, &[0, 1, 2, 3]); + + // try read again + let buf_again = vec![0u8; 4]; + let read_result = t.read(&mut buf); + + // this time, 0 bytes and we haven't changed the buffer + assert_eq!(read_result.unwrap(), 0); + assert_eq!(&buf_again, &[0, 0, 0, 0]) + } + + #[test] + fn must_fill_user_buffer_with_only_as_many_bytes_as_available() { + let (mem, thru) = new_transports!(4, 0); + let mut t = TBufferedTransport::with_capacity(4, 0, thru); + + mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]); + + // read buffer is much larger than the bytes available + let mut buf = vec![0u8; 8]; + let read_result = t.read(&mut buf); + + // we've read exactly 4 bytes + assert_eq!(read_result.unwrap(), 4); + assert_eq!(&buf[..4], &[0, 1, 2, 3]); + + // try read again + let read_result = t.read(&mut buf[4..]); + + // this time, 0 bytes and we haven't changed the buffer + assert_eq!(read_result.unwrap(), 0); + assert_eq!(&buf, &[0, 1, 2, 3, 0, 0, 0, 0]) + } + + #[test] + fn must_read_successfully() { + // this test involves a few loops within the buffered transport + // itself where it has to drain the underlying transport in order + // to service a read + + // we have a much smaller buffer than the + // underlying transport has bytes available + let (mem, thru) = new_transports!(10, 0); + let mut t = TBufferedTransport::with_capacity(2, 0, thru); + + // fill the underlying transport's byte buffer + let mut readable_bytes = [0u8; 10]; + for i in 0..10 { + readable_bytes[i] = i as u8; + } + mem.borrow_mut().set_readable_bytes(&readable_bytes); + + // we ask to read into a buffer that's much larger + // than the one the buffered transport has; as a result + // it's going to have to keep asking the underlying + // transport for more bytes + let mut buf = [0u8; 8]; + let read_result = t.read(&mut buf); + + // we should have read 8 bytes + assert_eq!(read_result.unwrap(), 8); + assert_eq!(&buf, &[0, 1, 2, 3, 4, 5, 6, 7]); + + // let's clear out the buffer and try read again + for i in 0..8 { + buf[i] = 0; + } + let read_result = t.read(&mut buf); + + // this time we were only able to read 2 bytes + // (all that's remaining from the underlying transport) + // let's also check that the remaining bytes are untouched + assert_eq!(read_result.unwrap(), 2); + assert_eq!(&buf[0..2], &[8, 9]); + assert_eq!(&buf[2..], &[0, 0, 0, 0, 0, 0]); + + // try read again (we should get 0) + // and all the existing bytes were untouched + let read_result = t.read(&mut buf); + assert_eq!(read_result.unwrap(), 0); + assert_eq!(&buf[0..2], &[8, 9]); + assert_eq!(&buf[2..], &[0, 0, 0, 0, 0, 0]); + } + + #[test] + fn must_return_zero_if_nothing_can_be_written() { + let (_, thru) = new_transports!(0, 0); + let mut t = TBufferedTransport::with_capacity(0, 0, thru); + + let b = vec![0; 10]; + let r = t.write(&b); + + assert_eq!(r.unwrap(), 0); + } + + #[test] + fn must_return_zero_if_caller_calls_write_with_empty_buffer() { + let (mem, thru) = new_transports!(0, 10); + let mut t = TBufferedTransport::with_capacity(0, 10, thru); + + let r = t.write(&[]); + + assert_eq!(r.unwrap(), 0); + assert_eq!(mem.borrow_mut().write_buffer_as_ref(), &[]); + } + + #[test] + fn must_return_zero_if_write_buffer_full() { + let (_, thru) = new_transports!(0, 0); + let mut t = TBufferedTransport::with_capacity(0, 4, thru); + + let b = [0x00, 0x01, 0x02, 0x03]; + + // we've now filled the write buffer + let r = t.write(&b); + assert_eq!(r.unwrap(), 4); + + // try write the same bytes again - nothing should be writable + let r = t.write(&b); + assert_eq!(r.unwrap(), 0); + } + + #[test] + fn must_only_write_to_inner_transport_on_flush() { + let (mem, thru) = new_transports!(10, 10); + let mut t = TBufferedTransport::new(thru); + + let b: [u8; 5] = [0, 1, 2, 3, 4]; + assert_eq!(t.write(&b).unwrap(), 5); + assert_eq!(mem.borrow_mut().write_buffer_as_ref().len(), 0); + + assert!(t.flush().is_ok()); + + { + let inner = mem.borrow_mut(); + let underlying_buffer = inner.write_buffer_as_ref(); + assert_eq!(b, underlying_buffer); + } + } + + #[test] + fn must_write_successfully_after_flush() { + let (mem, thru) = new_transports!(0, 5); + let mut t = TBufferedTransport::with_capacity(0, 5, thru); + + // write and flush + let b: [u8; 5] = [0, 1, 2, 3, 4]; + assert_eq!(t.write(&b).unwrap(), 5); + assert!(t.flush().is_ok()); + + // check the flushed bytes + { + let inner = mem.borrow_mut(); + let underlying_buffer = inner.write_buffer_as_ref(); + assert_eq!(b, underlying_buffer); + } + + // reset our underlying transport + mem.borrow_mut().empty_write_buffer(); + + // write and flush again + assert_eq!(t.write(&b).unwrap(), 5); + assert!(t.flush().is_ok()); + + // check the flushed bytes + { + let inner = mem.borrow_mut(); + let underlying_buffer = inner.write_buffer_as_ref(); + assert_eq!(b, underlying_buffer); + } + } +} http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/transport/framed.rs ---------------------------------------------------------------------- diff --git a/lib/rs/src/transport/framed.rs b/lib/rs/src/transport/framed.rs new file mode 100644 index 0000000..75c12f4 --- /dev/null +++ b/lib/rs/src/transport/framed.rs @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::cell::RefCell; +use std::cmp; +use std::io; +use std::io::{ErrorKind, Read, Write}; +use std::rc::Rc; + +use super::{TTransport, TTransportFactory}; + +/// Default capacity of the read buffer in bytes. +const WRITE_BUFFER_CAPACITY: usize = 4096; + +/// Default capacity of the write buffer in bytes.. +const DEFAULT_WBUFFER_CAPACITY: usize = 4096; + +/// Transport that communicates with endpoints using framed messages. +/// +/// A `TFramedTransport` maintains a fixed-size internal write buffer. All +/// writes are made to this buffer and are sent to the wrapped transport only +/// when `TTransport::flush()` is called. On a flush a fixed-length header with a +/// count of the buffered bytes is written, followed by the bytes themselves. +/// +/// A `TFramedTransport` also maintains a fixed-size internal read buffer. +/// On a call to `TTransport::read(...)` one full message - both fixed-length +/// header and bytes - is read from the wrapped transport and buffered. +/// Subsequent read calls are serviced from the internal buffer until it is +/// exhausted, at which point the next full message is read from the wrapped +/// transport. +/// +/// # Examples +/// +/// Create and use a `TFramedTransport`. +/// +/// ```no_run +/// use std::cell::RefCell; +/// use std::rc::Rc; +/// use std::io::{Read, Write}; +/// use thrift::transport::{TFramedTransport, TTcpTransport, TTransport}; +/// +/// let mut t = TTcpTransport::new(); +/// t.open("localhost:9090").unwrap(); +/// +/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>)); +/// let mut t = TFramedTransport::new(t); +/// +/// // read +/// t.read(&mut vec![0u8; 1]).unwrap(); +/// +/// // write +/// t.write(&[0x00]).unwrap(); +/// t.flush().unwrap(); +/// ``` +pub struct TFramedTransport { + rbuf: Box<[u8]>, + rpos: usize, + rcap: usize, + wbuf: Box<[u8]>, + wpos: usize, + inner: Rc<RefCell<Box<TTransport>>>, +} + +impl TFramedTransport { + /// Create a `TFramedTransport` with default-sized internal read and + /// write buffers that wraps an `inner` `TTransport`. + pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TFramedTransport { + TFramedTransport::with_capacity(WRITE_BUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner) + } + + /// Create a `TFramedTransport` with an internal read buffer of size + /// `read_buffer_capacity` and an internal write buffer of size + /// `write_buffer_capacity` that wraps an `inner` `TTransport`. + pub fn with_capacity(read_buffer_capacity: usize, + write_buffer_capacity: usize, + inner: Rc<RefCell<Box<TTransport>>>) + -> TFramedTransport { + TFramedTransport { + rbuf: vec![0; read_buffer_capacity].into_boxed_slice(), + rpos: 0, + rcap: 0, + wbuf: vec![0; write_buffer_capacity].into_boxed_slice(), + wpos: 0, + inner: inner, + } + } +} + +impl Read for TFramedTransport { + fn read(&mut self, b: &mut [u8]) -> io::Result<usize> { + if self.rcap - self.rpos == 0 { + let message_size = self.inner.borrow_mut().read_i32::<BigEndian>()? as usize; + if message_size > self.rbuf.len() { + return Err(io::Error::new(ErrorKind::Other, + format!("bytes to be read ({}) exceeds buffer \ + capacity ({})", + message_size, + self.rbuf.len()))); + } + self.inner.borrow_mut().read_exact(&mut self.rbuf[..message_size])?; + self.rpos = 0; + self.rcap = message_size as usize; + } + + let nread = cmp::min(b.len(), self.rcap - self.rpos); + b[..nread].clone_from_slice(&self.rbuf[self.rpos..self.rpos + nread]); + self.rpos += nread; + + Ok(nread) + } +} + +impl Write for TFramedTransport { + fn write(&mut self, b: &[u8]) -> io::Result<usize> { + if b.len() > (self.wbuf.len() - self.wpos) { + return Err(io::Error::new(ErrorKind::Other, + format!("bytes to be written ({}) exceeds buffer \ + capacity ({})", + b.len(), + self.wbuf.len() - self.wpos))); + } + + let nwrite = b.len(); // always less than available write buffer capacity + self.wbuf[self.wpos..(self.wpos + nwrite)].clone_from_slice(b); + self.wpos += nwrite; + Ok(nwrite) + } + + fn flush(&mut self) -> io::Result<()> { + let message_size = self.wpos; + + if let 0 = message_size { + return Ok(()); + } else { + self.inner.borrow_mut().write_i32::<BigEndian>(message_size as i32)?; + } + + let mut byte_index = 0; + while byte_index < self.wpos { + let nwrite = self.inner.borrow_mut().write(&self.wbuf[byte_index..self.wpos])?; + byte_index = cmp::min(byte_index + nwrite, self.wpos); + } + + self.wpos = 0; + self.inner.borrow_mut().flush() + } +} + +/// Factory for creating instances of `TFramedTransport`. +#[derive(Default)] +pub struct TFramedTransportFactory; + +impl TFramedTransportFactory { + // Create a `TFramedTransportFactory`. + pub fn new() -> TFramedTransportFactory { + TFramedTransportFactory {} + } +} + +impl TTransportFactory for TFramedTransportFactory { + fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> { + Box::new(TFramedTransport::new(inner)) as Box<TTransport> + } +} + +#[cfg(test)] +mod tests { + // use std::io::{Read, Write}; + // + // use super::*; + // use ::transport::mem::TBufferTransport; +}
