http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/protocol/mod.rs
----------------------------------------------------------------------
diff --git a/lib/rs/src/protocol/mod.rs b/lib/rs/src/protocol/mod.rs
new file mode 100644
index 0000000..b230d63
--- /dev/null
+++ b/lib/rs/src/protocol/mod.rs
@@ -0,0 +1,709 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Types used to send and receive primitives between a Thrift client and server.
+//!
+//! # Examples
+//!
+//! Create and use a `TOutputProtocol`.
+//!
+//! ```no_run
+//! use std::cell::RefCell;
+//! use std::rc::Rc;
+//! use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType};
+//! use thrift::transport::{TTcpTransport, TTransport};
+//!
+//! // create the I/O channel
+//! let mut transport = TTcpTransport::new();
+//! transport.open("127.0.0.1:9090").unwrap();
+//! let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+//!
+//! // create the protocol to encode types into bytes
+//! let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
+//!
+//! // write types
+//! o_prot.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
+//! o_prot.write_string("foo").unwrap();
+//! o_prot.write_field_end().unwrap();
+//! ```
+//!
+//! Create and use a `TInputProtocol`.
+//!
+//! ```no_run
+//! use std::cell::RefCell;
+//! use std::rc::Rc;
+//! use thrift::protocol::{TBinaryInputProtocol, TInputProtocol};
+//! use thrift::transport::{TTcpTransport, TTransport};
+//!
+//! // create the I/O channel
+//! let mut transport = TTcpTransport::new();
+//! transport.open("127.0.0.1:9090").unwrap();
+//! let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+//!
+//! // create the protocol to decode bytes into types
+//! let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
+//!
+//! // read types from the wire
+//! let field_identifier = i_prot.read_field_begin().unwrap();
+//! let field_contents = i_prot.read_string().unwrap();
+//! let field_end = i_prot.read_field_end().unwrap();
+//! ```
+
+use std::cell::RefCell;
+use std::fmt;
+use std::fmt::{Display, Formatter};
+use std::convert::From;
+use std::rc::Rc;
+use try_from::TryFrom;
+
+use ::{ProtocolError, ProtocolErrorKind};
+use ::transport::TTransport;
+
+mod binary;
+mod compact;
+mod multiplexed;
+mod stored;
+
+pub use self::binary::{TBinaryInputProtocol, TBinaryInputProtocolFactory, 
TBinaryOutputProtocol,
+                       TBinaryOutputProtocolFactory};
+pub use self::compact::{TCompactInputProtocol, TCompactInputProtocolFactory,
+                        TCompactOutputProtocol, TCompactOutputProtocolFactory};
+pub use self::multiplexed::TMultiplexedOutputProtocol;
+pub use self::stored::TStoredInputProtocol;
+
+// Default maximum nesting depth to which `TInputProtocol::skip` will descend
+// while skipping a Thrift field. A bound is necessary because Thrift structs
+// and collections may contain nested structs and collections, so skipping
+// could otherwise recurse without limit.
+const MAXIMUM_SKIP_DEPTH: i8 = 64;
+
+/// Converts a stream of bytes into Thrift identifiers, primitives,
+/// containers, or structs.
+///
+/// This trait does not deal with higher-level Thrift concepts like structs or
+/// exceptions - only with primitives and message or container boundaries. Once
+/// bytes are read they are deserialized and an identifier (for example
+/// `TMessageIdentifier`) or a primitive is returned.
+///
+/// All methods return a `thrift::Result`. If an `Err` is returned the protocol
+/// instance and its underlying transport should be terminated.
+///
+/// # Examples
+///
+/// Create and use a `TInputProtocol`
+///
+/// ```no_run
+/// use std::cell::RefCell;
+/// use std::rc::Rc;
+/// use thrift::protocol::{TBinaryInputProtocol, TInputProtocol};
+/// use thrift::transport::{TTcpTransport, TTransport};
+///
+/// let mut transport = TTcpTransport::new();
+/// transport.open("127.0.0.1:9090").unwrap();
+/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+///
+/// let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
+///
+/// let field_identifier = i_prot.read_field_begin().unwrap();
+/// let field_contents = i_prot.read_string().unwrap();
+/// let field_end = i_prot.read_field_end().unwrap();
+/// ```
+pub trait TInputProtocol {
+    /// Read the beginning of a Thrift message.
+    fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier>;
+    /// Read the end of a Thrift message.
+    fn read_message_end(&mut self) -> ::Result<()>;
+    /// Read the beginning of a Thrift struct.
+    fn read_struct_begin(&mut self) -> ::Result<Option<TStructIdentifier>>;
+    /// Read the end of a Thrift struct.
+    fn read_struct_end(&mut self) -> ::Result<()>;
+    /// Read the beginning of a Thrift struct field.
+    fn read_field_begin(&mut self) -> ::Result<TFieldIdentifier>;
+    /// Read the end of a Thrift struct field.
+    fn read_field_end(&mut self) -> ::Result<()>;
+    /// Read a bool.
+    fn read_bool(&mut self) -> ::Result<bool>;
+    /// Read a fixed-length byte array.
+    fn read_bytes(&mut self) -> ::Result<Vec<u8>>;
+    /// Read an 8-bit signed integer.
+    fn read_i8(&mut self) -> ::Result<i8>;
+    /// Read a 16-bit signed integer.
+    fn read_i16(&mut self) -> ::Result<i16>;
+    /// Read a 32-bit signed integer.
+    fn read_i32(&mut self) -> ::Result<i32>;
+    /// Read a 64-bit signed integer.
+    fn read_i64(&mut self) -> ::Result<i64>;
+    /// Read a 64-bit float.
+    fn read_double(&mut self) -> ::Result<f64>;
+    /// Read a fixed-length string (not null terminated).
+    fn read_string(&mut self) -> ::Result<String>;
+    /// Read the beginning of a list.
+    fn read_list_begin(&mut self) -> ::Result<TListIdentifier>;
+    /// Read the end of a list.
+    fn read_list_end(&mut self) -> ::Result<()>;
+    /// Read the beginning of a set.
+    fn read_set_begin(&mut self) -> ::Result<TSetIdentifier>;
+    /// Read the end of a set.
+    fn read_set_end(&mut self) -> ::Result<()>;
+    /// Read the beginning of a map.
+    fn read_map_begin(&mut self) -> ::Result<TMapIdentifier>;
+    /// Read the end of a map.
+    fn read_map_end(&mut self) -> ::Result<()>;
+    /// Skip a field with type `field_type` recursively until the default
+    /// maximum skip depth is reached.
+    ///
+    /// # Errors
+    ///
+    /// Fails under the same conditions as `skip_till_depth`.
+    fn skip(&mut self, field_type: TType) -> ::Result<()> {
+        self.skip_till_depth(field_type, MAXIMUM_SKIP_DEPTH)
+    }
+    /// Skip a field with type `field_type` recursively up to `depth` levels.
+    ///
+    /// Every nested struct field, list/set element and map key/value consumes
+    /// one level of `depth`.
+    ///
+    /// # Errors
+    ///
+    /// Returns a protocol error of kind:
+    ///
+    /// * `DepthLimit` if `depth` is zero or negative when a value has to be
+    ///   skipped,
+    /// * `InvalidData` if a map with a positive size does not carry both its
+    ///   key type and its value type,
+    /// * `Unknown` if `field_type` is a type that cannot be skipped.
+    ///
+    /// Any error returned by the underlying `read_*` calls is propagated.
+    fn skip_till_depth(&mut self, field_type: TType, depth: i8) -> ::Result<()> {
+        // `<=` rather than `==`: a negative budget must fail immediately
+        // instead of recursing until `depth - 1` overflows `i8`.
+        if depth <= 0 {
+            return Err(::Error::Protocol(ProtocolError {
+                kind: ProtocolErrorKind::DepthLimit,
+                message: format!("cannot parse past {:?}", field_type),
+            }));
+        }
+
+        match field_type {
+            TType::Bool => self.read_bool().map(|_| ()),
+            TType::I08 => self.read_i8().map(|_| ()),
+            TType::I16 => self.read_i16().map(|_| ()),
+            TType::I32 => self.read_i32().map(|_| ()),
+            TType::I64 => self.read_i64().map(|_| ()),
+            TType::Double => self.read_double().map(|_| ()),
+            TType::String => self.read_string().map(|_| ()),
+            TType::Struct => {
+                self.read_struct_begin()?;
+                // a struct is a run of fields terminated by a STOP field
+                loop {
+                    let field_ident = self.read_field_begin()?;
+                    if field_ident.field_type == TType::Stop {
+                        break;
+                    }
+                    self.skip_till_depth(field_ident.field_type, depth - 1)?;
+                }
+                self.read_struct_end()
+            }
+            TType::List => {
+                let list_ident = self.read_list_begin()?;
+                for _ in 0..list_ident.size {
+                    self.skip_till_depth(list_ident.element_type, depth - 1)?;
+                }
+                self.read_list_end()
+            }
+            TType::Set => {
+                let set_ident = self.read_set_begin()?;
+                for _ in 0..set_ident.size {
+                    self.skip_till_depth(set_ident.element_type, depth - 1)?;
+                }
+                self.read_set_end()
+            }
+            TType::Map => {
+                let map_ident = self.read_map_begin()?;
+                // The key/value types are optional in the identifier and are
+                // only needed when there are entries to skip.
+                if map_ident.size > 0 {
+                    // The identifier comes off the wire, so a missing type is
+                    // reported as malformed input rather than a panic.
+                    let (key_type, val_type) = match (map_ident.key_type, map_ident.value_type) {
+                        (Some(key_type), Some(val_type)) => (key_type, val_type),
+                        _ => {
+                            return Err(::Error::Protocol(ProtocolError {
+                                kind: ProtocolErrorKind::InvalidData,
+                                message: format!("non-zero sized map should contain key and \
+                                                  value type: {:?}",
+                                                 map_ident),
+                            }));
+                        }
+                    };
+                    for _ in 0..map_ident.size {
+                        self.skip_till_depth(key_type, depth - 1)?;
+                        self.skip_till_depth(val_type, depth - 1)?;
+                    }
+                }
+                self.read_map_end()
+            }
+            u => {
+                Err(::Error::Protocol(ProtocolError {
+                    kind: ProtocolErrorKind::Unknown,
+                    message: format!("cannot skip field type {:?}", &u),
+                }))
+            }
+        }
+    }
+
+    // utility (DO NOT USE IN GENERATED CODE!!!!)
+    //
+
+    /// Read an unsigned byte.
+    ///
+    /// This method should **never** be used in generated code.
+    fn read_byte(&mut self) -> ::Result<u8>;
+}
+
+/// Converts Thrift identifiers, primitives, containers or structs into a
+/// stream of bytes.
+///
+/// This trait does not deal with higher-level Thrift concepts like structs or
+/// exceptions - only with primitives and message or container boundaries.
+/// Write methods take an identifier (for example, `TMessageIdentifier`) or a
+/// primitive. Any or all of the fields in an identifier may be omitted when
+/// writing to the transport. Write methods may even be noops. All of this is
+/// transparent to the caller; as long as a matching `TInputProtocol`
+/// implementation is used, received messages will be decoded correctly.
+///
+/// All methods return a `thrift::Result`. If an `Err` is returned the protocol
+/// instance and its underlying transport should be terminated.
+///
+/// # Examples
+///
+/// Create and use a `TOutputProtocol`
+///
+/// ```no_run
+/// use std::cell::RefCell;
+/// use std::rc::Rc;
+/// use thrift::protocol::{TBinaryOutputProtocol, TFieldIdentifier, TOutputProtocol, TType};
+/// use thrift::transport::{TTcpTransport, TTransport};
+///
+/// let mut transport = TTcpTransport::new();
+/// transport.open("127.0.0.1:9090").unwrap();
+/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+///
+/// let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
+///
+/// o_prot.write_field_begin(&TFieldIdentifier::new("string_thing", TType::String, 1)).unwrap();
+/// o_prot.write_string("foo").unwrap();
+/// o_prot.write_field_end().unwrap();
+/// ```
+pub trait TOutputProtocol {
+    /// Write the beginning of a Thrift message described by `identifier`.
+    fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()>;
+    /// Write the end of a Thrift message.
+    fn write_message_end(&mut self) -> ::Result<()>;
+    /// Write the beginning of a Thrift struct described by `identifier`.
+    fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()>;
+    /// Write the end of a Thrift struct.
+    fn write_struct_end(&mut self) -> ::Result<()>;
+    /// Write the beginning of a Thrift field described by `identifier`.
+    fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()>;
+    /// Write the end of a Thrift field.
+    fn write_field_end(&mut self) -> ::Result<()>;
+    /// Write a STOP field indicating that all the fields in a struct have been
+    /// written.
+    fn write_field_stop(&mut self) -> ::Result<()>;
+    /// Write a bool.
+    fn write_bool(&mut self, b: bool) -> ::Result<()>;
+    /// Write a fixed-length byte array.
+    fn write_bytes(&mut self, b: &[u8]) -> ::Result<()>;
+    /// Write an 8-bit signed integer.
+    fn write_i8(&mut self, i: i8) -> ::Result<()>;
+    /// Write a 16-bit signed integer.
+    fn write_i16(&mut self, i: i16) -> ::Result<()>;
+    /// Write a 32-bit signed integer.
+    fn write_i32(&mut self, i: i32) -> ::Result<()>;
+    /// Write a 64-bit signed integer.
+    fn write_i64(&mut self, i: i64) -> ::Result<()>;
+    /// Write a 64-bit float.
+    fn write_double(&mut self, d: f64) -> ::Result<()>;
+    /// Write a fixed-length string.
+    fn write_string(&mut self, s: &str) -> ::Result<()>;
+    /// Write the beginning of a list described by `identifier`.
+    fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()>;
+    /// Write the end of a list.
+    fn write_list_end(&mut self) -> ::Result<()>;
+    /// Write the beginning of a set described by `identifier`.
+    fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()>;
+    /// Write the end of a set.
+    fn write_set_end(&mut self) -> ::Result<()>;
+    /// Write the beginning of a map described by `identifier`.
+    fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()>;
+    /// Write the end of a map.
+    fn write_map_end(&mut self) -> ::Result<()>;
+    /// Flush buffered bytes to the underlying transport.
+    fn flush(&mut self) -> ::Result<()>;
+
+    // utility (DO NOT USE IN GENERATED CODE!!!!)
+    //
+
+    /// Write an unsigned byte.
+    ///
+    /// This method should **never** be used in generated code.
+    fn write_byte(&mut self, b: u8) -> ::Result<()>; // FIXME: REMOVE
+}
+
+/// Helper type used by servers to create `TInputProtocol` instances for
+/// accepted client connections.
+///
+/// # Examples
+///
+/// Create a `TInputProtocolFactory` and use it to create a `TInputProtocol`.
+///
+/// ```no_run
+/// use std::cell::RefCell;
+/// use std::rc::Rc;
+/// use thrift::protocol::{TBinaryInputProtocolFactory, TInputProtocolFactory};
+/// use thrift::transport::{TTcpTransport, TTransport};
+///
+/// let mut transport = TTcpTransport::new();
+/// transport.open("127.0.0.1:9090").unwrap();
+/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+///
+/// let mut i_proto_factory = TBinaryInputProtocolFactory::new();
+/// let i_prot = i_proto_factory.create(transport);
+/// ```
+pub trait TInputProtocolFactory {
+    /// Create a `TInputProtocol` that reads bytes from `transport`.
+    ///
+    /// The transport is passed in as a shared, reference-counted handle and
+    /// the new protocol is returned as a boxed trait object.
+    fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TInputProtocol>;
+}
+
+/// Helper type used by servers to create `TOutputProtocol` instances for
+/// accepted client connections.
+///
+/// # Examples
+///
+/// Create a `TOutputProtocolFactory` and use it to create a `TOutputProtocol`.
+///
+/// ```no_run
+/// use std::cell::RefCell;
+/// use std::rc::Rc;
+/// use thrift::protocol::{TBinaryOutputProtocolFactory, TOutputProtocolFactory};
+/// use thrift::transport::{TTcpTransport, TTransport};
+///
+/// let mut transport = TTcpTransport::new();
+/// transport.open("127.0.0.1:9090").unwrap();
+/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+///
+/// let mut o_proto_factory = TBinaryOutputProtocolFactory::new();
+/// let o_prot = o_proto_factory.create(transport);
+/// ```
+pub trait TOutputProtocolFactory {
+    /// Create a `TOutputProtocol` that writes bytes to `transport`.
+    ///
+    /// The transport is passed in as a shared, reference-counted handle and
+    /// the new protocol is returned as a boxed trait object.
+    fn create(&mut self, transport: Rc<RefCell<Box<TTransport>>>) -> Box<TOutputProtocol>;
+}
+
+/// Identifier describing a Thrift message.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct TMessageIdentifier {
+    /// Name of the service call this message belongs to.
+    pub name: String,
+    /// Kind of message being sent.
+    pub message_type: TMessageType,
+    /// Sequence number that identifies the message.
+    pub sequence_number: i32,
+}
+
+impl TMessageIdentifier {
+    /// Build the identifier of a message for the service call `name`, with
+    /// message type `message_type` and sequence number `sequence_number`.
+    ///
+    /// `name` may be anything convertible into an owned `String`.
+    pub fn new<S>(name: S, message_type: TMessageType, sequence_number: i32) -> TMessageIdentifier
+        where S: Into<String>
+    {
+        let call_name: String = name.into();
+        TMessageIdentifier {
+            name: call_name,
+            message_type: message_type,
+            sequence_number: sequence_number,
+        }
+    }
+}
+
+/// Identifier describing a Thrift struct.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct TStructIdentifier {
+    /// Name of the encoded Thrift struct.
+    pub name: String,
+}
+
+impl TStructIdentifier {
+    /// Build the identifier of a struct called `name`.
+    ///
+    /// `name` may be anything convertible into an owned `String`.
+    pub fn new<S>(name: S) -> TStructIdentifier
+        where S: Into<String>
+    {
+        let struct_name: String = name.into();
+        TStructIdentifier { name: struct_name }
+    }
+}
+
+/// Identifier describing a field of a Thrift struct.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct TFieldIdentifier {
+    /// Name of the Thrift field.
+    ///
+    /// `None` if it's not sent over the wire.
+    pub name: Option<String>,
+    /// Type of the field: a primitive, a container, or a struct.
+    pub field_type: TType,
+    /// Thrift field id.
+    ///
+    /// `None` only if `field_type` is `TType::Stop`.
+    pub id: Option<i16>,
+}
+
+impl TFieldIdentifier {
+    /// Build the identifier of a field called `name`, of type `field_type`
+    /// and with field id `id`.
+    ///
+    /// Both `name` and `id` accept either a bare value or an `Option`.
+    /// `id` should be `None` if `field_type` is `TType::Stop`.
+    pub fn new<N, S, I>(name: N, field_type: TType, id: I) -> TFieldIdentifier
+        where N: Into<Option<S>>,
+              S: Into<String>,
+              I: Into<Option<i16>>
+    {
+        // convert the optional name into an owned string, if one was given
+        let owned_name: Option<String> = match name.into() {
+            Some(given) => Some(given.into()),
+            None => None,
+        };
+        let field_id: Option<i16> = id.into();
+
+        TFieldIdentifier {
+            name: owned_name,
+            field_type: field_type,
+            id: field_id,
+        }
+    }
+}
+
+/// Identifier describing a Thrift list.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct TListIdentifier {
+    /// Type shared by every element of the list.
+    pub element_type: TType,
+    /// Number of elements the list holds.
+    pub size: i32,
+}
+
+impl TListIdentifier {
+    /// Build the identifier of a list holding `size` elements of type
+    /// `element_type`.
+    pub fn new(element_type: TType, size: i32) -> TListIdentifier {
+        let list_ident = TListIdentifier {
+            size: size,
+            element_type: element_type,
+        };
+        list_ident
+    }
+}
+
+/// Identifier describing a Thrift set.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct TSetIdentifier {
+    /// Type shared by every element of the set.
+    pub element_type: TType,
+    /// Number of elements the set holds.
+    pub size: i32,
+}
+
+impl TSetIdentifier {
+    /// Build the identifier of a set holding `size` elements of type
+    /// `element_type`.
+    pub fn new(element_type: TType, size: i32) -> TSetIdentifier {
+        let set_ident = TSetIdentifier {
+            size: size,
+            element_type: element_type,
+        };
+        set_ident
+    }
+}
+
+/// Identifier describing a Thrift map.
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct TMapIdentifier {
+    /// Type of the map keys, if known.
+    pub key_type: Option<TType>,
+    /// Type of the map values, if known.
+    pub value_type: Option<TType>,
+    /// Number of entries the map holds.
+    pub size: i32,
+}
+
+impl TMapIdentifier {
+    /// Build the identifier of a map holding `size` entries that map
+    /// `key_type -> value_type`.
+    ///
+    /// Both types accept either a bare `TType` or an `Option<TType>`.
+    pub fn new<K, V>(key_type: K, value_type: V, size: i32) -> TMapIdentifier
+        where K: Into<Option<TType>>,
+              V: Into<Option<TType>>
+    {
+        let keys: Option<TType> = key_type.into();
+        let values: Option<TType> = value_type.into();
+
+        TMapIdentifier {
+            key_type: keys,
+            value_type: values,
+            size: size,
+        }
+    }
+}
+
+/// Kinds of Thrift message.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum TMessageType {
+    /// Request for a service call.
+    Call,
+    /// Response to a service call.
+    Reply,
+    /// Unexpected error in the remote service.
+    Exception,
+    /// Request for a one-way service call (no response is expected).
+    OneWay,
+}
+
+impl Display for TMessageType {
+    /// Writes the variant name (`Call`, `Reply`, `Exception` or `OneWay`).
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        let label = match *self {
+            TMessageType::Call => "Call",
+            TMessageType::Reply => "Reply",
+            TMessageType::Exception => "Exception",
+            TMessageType::OneWay => "OneWay",
+        };
+        f.write_str(label)
+    }
+}
+
+impl From<TMessageType> for u8 {
+    /// Maps each message type to its one-byte code (`0x01` through `0x04`).
+    fn from(kind: TMessageType) -> Self {
+        match kind {
+            TMessageType::Call => 0x01,
+            TMessageType::Reply => 0x02,
+            TMessageType::Exception => 0x03,
+            TMessageType::OneWay => 0x04,
+        }
+    }
+}
+
+impl TryFrom<u8> for TMessageType {
+    type Err = ::Error;
+
+    /// Decodes a one-byte message type code.
+    ///
+    /// Codes `0x01` through `0x04` map to `Call`, `Reply`, `Exception` and
+    /// `OneWay`; any other byte yields an `InvalidData` protocol error.
+    fn try_from(b: u8) -> ::Result<Self> {
+        let message_type = match b {
+            0x01 => TMessageType::Call,
+            0x02 => TMessageType::Reply,
+            0x03 => TMessageType::Exception,
+            0x04 => TMessageType::OneWay,
+            unkn => {
+                return Err(::Error::Protocol(ProtocolError {
+                    kind: ProtocolErrorKind::InvalidData,
+                    message: format!("cannot convert {} to TMessageType", unkn),
+                }));
+            }
+        };
+        Ok(message_type)
+    }
+}
+
+/// Types a Thrift struct field can have.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum TType {
+    /// Marks the end of the serialized fields of a Thrift struct.
+    Stop,
+    /// Void (`()`) field.
+    Void,
+    /// Boolean.
+    Bool,
+    /// Signed 8-bit int.
+    I08,
+    /// Double-precision number.
+    Double,
+    /// Signed 16-bit int.
+    I16,
+    /// Signed 32-bit int.
+    I32,
+    /// Signed 64-bit int.
+    I64,
+    /// UTF-8 string.
+    String,
+    /// UTF-7 string. *Unsupported*.
+    Utf7,
+    /// Thrift struct.
+    Struct,
+    /// Map.
+    Map,
+    /// Set.
+    Set,
+    /// List.
+    List,
+    /// UTF-8 string.
+    Utf8,
+    /// UTF-16 string. *Unsupported*.
+    Utf16,
+}
+
+impl Display for TType {
+    /// Writes a short human-readable name for the type (for example `i32`,
+    /// `struct` or `UTF8`).
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        let label = match *self {
+            TType::Stop => "STOP",
+            TType::Void => "void",
+            TType::Bool => "bool",
+            TType::I08 => "i08",
+            TType::Double => "double",
+            TType::I16 => "i16",
+            TType::I32 => "i32",
+            TType::I64 => "i64",
+            TType::String => "string",
+            TType::Utf7 => "UTF7",
+            TType::Struct => "struct",
+            TType::Map => "map",
+            TType::Set => "set",
+            TType::List => "list",
+            TType::Utf8 => "UTF8",
+            TType::Utf16 => "UTF16",
+        };
+        f.write_str(label)
+    }
+}
+
+/// Check that the received message sequence number `actual` matches the
+/// sequence number `expected`.
+///
+/// Return `()` if they are equal, otherwise an application error of kind
+/// `BadSequenceId`.
+pub fn verify_expected_sequence_number(expected: i32, actual: i32) -> ::Result<()> {
+    if expected != actual {
+        return Err(::Error::Application(::ApplicationError {
+            kind: ::ApplicationErrorKind::BadSequenceId,
+            message: format!("expected {} got {}", expected, actual),
+        }));
+    }
+
+    Ok(())
+}
+
+/// Check that the received service-call name `actual` matches the name
+/// `expected`.
+///
+/// Return `()` if they are equal, otherwise an application error of kind
+/// `WrongMethodName`.
+pub fn verify_expected_service_call(expected: &str, actual: &str) -> ::Result<()> {
+    if expected != actual {
+        return Err(::Error::Application(::ApplicationError {
+            kind: ::ApplicationErrorKind::WrongMethodName,
+            message: format!("expected {} got {}", expected, actual),
+        }));
+    }
+
+    Ok(())
+}
+
+/// Check that the received message type `actual` matches the message type
+/// `expected`.
+///
+/// Return `()` if they are equal, otherwise an application error of kind
+/// `InvalidMessageType`.
+pub fn verify_expected_message_type(expected: TMessageType, actual: TMessageType) -> ::Result<()> {
+    if expected != actual {
+        return Err(::Error::Application(::ApplicationError {
+            kind: ::ApplicationErrorKind::InvalidMessageType,
+            message: format!("expected {} got {}", expected, actual),
+        }));
+    }
+
+    Ok(())
+}
+
+/// Check that the required Thrift struct field `field_name` holds a value.
+///
+/// Return `()` if `field` is `Some`, otherwise a protocol error of kind
+/// `Unknown` naming the missing field.
+pub fn verify_required_field_exists<T>(field_name: &str, field: &Option<T>) -> ::Result<()> {
+    if field.is_some() {
+        return Ok(());
+    }
+
+    Err(::Error::Protocol(::ProtocolError {
+        kind: ::ProtocolErrorKind::Unknown,
+        message: format!("missing required field {}", field_name),
+    }))
+}
+
+/// Extract the field id from a Thrift field identifier.
+///
+/// `field_ident` must *not* have `TFieldIdentifier.field_type` of type `TType::Stop`.
+///
+/// Return `TFieldIdentifier.id` if an id exists, otherwise a protocol error
+/// of kind `Unknown` that includes the debug form of `field_ident`.
+pub fn field_id(field_ident: &TFieldIdentifier) -> ::Result<i16> {
+    // the error is built lazily, only when the id is actually absent
+    field_ident.id.ok_or_else(|| {
+        ::Error::Protocol(::ProtocolError {
+            kind: ::ProtocolErrorKind::Unknown,
+            message: format!("missing field id in {:?}", field_ident),
+        })
+    })
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/protocol/multiplexed.rs
----------------------------------------------------------------------
diff --git a/lib/rs/src/protocol/multiplexed.rs 
b/lib/rs/src/protocol/multiplexed.rs
new file mode 100644
index 0000000..15fe608
--- /dev/null
+++ b/lib/rs/src/protocol/multiplexed.rs
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, 
TMessageIdentifier, TMessageType,
+            TOutputProtocol, TSetIdentifier, TStructIdentifier};
+
+/// `TOutputProtocol` that prefixes the service name to all outgoing Thrift
+/// messages.
+///
+/// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services
+/// send messages over a single I/O channel. By prefixing service identifiers
+/// to outgoing messages receivers are able to demux them and route them to the
+/// appropriate service processor. Rust receivers must use a
+/// `TMultiplexedProcessor`
+/// to process incoming messages, while other languages must use their
+/// corresponding multiplexed processor implementations.
+///
+/// For example, given a service `TestService` and a service call `test_call`,
+/// this implementation would identify messages as originating from
+/// `TestService:test_call`.
+///
+/// # Examples
+///
+/// Create and use a `TMultiplexedOutputProtocol`.
+///
+/// ```no_run
+/// use std::cell::RefCell;
+/// use std::rc::Rc;
+/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
+/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
+/// use thrift::transport::{TTcpTransport, TTransport};
+///
+/// let mut transport = TTcpTransport::new();
+/// transport.open("localhost:9090").unwrap();
+/// let transport = Rc::new(RefCell::new(Box::new(transport) as Box<TTransport>));
+///
+/// let o_prot = TBinaryOutputProtocol::new(transport, true);
+/// let mut o_prot = TMultiplexedOutputProtocol::new("service_name", Box::new(o_prot));
+///
+/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
+/// o_prot.write_message_begin(&ident).unwrap();
+/// ```
+pub struct TMultiplexedOutputProtocol {
+    // service name prefixed to outgoing call identifiers
+    service_name: String,
+    // protocol that actually encodes and sends everything
+    inner: Box<TOutputProtocol>,
+}
+
+impl TMultiplexedOutputProtocol {
+    /// Build a `TMultiplexedOutputProtocol` that tags outgoing messages as
+    /// coming from the service called `service_name` and hands them to the
+    /// `wrapped` `TOutputProtocol`. Encoding and sending is done by `wrapped`,
+    /// not by this instance.
+    pub fn new(service_name: &str, wrapped: Box<TOutputProtocol>) -> TMultiplexedOutputProtocol {
+        let owned_name = String::from(service_name);
+        TMultiplexedOutputProtocol {
+            inner: wrapped,
+            service_name: owned_name,
+        }
+    }
+}
+
+// FIXME: avoid passthrough methods
+impl TOutputProtocol for TMultiplexedOutputProtocol {
+    // Calls and one-way calls get their name rewritten to
+    // `<service_name>:<name>`; every other message type is forwarded as-is.
+    // FIXME: is there a better way to override identifier here?
+    fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> ::Result<()> {
+        let is_request = identifier.message_type == TMessageType::Call ||
+                         identifier.message_type == TMessageType::OneWay;
+        if !is_request {
+            return self.inner.write_message_begin(identifier);
+        }
+
+        let prefixed = TMessageIdentifier {
+            name: format!("{}:{}", self.service_name, identifier.name),
+            message_type: identifier.message_type,
+            sequence_number: identifier.sequence_number,
+        };
+        self.inner.write_message_begin(&prefixed)
+    }
+
+    // Everything below delegates directly to the wrapped protocol.
+
+    fn write_message_end(&mut self) -> ::Result<()> {
+        self.inner.write_message_end()
+    }
+
+    fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> ::Result<()> {
+        self.inner.write_struct_begin(identifier)
+    }
+
+    fn write_struct_end(&mut self) -> ::Result<()> {
+        self.inner.write_struct_end()
+    }
+
+    fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> ::Result<()> {
+        self.inner.write_field_begin(identifier)
+    }
+
+    fn write_field_end(&mut self) -> ::Result<()> {
+        self.inner.write_field_end()
+    }
+
+    fn write_field_stop(&mut self) -> ::Result<()> {
+        self.inner.write_field_stop()
+    }
+
+    fn write_bool(&mut self, b: bool) -> ::Result<()> {
+        self.inner.write_bool(b)
+    }
+
+    fn write_bytes(&mut self, b: &[u8]) -> ::Result<()> {
+        self.inner.write_bytes(b)
+    }
+
+    fn write_i8(&mut self, i: i8) -> ::Result<()> {
+        self.inner.write_i8(i)
+    }
+
+    fn write_i16(&mut self, i: i16) -> ::Result<()> {
+        self.inner.write_i16(i)
+    }
+
+    fn write_i32(&mut self, i: i32) -> ::Result<()> {
+        self.inner.write_i32(i)
+    }
+
+    fn write_i64(&mut self, i: i64) -> ::Result<()> {
+        self.inner.write_i64(i)
+    }
+
+    fn write_double(&mut self, d: f64) -> ::Result<()> {
+        self.inner.write_double(d)
+    }
+
+    fn write_string(&mut self, s: &str) -> ::Result<()> {
+        self.inner.write_string(s)
+    }
+
+    fn write_list_begin(&mut self, identifier: &TListIdentifier) -> ::Result<()> {
+        self.inner.write_list_begin(identifier)
+    }
+
+    fn write_list_end(&mut self) -> ::Result<()> {
+        self.inner.write_list_end()
+    }
+
+    fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> ::Result<()> {
+        self.inner.write_set_begin(identifier)
+    }
+
+    fn write_set_end(&mut self) -> ::Result<()> {
+        self.inner.write_set_end()
+    }
+
+    fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> ::Result<()> {
+        self.inner.write_map_begin(identifier)
+    }
+
+    fn write_map_end(&mut self) -> ::Result<()> {
+        self.inner.write_map_end()
+    }
+
+    fn flush(&mut self) -> ::Result<()> {
+        self.inner.flush()
+    }
+
+    // utility
+    //
+
+    fn write_byte(&mut self, b: u8) -> ::Result<()> {
+        self.inner.write_byte(b)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use std::cell::RefCell;
+    use std::rc::Rc;
+
+    use ::protocol::{TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol};
+    use ::transport::{TPassThruTransport, TTransport};
+    use ::transport::mem::TBufferTransport;
+
+    use super::*;
+
+    // A `Call` written through the multiplexed protocol must reach the
+    // buffer with its name rewritten to "foo:bar".
+    #[test]
+    fn must_write_message_begin_with_prefixed_service_name() {
+        let (buffer, mut multiplexed) = test_objects();
+
+        let call = TMessageIdentifier::new("bar", TMessageType::Call, 2);
+        assert_success!(multiplexed.write_message_begin(&call));
+
+        let expected: [u8; 19] = [
+            // protocol identifier
+            0x80, 0x01,
+            // message type
+            0x00, 0x01,
+            // four bytes preceding the name (value 7, the length of "foo:bar")
+            0x00, 0x00, 0x00, 0x07,
+            // "foo"
+            0x66, 0x6F, 0x6F,
+            // ":"
+            0x3A,
+            // "bar"
+            0x62, 0x61, 0x72,
+            // sequence number
+            0x00, 0x00, 0x00, 0x02,
+        ];
+
+        assert_eq!(&buffer.borrow().write_buffer_to_vec(), &expected);
+    }
+
+    // Builds a multiplexed protocol for service "foo" on top of a binary
+    // protocol, and returns it together with the in-memory buffer transport
+    // the test inspects afterwards.
+    fn test_objects() -> (Rc<RefCell<Box<TBufferTransport>>>, TMultiplexedOutputProtocol) {
+        let buffer = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity(40, 40))));
+
+        let pass_thru: Box<TTransport> = Box::new(TPassThruTransport { inner: buffer.clone() });
+        let shared_transport = Rc::new(RefCell::new(pass_thru));
+
+        let binary = TBinaryOutputProtocol::new(shared_transport.clone(), true);
+        let multiplexed = TMultiplexedOutputProtocol::new("foo", Box::new(binary));
+
+        (buffer, multiplexed)
+    }
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/protocol/stored.rs
----------------------------------------------------------------------
diff --git a/lib/rs/src/protocol/stored.rs b/lib/rs/src/protocol/stored.rs
new file mode 100644
index 0000000..6826c00
--- /dev/null
+++ b/lib/rs/src/protocol/stored.rs
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::convert::Into;
+
+use ::ProtocolErrorKind;
+use super::{TFieldIdentifier, TListIdentifier, TMapIdentifier, 
TMessageIdentifier, TInputProtocol,
+            TSetIdentifier, TStructIdentifier};
+
+/// `TInputProtocol` required to use a `TMultiplexedProcessor`.
+///
+/// A `TMultiplexedProcessor` reads incoming message identifiers to determine 
to
+/// which `TProcessor` requests should be forwarded. However, once read, those
+/// message identifier bytes are no longer on the wire. Since downstream
+/// processors expect to read message identifiers from the given input protocol
+/// we need some way of supplying a `TMessageIdentifier` with the service-name
+/// stripped. This implementation stores the received `TMessageIdentifier`
+/// (without the service name) and returns it to the caller the first time
+/// `TInputProtocol::read_message_begin(...)` is called; any later call to
+/// that method yields a protocol error. It delegates all other calls
+/// directly to the wrapped `TInputProtocol`.
+///
+/// This type **should not** be used by application code.
+///
+/// # Examples
+///
+/// Create and use a `TStoredInputProtocol`.
+///
+/// ```no_run
+/// use std::cell::RefCell;
+/// use std::rc::Rc;
+/// use thrift;
+/// use thrift::protocol::{TInputProtocol, TMessageIdentifier, TMessageType, 
TOutputProtocol};
+/// use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol, 
TStoredInputProtocol};
+/// use thrift::server::TProcessor;
+/// use thrift::transport::{TTcpTransport, TTransport};
+///
+/// // sample processor
+/// struct ActualProcessor;
+/// impl TProcessor for ActualProcessor {
+///     fn process(
+///         &mut self,
+///         _: &mut TInputProtocol,
+///         _: &mut TOutputProtocol
+///     ) -> thrift::Result<()> {
+///         unimplemented!()
+///     }
+/// }
+/// let mut processor = ActualProcessor {};
+///
+/// // construct the shared transport
+/// let mut transport = TTcpTransport::new();
+/// transport.open("localhost:9090").unwrap();
+/// let transport = Rc::new(RefCell::new(Box::new(transport) as 
Box<TTransport>));
+///
+/// // construct the actual input and output protocols
+/// let mut i_prot = TBinaryInputProtocol::new(transport.clone(), true);
+/// let mut o_prot = TBinaryOutputProtocol::new(transport.clone(), true);
+///
+/// // message identifier received from remote and modified to remove the 
service name
+/// let new_msg_ident = TMessageIdentifier::new("service_call", 
TMessageType::Call, 1);
+///
+/// // construct the proxy input protocol
+/// let mut proxy_i_prot = TStoredInputProtocol::new(&mut i_prot, 
new_msg_ident);
+/// let res = processor.process(&mut proxy_i_prot, &mut o_prot);
+/// ```
+pub struct TStoredInputProtocol<'a> {
+    // wrapped protocol that services every call except `read_message_begin`
+    inner: &'a mut TInputProtocol,
+    // identifier handed out by `read_message_begin`; `None` once it was taken
+    message_ident: Option<TMessageIdentifier>,
+}
+
+impl<'a> TStoredInputProtocol<'a> {
+    /// Create a `TStoredInputProtocol` around a `wrapped` `TInputProtocol`.
+    ///
+    /// `message_ident` is the modified message identifier - with service name
+    /// stripped - that is stored in the new instance and handed back by
+    /// `TInputProtocol::read_message_begin(...)`. All other calls are
+    /// delegated to `wrapped`.
+    pub fn new(wrapped: &mut TInputProtocol,
+               message_ident: TMessageIdentifier)
+               -> TStoredInputProtocol {
+        // stored as `Some(..)` so that it can be taken exactly once
+        let stored: Option<TMessageIdentifier> = message_ident.into();
+        TStoredInputProtocol {
+            message_ident: stored,
+            inner: wrapped,
+        }
+    }
+}
+
+impl<'a> TInputProtocol for TStoredInputProtocol<'a> {
+    // Hands out the stored message identifier without touching the wrapped
+    // protocol. The identifier can be taken only once; every further call
+    // fails with a protocol error.
+    fn read_message_begin(&mut self) -> ::Result<TMessageIdentifier> {
+        match self.message_ident.take() {
+            Some(ident) => Ok(ident),
+            None => {
+                Err(::errors::new_protocol_error(ProtocolErrorKind::Unknown,
+                                                 "message identifier already read"))
+            }
+        }
+    }
+
+    // everything below is forwarded unchanged to the wrapped protocol
+    //
+
+    fn read_message_end(&mut self) -> ::Result<()> {
+        self.inner.read_message_end()
+    }
+
+    fn read_struct_begin(&mut self) -> ::Result<Option<TStructIdentifier>> {
+        self.inner.read_struct_begin()
+    }
+
+    fn read_struct_end(&mut self) -> ::Result<()> {
+        self.inner.read_struct_end()
+    }
+
+    fn read_field_begin(&mut self) -> ::Result<TFieldIdentifier> {
+        self.inner.read_field_begin()
+    }
+
+    fn read_field_end(&mut self) -> ::Result<()> {
+        self.inner.read_field_end()
+    }
+
+    fn read_bytes(&mut self) -> ::Result<Vec<u8>> {
+        self.inner.read_bytes()
+    }
+
+    fn read_bool(&mut self) -> ::Result<bool> {
+        self.inner.read_bool()
+    }
+
+    fn read_i8(&mut self) -> ::Result<i8> {
+        self.inner.read_i8()
+    }
+
+    fn read_i16(&mut self) -> ::Result<i16> {
+        self.inner.read_i16()
+    }
+
+    fn read_i32(&mut self) -> ::Result<i32> {
+        self.inner.read_i32()
+    }
+
+    fn read_i64(&mut self) -> ::Result<i64> {
+        self.inner.read_i64()
+    }
+
+    fn read_double(&mut self) -> ::Result<f64> {
+        self.inner.read_double()
+    }
+
+    fn read_string(&mut self) -> ::Result<String> {
+        self.inner.read_string()
+    }
+
+    fn read_list_begin(&mut self) -> ::Result<TListIdentifier> {
+        self.inner.read_list_begin()
+    }
+
+    fn read_list_end(&mut self) -> ::Result<()> {
+        self.inner.read_list_end()
+    }
+
+    fn read_set_begin(&mut self) -> ::Result<TSetIdentifier> {
+        self.inner.read_set_begin()
+    }
+
+    fn read_set_end(&mut self) -> ::Result<()> {
+        self.inner.read_set_end()
+    }
+
+    fn read_map_begin(&mut self) -> ::Result<TMapIdentifier> {
+        self.inner.read_map_begin()
+    }
+
+    fn read_map_end(&mut self) -> ::Result<()> {
+        self.inner.read_map_end()
+    }
+
+    // utility
+    //
+
+    fn read_byte(&mut self) -> ::Result<u8> {
+        self.inner.read_byte()
+    }
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/server/mod.rs
----------------------------------------------------------------------
diff --git a/lib/rs/src/server/mod.rs b/lib/rs/src/server/mod.rs
new file mode 100644
index 0000000..ceac18a
--- /dev/null
+++ b/lib/rs/src/server/mod.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Types required to implement a Thrift server.
+
+use ::protocol::{TInputProtocol, TOutputProtocol};
+
+mod simple;
+mod multiplexed;
+
+pub use self::simple::TSimpleServer;
+pub use self::multiplexed::TMultiplexedProcessor;
+
+/// Handles incoming Thrift messages and dispatches them to the user-defined
+/// handler functions.
+///
+/// An implementation is auto-generated for each Thrift service. When used by a
+/// server (for example, a `TSimpleServer`), it will demux incoming service
+/// calls and invoke the corresponding user-defined handler function.
+///
+/// # Examples
+///
+/// Create and start a server using the auto-generated `TProcessor` for
+/// a Thrift service `SimpleService`.
+///
+/// ```no_run
+/// use thrift;
+/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
+/// use thrift::server::TProcessor;
+///
+/// //
+/// // auto-generated
+/// //
+///
+/// // processor for `SimpleService`
+/// struct SimpleServiceSyncProcessor;
+/// impl SimpleServiceSyncProcessor {
+///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> 
SimpleServiceSyncProcessor {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // `TProcessor` implementation for `SimpleService`
+/// impl TProcessor for SimpleServiceSyncProcessor {
+///     fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) 
-> thrift::Result<()> {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // service functions for SimpleService
+/// trait SimpleServiceSyncHandler {
+///     fn service_call(&mut self) -> thrift::Result<()>;
+/// }
+///
+/// //
+/// // user-code follows
+/// //
+///
+/// // define a handler that will be invoked when `service_call` is received
+/// struct SimpleServiceHandlerImpl;
+/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
+///     fn service_call(&mut self) -> thrift::Result<()> {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // instantiate the processor
+/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl 
{});
+///
+/// // at this point you can pass the processor to the server
+/// // let server = TSimpleServer::new(..., processor);
+/// ```
+pub trait TProcessor {
+    /// Process a Thrift service call.
+    ///
+    /// Reads arguments from `i`, executes the user's handler code, and writes
+    /// the response to `o`.
+    ///
+    /// Returns `Ok(())` if the handler was executed; `Err` otherwise.
+    fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> 
::Result<()>;
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/server/multiplexed.rs
----------------------------------------------------------------------
diff --git a/lib/rs/src/server/multiplexed.rs b/lib/rs/src/server/multiplexed.rs
new file mode 100644
index 0000000..d2314a1
--- /dev/null
+++ b/lib/rs/src/server/multiplexed.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::convert::Into;
+
+use ::{new_application_error, ApplicationErrorKind};
+use ::protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, 
TStoredInputProtocol};
+
+use super::TProcessor;
+
+/// A `TProcessor` that can demux service calls to multiple underlying
+/// Thrift services.
+///
+/// Users register service-specific `TProcessor` instances with a
+/// `TMultiplexedProcessor`, and then register that processor with a server
+/// implementation. Following that, all incoming service calls are 
automatically
+/// routed to the service-specific `TProcessor`.
+///
+/// A `TMultiplexedProcessor` can only handle messages sent by a
+/// `TMultiplexedOutputProtocol`.
+pub struct TMultiplexedProcessor {
+    // registered processors, keyed by the service name they were registered
+    // under
+    processors: HashMap<String, Box<TProcessor>>,
+}
+
+impl TMultiplexedProcessor {
+    /// Create a `TMultiplexedProcessor` with no registered processors.
+    ///
+    /// The `processors` field is private, so this constructor is the way for
+    /// code outside this module to obtain an instance.
+    pub fn new() -> TMultiplexedProcessor {
+        TMultiplexedProcessor { processors: HashMap::new() }
+    }
+
+    /// Register a service-specific `processor` for the service named
+    /// `service_name`.
+    ///
+    /// Return `true` if this is the first registration for `service_name`.
+    ///
+    /// Return `false` if a mapping previously existed (the previous mapping is
+    /// *not* overwritten, and the given `processor` is dropped).
+    pub fn register_processor<S: Into<String>>(&mut self,
+                                               service_name: S,
+                                               processor: Box<TProcessor>)
+                                               -> bool {
+        use std::collections::hash_map::Entry;
+
+        // a single lookup decides whether the name is still free
+        match self.processors.entry(service_name.into()) {
+            Entry::Vacant(slot) => {
+                slot.insert(processor);
+                true
+            }
+            Entry::Occupied(_) => false,
+        }
+    }
+}
+
+impl Default for TMultiplexedProcessor {
+    /// Equivalent to `TMultiplexedProcessor::new()`.
+    fn default() -> TMultiplexedProcessor {
+        TMultiplexedProcessor::new()
+    }
+}
+
+impl TProcessor for TMultiplexedProcessor {
+    /// Route one incoming service call to the processor registered for its
+    /// service.
+    ///
+    /// The incoming message name is expected to have the form
+    /// `service:call`. The part before the first `:` selects the registered
+    /// processor; the part after it (without the `:`) becomes the message
+    /// name that the selected processor reads through a
+    /// `TStoredInputProtocol`.
+    ///
+    /// Returns an application error if the message name has no `:` separator
+    /// or if no processor is registered for the service; otherwise returns
+    /// whatever the selected processor returns.
+    fn process(&mut self,
+               i_prot: &mut TInputProtocol,
+               o_prot: &mut TOutputProtocol)
+               -> ::Result<()> {
+        let msg_ident = i_prot.read_message_begin()?;
+        let sep_index = msg_ident.name
+            .find(':')
+            .ok_or_else(|| {
+                new_application_error(ApplicationErrorKind::Unknown,
+                                      "no service separator found in incoming message")
+            })?;
+
+        // ':' is a single byte, so `sep_index + 1` is a valid char boundary;
+        // skipping it keeps the separator out of the forwarded call name
+        let svc_name = &msg_ident.name[..sep_index];
+        let svc_call = &msg_ident.name[sep_index + 1..];
+
+        match self.processors.get_mut(svc_name) {
+            Some(processor) => {
+                let new_msg_ident = TMessageIdentifier::new(svc_call,
+                                                            msg_ident.message_type,
+                                                            msg_ident.sequence_number);
+                let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident);
+                processor.process(&mut proxy_i_prot, o_prot)
+            }
+            None => {
+                Err(new_application_error(ApplicationErrorKind::Unknown,
+                                          format!("no processor found for service {}", svc_name)))
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/server/simple.rs
----------------------------------------------------------------------
diff --git a/lib/rs/src/server/simple.rs b/lib/rs/src/server/simple.rs
new file mode 100644
index 0000000..89ed977
--- /dev/null
+++ b/lib/rs/src/server/simple.rs
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::net::{TcpListener, TcpStream};
+use std::rc::Rc;
+
+use ::{ApplicationError, ApplicationErrorKind};
+use ::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+use ::transport::{TTcpTransport, TTransport, TTransportFactory};
+
+use super::TProcessor;
+
+/// Single-threaded blocking Thrift socket server.
+///
+/// A `TSimpleServer` listens on a given address and services accepted
+/// connections *synchronously* and *sequentially* - i.e. in a blocking manner,
+/// one at a time - on the main thread. Each accepted connection has an input
+/// half and an output half, each of which uses a `TTransport` and `TProtocol`
+/// to translate messages to and from bytes. Any combination of `TProtocol` and
+/// `TTransport` may be used.
+///
+/// # Examples
+///
+/// Creating and running a `TSimpleServer` using Thrift-compiler-generated
+/// service code.
+///
+/// ```no_run
+/// use thrift;
+/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
+/// use thrift::protocol::{TBinaryInputProtocolFactory, 
TBinaryOutputProtocolFactory};
+/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
+/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory};
+/// use thrift::server::{TProcessor, TSimpleServer};
+///
+/// //
+/// // auto-generated
+/// //
+///
+/// // processor for `SimpleService`
+/// struct SimpleServiceSyncProcessor;
+/// impl SimpleServiceSyncProcessor {
+///     fn new<H: SimpleServiceSyncHandler>(processor: H) -> 
SimpleServiceSyncProcessor {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // `TProcessor` implementation for `SimpleService`
+/// impl TProcessor for SimpleServiceSyncProcessor {
+///     fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) 
-> thrift::Result<()> {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // service functions for SimpleService
+/// trait SimpleServiceSyncHandler {
+///     fn service_call(&mut self) -> thrift::Result<()>;
+/// }
+///
+/// //
+/// // user-code follows
+/// //
+///
+/// // define a handler that will be invoked when `service_call` is received
+/// struct SimpleServiceHandlerImpl;
+/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
+///     fn service_call(&mut self) -> thrift::Result<()> {
+///         unimplemented!();
+///     }
+/// }
+///
+/// // instantiate the processor
+/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl 
{});
+///
+/// // instantiate the server
+/// let i_tr_fact: Box<TTransportFactory> = 
Box::new(TBufferedTransportFactory::new());
+/// let i_pr_fact: Box<TInputProtocolFactory> = 
Box::new(TBinaryInputProtocolFactory::new());
+/// let o_tr_fact: Box<TTransportFactory> = 
Box::new(TBufferedTransportFactory::new());
+/// let o_pr_fact: Box<TOutputProtocolFactory> = 
Box::new(TBinaryOutputProtocolFactory::new());
+///
+/// let mut server = TSimpleServer::new(
+///     i_tr_fact,
+///     i_pr_fact,
+///     o_tr_fact,
+///     o_pr_fact,
+///     processor
+/// );
+///
+/// // start listening for incoming connections
+/// match server.listen("127.0.0.1:8080") {
+///   Ok(_)  => println!("listen completed"),
+///   Err(e) => println!("listen failed with error {:?}", e),
+/// }
+/// ```
+pub struct TSimpleServer<PR: TProcessor> {
+    // factories used to build the input half of every accepted connection
+    i_trans_factory: Box<TTransportFactory>,
+    i_proto_factory: Box<TInputProtocolFactory>,
+    // factories used to build the output half of every accepted connection
+    o_trans_factory: Box<TTransportFactory>,
+    o_proto_factory: Box<TOutputProtocolFactory>,
+    // processor that is handed the input and output protocol of a connection
+    processor: PR,
+}
+
+impl<PR: TProcessor> TSimpleServer<PR> {
+    /// Create a `TSimpleServer`.
+    ///
+    /// Every accepted connection is split into an input half and an output
+    /// half. The input half is built with `input_transport_factory` and
+    /// `input_protocol_factory`; the output half is built with
+    /// `output_transport_factory` and `output_protocol_factory`. Both halves
+    /// are handed to `processor`.
+    pub fn new(input_transport_factory: Box<TTransportFactory>,
+               input_protocol_factory: Box<TInputProtocolFactory>,
+               output_transport_factory: Box<TTransportFactory>,
+               output_protocol_factory: Box<TOutputProtocolFactory>,
+               processor: PR)
+               -> TSimpleServer<PR> {
+        TSimpleServer {
+            processor: processor,
+            i_trans_factory: input_transport_factory,
+            i_proto_factory: input_protocol_factory,
+            o_trans_factory: output_transport_factory,
+            o_proto_factory: output_protocol_factory,
+        }
+    }
+
+    /// Listen for incoming connections on `listen_address`.
+    ///
+    /// `listen_address` should be in the form `host:port`,
+    /// for example: `127.0.0.1:8080`.
+    ///
+    /// Accepted connections are serviced one after the other. A connection
+    /// that cannot be accepted is logged and skipped.
+    ///
+    /// Return `Err` when the server cannot bind to `listen_address`, or when
+    /// the accept loop comes to an end.
+    pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
+        let listener = TcpListener::bind(listen_address)?;
+
+        for incoming in listener.incoming() {
+            let accepted = match incoming {
+                Ok(s) => s,
+                Err(e) => {
+                    warn!("failed to accept remote connection with error {:?}", e);
+                    continue;
+                }
+            };
+            self.handle_incoming_connection(accepted);
+        }
+
+        // only reached if the listener stops yielding connections
+        let aborted = ApplicationError {
+            kind: ApplicationErrorKind::Unknown,
+            message: "aborted listen loop".into(),
+        };
+        Err(::Error::Application(aborted))
+    }
+
+    // Services a single accepted connection: wraps `stream` in the configured
+    // transports and protocols, then lets the processor handle calls until it
+    // reports the first error.
+    fn handle_incoming_connection(&mut self, stream: TcpStream) {
+        // one tcp stream, shared by the input and the output half
+        let shared: Box<TTransport> = Box::new(TTcpTransport::with_stream(stream));
+        let shared = Rc::new(RefCell::new(shared));
+
+        // input half
+        let i_tran = Rc::new(RefCell::new(self.i_trans_factory.create(shared.clone())));
+        let mut i_prot = self.i_proto_factory.create(i_tran);
+
+        // output half
+        let o_tran = Rc::new(RefCell::new(self.o_trans_factory.create(shared.clone())));
+        let mut o_prot = self.o_proto_factory.create(o_tran);
+
+        // keep processing calls until the processor fails
+        loop {
+            match self.processor.process(&mut *i_prot, &mut *o_prot) {
+                Ok(()) => {}
+                Err(e) => {
+                    warn!("processor failed with error: {:?}", e);
+                    break; // FIXME: close here
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/transport/buffered.rs
----------------------------------------------------------------------
diff --git a/lib/rs/src/transport/buffered.rs b/lib/rs/src/transport/buffered.rs
new file mode 100644
index 0000000..3f240d8
--- /dev/null
+++ b/lib/rs/src/transport/buffered.rs
@@ -0,0 +1,400 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cell::RefCell;
+use std::cmp;
+use std::io;
+use std::io::{Read, Write};
+use std::rc::Rc;
+
+use super::{TTransport, TTransportFactory};
+
+/// Default capacity of the read buffer in bytes.
+const DEFAULT_RBUFFER_CAPACITY: usize = 4096;
+
+/// Default capacity of the write buffer in bytes.
+const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+
+/// Transport that communicates with endpoints using a byte stream.
+///
+/// A `TBufferedTransport` maintains a fixed-size internal write buffer. All
+/// writes are made to this buffer and are sent to the wrapped transport only
+/// when `TTransport::flush()` is called. On a flush the buffered bytes are
+/// written to the wrapped transport exactly as they were received - no header
+/// or other framing is added - and the wrapped transport is then flushed.
+///
+/// A `TBufferedTransport` also maintains a fixed-size internal read buffer.
+/// On a call to `TTransport::read(...)` with the internal buffer exhausted, a
+/// single read of at most the buffer's capacity is made against the wrapped
+/// transport and the result is buffered. Subsequent read calls are serviced
+/// from the internal buffer until it is exhausted, at which point it is
+/// refilled from the wrapped transport.
+///
+/// # Examples
+///
+/// Create and use a `TBufferedTransport`.
+///
+/// ```no_run
+/// use std::cell::RefCell;
+/// use std::rc::Rc;
+/// use std::io::{Read, Write};
+/// use thrift::transport::{TBufferedTransport, TTcpTransport, TTransport};
+///
+/// let mut t = TTcpTransport::new();
+/// t.open("localhost:9090").unwrap();
+///
+/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
+/// let mut t = TBufferedTransport::new(t);
+///
+/// // read
+/// t.read(&mut vec![0u8; 1]).unwrap();
+///
+/// // write
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+pub struct TBufferedTransport {
+    // internal read buffer; its length is the read buffer capacity
+    rbuf: Box<[u8]>,
+    // index into `rbuf` of the next byte that has not been handed out yet
+    rpos: usize,
+    // number of bytes placed in `rbuf` by the most recent read from `inner`
+    rcap: usize,
+    // bytes written by the caller that have not been flushed to `inner` yet
+    wbuf: Vec<u8>,
+    // wrapped transport
+    inner: Rc<RefCell<Box<TTransport>>>,
+}
+
+impl TBufferedTransport {
+    /// Create a `TBufferedTransport` with default-sized internal read and
+    /// write buffers that wraps an `inner` `TTransport`.
+    pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TBufferedTransport {
+        TBufferedTransport::with_capacity(DEFAULT_RBUFFER_CAPACITY, DEFAULT_WBUFFER_CAPACITY, inner)
+    }
+
+    /// Create a `TBufferedTransport` with an internal read buffer of size
+    /// `read_buffer_capacity` and an internal write buffer of size
+    /// `write_buffer_capacity` that wraps an `inner` `TTransport`.
+    pub fn with_capacity(read_buffer_capacity: usize,
+                         write_buffer_capacity: usize,
+                         inner: Rc<RefCell<Box<TTransport>>>)
+                         -> TBufferedTransport {
+        TBufferedTransport {
+            rbuf: vec![0; read_buffer_capacity].into_boxed_slice(),
+            rpos: 0,
+            rcap: 0,
+            wbuf: Vec::with_capacity(write_buffer_capacity),
+            inner: inner,
+        }
+    }
+
+    // Returns the bytes of the read buffer that have not been consumed yet.
+    // If there are none, the buffer is first refilled with one read from the
+    // wrapped transport; an empty slice therefore means that this read
+    // delivered no bytes.
+    fn get_bytes(&mut self) -> io::Result<&[u8]> {
+        if self.rcap - self.rpos == 0 {
+            // Read first and update the cursor only on success. Resetting
+            // `rpos` before a read that fails would leave the old `rcap` in
+            // place, and the next call would hand out bytes that were already
+            // consumed.
+            let refilled = self.inner.borrow_mut().read(&mut self.rbuf)?;
+            self.rpos = 0;
+            self.rcap = refilled;
+        }
+
+        Ok(&self.rbuf[self.rpos..self.rcap])
+    }
+
+    // Marks `consumed` bytes of the read buffer as handed out. The cursor
+    // never moves past the end of the buffered data.
+    fn consume(&mut self, consumed: usize) {
+        // TODO: was a bug here += <-- test somehow
+        self.rpos = cmp::min(self.rcap, self.rpos + consumed);
+    }
+}
+
+impl Read for TBufferedTransport {
+    // Copies buffered bytes into `buf`, refilling the internal buffer from
+    // the wrapped transport as often as needed. Stops once `buf` is full or
+    // a refill delivers no bytes, and returns the number of bytes copied.
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let mut filled = 0;
+
+        loop {
+            let copied = {
+                let available = self.get_bytes()?;
+                let n = cmp::min(buf.len() - filled, available.len());
+                buf[filled..(filled + n)].copy_from_slice(&available[..n]);
+                n
+            };
+
+            self.consume(copied);
+            filled += copied;
+
+            if copied == 0 || filled == buf.len() {
+                break;
+            }
+        }
+
+        Ok(filled)
+    }
+}
+
+impl Write for TBufferedTransport {
+    // Appends as much of `buf` to the write buffer as still fits within its
+    // capacity and returns the number of bytes accepted (0 if the buffer is
+    // full). Nothing is sent to the wrapped transport here.
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let free = self.wbuf.capacity() - self.wbuf.len();
+        let accepted = cmp::min(buf.len(), free);
+        self.wbuf.extend_from_slice(&buf[..accepted]);
+        assert!(self.wbuf.len() <= self.wbuf.capacity(),
+                "copy overflowed buffer");
+        Ok(accepted)
+    }
+
+    // Writes all buffered bytes to the wrapped transport, flushes it, and
+    // empties the write buffer. On an error the buffered bytes are kept.
+    fn flush(&mut self) -> io::Result<()> {
+        {
+            let mut wrapped = self.inner.borrow_mut();
+            wrapped.write_all(&self.wbuf)?;
+            wrapped.flush()?;
+        }
+        self.wbuf.clear();
+        Ok(())
+    }
+}
+
+/// Factory for creating instances of `TBufferedTransport`.
+///
+/// The factory holds no state of its own.
+#[derive(Default)]
+pub struct TBufferedTransportFactory;
+
+impl TBufferedTransportFactory {
+    /// Create a `TBufferedTransportFactory`.
+    ///
+    /// The factory is a unit struct, so there is nothing to configure.
+    pub fn new() -> TBufferedTransportFactory {
+        TBufferedTransportFactory
+    }
+}
+
+impl TTransportFactory for TBufferedTransportFactory {
+    // Wraps `inner` in a `TBufferedTransport` with default-sized buffers.
+    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
+        let buffered = TBufferedTransport::new(inner);
+        Box::new(buffered) as Box<TTransport>
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::cell::RefCell;
+    use std::io::{Read, Write};
+    use std::rc::Rc;
+
+    use super::*;
+    use ::transport::{TPassThruTransport, TTransport};
+    use ::transport::mem::TBufferTransport;
+
+    // Builds an in-memory transport plus a pass-through transport on top of
+    // it, and yields both as `(mem, thru)`. The two arguments are forwarded
+    // in order to `TBufferTransport::with_capacity`.
+    macro_rules! new_transports {
+        ($wbc:expr, $rbc:expr) => (
+            {
+                let mem = Rc::new(RefCell::new(Box::new(TBufferTransport::with_capacity($wbc, $rbc))));
+                let thru = Box::new(TPassThruTransport { inner: mem.clone() }) as Box<TTransport>;
+                let thru = Rc::new(RefCell::new(thru));
+                (mem, thru)
+            }
+        );
+    }
+
+    #[test]
+    fn must_return_zero_if_read_buffer_is_empty() {
+        let (_, thru) = new_transports!(10, 0);
+        let mut buffered = TBufferedTransport::with_capacity(10, 0, thru);
+
+        // no readable bytes were set up, so the read has nothing to deliver
+        let mut dst = vec![0; 10];
+        let nread = buffered.read(&mut dst).unwrap();
+
+        assert_eq!(nread, 0);
+    }
+
+    #[test]
+    fn must_return_zero_if_caller_reads_into_zero_capacity_buffer() {
+        let (_, thru) = new_transports!(10, 0);
+        let mut buffered = TBufferedTransport::with_capacity(10, 0, thru);
+
+        // the destination has no room at all
+        let nread = buffered.read(&mut []).unwrap();
+
+        assert_eq!(nread, 0);
+    }
+
+    #[test]
+    fn must_return_zero_if_nothing_more_can_be_read() {
+        let (mem, thru) = new_transports!(4, 0);
+        let mut t = TBufferedTransport::with_capacity(4, 0, thru);
+
+        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+
+        // read buffer is exactly the same size as bytes available
+        let mut buf = vec![0u8; 4];
+        let read_result = t.read(&mut buf);
+
+        // we've read exactly 4 bytes
+        assert_eq!(read_result.unwrap(), 4);
+        assert_eq!(&buf, &[0, 1, 2, 3]);
+
+        // try read again, this time into a fresh buffer so that the check
+        // below really looks at the buffer handed to `read`
+        let mut buf_again = vec![0u8; 4];
+        let read_result = t.read(&mut buf_again);
+
+        // this time, 0 bytes and we haven't changed the buffer
+        assert_eq!(read_result.unwrap(), 0);
+        assert_eq!(&buf_again, &[0, 0, 0, 0])
+    }
+
+    #[test]
+    fn must_fill_user_buffer_with_only_as_many_bytes_as_available() {
+        let (mem, thru) = new_transports!(4, 0);
+        let mut buffered = TBufferedTransport::with_capacity(4, 0, thru);
+
+        mem.borrow_mut().set_readable_bytes(&[0, 1, 2, 3]);
+
+        // the destination is twice as large as the bytes available
+        let mut dst = vec![0u8; 8];
+        let first = buffered.read(&mut dst);
+
+        // only the 4 available bytes arrive
+        assert_eq!(first.unwrap(), 4);
+        assert_eq!(&dst[..4], &[0, 1, 2, 3]);
+
+        // a second read into the unused tail of the destination
+        let second = buffered.read(&mut dst[4..]);
+
+        // delivers nothing and leaves the destination as it was
+        assert_eq!(second.unwrap(), 0);
+        assert_eq!(&dst, &[0, 1, 2, 3, 0, 0, 0, 0])
+    }
+
+    #[test]
+    fn must_read_successfully() {
+        // the buffered transport has to refill its small internal buffer
+        // several times from the underlying transport to service one read
+
+        // the internal read buffer (2 bytes) is much smaller than the
+        // number of bytes the underlying transport has available (10)
+        let (mem, thru) = new_transports!(10, 0);
+        let mut buffered = TBufferedTransport::with_capacity(2, 0, thru);
+
+        // make the bytes 0..10 readable from the underlying transport
+        let mut source = [0u8; 10];
+        for (i, byte) in source.iter_mut().enumerate() {
+            *byte = i as u8;
+        }
+        mem.borrow_mut().set_readable_bytes(&source);
+
+        // the destination is larger than the internal read buffer, so the
+        // buffered transport must keep asking the underlying transport
+        // for more bytes
+        let mut dst = [0u8; 8];
+        let nread = buffered.read(&mut dst);
+
+        // the destination was filled completely
+        assert_eq!(nread.unwrap(), 8);
+        assert_eq!(&dst, &[0, 1, 2, 3, 4, 5, 6, 7]);
+
+        // zero the destination and read again
+        for byte in dst.iter_mut() {
+            *byte = 0;
+        }
+        let nread = buffered.read(&mut dst);
+
+        // only the 2 bytes left in the underlying transport arrive;
+        // the rest of the destination stays untouched
+        assert_eq!(nread.unwrap(), 2);
+        assert_eq!(&dst[0..2], &[8, 9]);
+        assert_eq!(&dst[2..], &[0, 0, 0, 0, 0, 0]);
+
+        // a further read delivers nothing and changes nothing
+        let nread = buffered.read(&mut dst);
+        assert_eq!(nread.unwrap(), 0);
+        assert_eq!(&dst[0..2], &[8, 9]);
+        assert_eq!(&dst[2..], &[0, 0, 0, 0, 0, 0]);
+    }
+
+    /// With a zero-capacity write buffer a write must report zero bytes
+    /// accepted rather than fail.
+    #[test]
+    fn must_return_zero_if_nothing_can_be_written() {
+        // only the wrapping handle is needed for this check
+        let (_, channel) = new_transports!(0, 0);
+        let mut buffered = TBufferedTransport::with_capacity(0, 0, channel);
+
+        let payload = vec![0; 10];
+        let accepted = buffered.write(&payload).unwrap();
+
+        assert_eq!(accepted, 0);
+    }
+
+    /// Writing an empty slice must report zero bytes and leave the underlying
+    /// transport's write buffer empty.
+    #[test]
+    fn must_return_zero_if_caller_calls_write_with_empty_buffer() {
+        let (sink, channel) = new_transports!(0, 10);
+        let mut buffered = TBufferedTransport::with_capacity(0, 10, channel);
+
+        let accepted = buffered.write(&[]).unwrap();
+        assert_eq!(accepted, 0);
+
+        // nothing may have reached the underlying transport either
+        assert_eq!(sink.borrow_mut().write_buffer_as_ref(), &[]);
+    }
+
+    /// Once the write buffer is full, further writes must report zero bytes
+    /// accepted.
+    #[test]
+    fn must_return_zero_if_write_buffer_full() {
+        let (_, channel) = new_transports!(0, 0);
+        let mut buffered = TBufferedTransport::with_capacity(0, 4, channel);
+
+        let payload = [0x00, 0x01, 0x02, 0x03];
+
+        // the first write exactly fills the four-byte write buffer
+        assert_eq!(buffered.write(&payload).unwrap(), 4);
+
+        // no flush has happened, so the same bytes no longer fit
+        assert_eq!(buffered.write(&payload).unwrap(), 0);
+    }
+
+    /// Written bytes must stay in the buffered transport until `flush()` is
+    /// called, and only then show up in the inner transport.
+    #[test]
+    fn must_only_write_to_inner_transport_on_flush() {
+        let (sink, channel) = new_transports!(10, 10);
+        let mut buffered = TBufferedTransport::new(channel);
+
+        let payload: [u8; 5] = [0, 1, 2, 3, 4];
+
+        // the write is accepted in full...
+        let accepted = buffered.write(&payload).unwrap();
+        assert_eq!(accepted, 5);
+
+        // ...yet nothing has reached the inner transport so far
+        let forwarded = sink.borrow_mut().write_buffer_as_ref().len();
+        assert_eq!(forwarded, 0);
+
+        // flushing pushes the buffered bytes through unchanged
+        assert!(buffered.flush().is_ok());
+        assert_eq!(payload, sink.borrow_mut().write_buffer_as_ref());
+    }
+
+    /// A flush must leave the buffered transport ready to accept and flush
+    /// another full buffer's worth of bytes.
+    #[test]
+    fn must_write_successfully_after_flush() {
+        let (sink, channel) = new_transports!(0, 5);
+        let mut buffered = TBufferedTransport::with_capacity(0, 5, channel);
+
+        let payload: [u8; 5] = [0, 1, 2, 3, 4];
+
+        // round one: fill the write buffer, flush, and inspect what arrived
+        assert_eq!(buffered.write(&payload).unwrap(), 5);
+        assert!(buffered.flush().is_ok());
+        assert_eq!(payload, sink.borrow_mut().write_buffer_as_ref());
+
+        // clear the underlying transport so round two starts from empty
+        sink.borrow_mut().empty_write_buffer();
+
+        // round two: the very same write-and-flush must succeed again
+        assert_eq!(buffered.write(&payload).unwrap(), 5);
+        assert!(buffered.flush().is_ok());
+        assert_eq!(payload, sink.borrow_mut().write_buffer_as_ref());
+    }
+}

http://git-wip-us.apache.org/repos/asf/thrift/blob/8b96bfbf/lib/rs/src/transport/framed.rs
----------------------------------------------------------------------
diff --git a/lib/rs/src/transport/framed.rs b/lib/rs/src/transport/framed.rs
new file mode 100644
index 0000000..75c12f4
--- /dev/null
+++ b/lib/rs/src/transport/framed.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
+use std::cell::RefCell;
+use std::cmp;
+use std::io;
+use std::io::{ErrorKind, Read, Write};
+use std::rc::Rc;
+
+use super::{TTransport, TTransportFactory};
+
+/// Default capacity of the read buffer in bytes.
+//
+// NOTE(review): despite its name, this constant is the value that
+// `TFramedTransport::new` passes as the *read* buffer capacity. A name that
+// mirrors `DEFAULT_WBUFFER_CAPACITY` (e.g. `DEFAULT_RBUFFER_CAPACITY`) would
+// be less misleading; renaming requires updating `new` at the same time.
+const WRITE_BUFFER_CAPACITY: usize = 4096;
+
+/// Default capacity of the write buffer in bytes.
+const DEFAULT_WBUFFER_CAPACITY: usize = 4096;
+
+/// Transport that communicates with endpoints using framed messages.
+///
+/// A `TFramedTransport` maintains a fixed-size internal write buffer. All
+/// writes are made to this buffer and are sent to the wrapped transport only
+/// when `TTransport::flush()` is called. On a flush a fixed-length header
+/// (a big-endian `i32`) with a count of the buffered bytes is written,
+/// followed by the bytes themselves. A write that does not fit into the
+/// space remaining in the write buffer fails with an `io::Error`.
+///
+/// A `TFramedTransport` also maintains a fixed-size internal read buffer.
+/// On a call to `TTransport::read(...)` one full message - both fixed-length
+/// header and bytes - is read from the wrapped transport and buffered.
+/// Subsequent read calls are serviced from the internal buffer until it is
+/// exhausted, at which point the next full message is read from the wrapped
+/// transport. A message larger than the read buffer fails with an
+/// `io::Error`.
+///
+/// # Examples
+///
+/// Create and use a `TFramedTransport`.
+///
+/// ```no_run
+/// use std::cell::RefCell;
+/// use std::rc::Rc;
+/// use std::io::{Read, Write};
+/// use thrift::transport::{TFramedTransport, TTcpTransport, TTransport};
+///
+/// let mut t = TTcpTransport::new();
+/// t.open("localhost:9090").unwrap();
+///
+/// let t = Rc::new(RefCell::new(Box::new(t) as Box<TTransport>));
+/// let mut t = TFramedTransport::new(t);
+///
+/// // read
+/// t.read(&mut vec![0u8; 1]).unwrap();
+///
+/// // write
+/// t.write(&[0x00]).unwrap();
+/// t.flush().unwrap();
+/// ```
+pub struct TFramedTransport {
+    /// Storage for the payload of the message currently being read.
+    rbuf: Box<[u8]>,
+    /// Index into `rbuf` of the next byte to hand to a reader.
+    rpos: usize,
+    /// Number of valid bytes in `rbuf`, i.e. the size of the current message.
+    rcap: usize,
+    /// Storage for bytes written since the last flush.
+    wbuf: Box<[u8]>,
+    /// Number of bytes currently held in `wbuf`.
+    wpos: usize,
+    /// Wrapped transport that framed messages are read from and written to.
+    inner: Rc<RefCell<Box<TTransport>>>,
+}
+
+impl TFramedTransport {
+    /// Wrap `inner` in a `TFramedTransport` whose internal read and write
+    /// buffers both have the default capacity.
+    pub fn new(inner: Rc<RefCell<Box<TTransport>>>) -> TFramedTransport {
+        let read_capacity = WRITE_BUFFER_CAPACITY;
+        let write_capacity = DEFAULT_WBUFFER_CAPACITY;
+        TFramedTransport::with_capacity(read_capacity, write_capacity, inner)
+    }
+
+    /// Wrap `inner` in a `TFramedTransport` with explicitly sized buffers.
+    ///
+    /// The internal read buffer holds `read_buffer_capacity` bytes and the
+    /// internal write buffer holds `write_buffer_capacity` bytes. Both are
+    /// allocated zero-filled up front and start out empty.
+    pub fn with_capacity(read_buffer_capacity: usize,
+                         write_buffer_capacity: usize,
+                         inner: Rc<RefCell<Box<TTransport>>>)
+                         -> TFramedTransport {
+        let read_storage = vec![0u8; read_buffer_capacity].into_boxed_slice();
+        let write_storage = vec![0u8; write_buffer_capacity].into_boxed_slice();
+
+        TFramedTransport {
+            inner: inner,
+            rbuf: read_storage,
+            rpos: 0,
+            rcap: 0,
+            wbuf: write_storage,
+            wpos: 0,
+        }
+    }
+}
+
+impl Read for TFramedTransport {
+    /// Copy bytes of the current message into `b`.
+    ///
+    /// When the internal read buffer is exhausted the next message is
+    /// fetched from the wrapped transport first: a big-endian `i32` header
+    /// giving the payload size, followed by exactly that many payload bytes.
+    ///
+    /// Returns the number of bytes copied, which is the smaller of `b.len()`
+    /// and the number of unread bytes in the buffered message.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the header or payload cannot be read from the
+    /// wrapped transport, if the header announces a negative size, or if the
+    /// announced size exceeds the capacity of the internal read buffer.
+    fn read(&mut self, b: &mut [u8]) -> io::Result<usize> {
+        // current message fully consumed: buffer the next one
+        if self.rpos == self.rcap {
+            let frame_header = self.inner.borrow_mut().read_i32::<BigEndian>()?;
+
+            // a negative header must not be cast to `usize`: it would turn
+            // into a huge bogus size instead of being reported for what it is
+            if frame_header < 0 {
+                return Err(io::Error::new(ErrorKind::Other,
+                                          format!("negative frame size ({})", frame_header)));
+            }
+
+            let message_size = frame_header as usize;
+            if message_size > self.rbuf.len() {
+                return Err(io::Error::new(ErrorKind::Other,
+                                          format!("bytes to be read ({}) exceeds buffer \
+                                                   capacity ({})",
+                                                  message_size,
+                                                  self.rbuf.len())));
+            }
+
+            self.inner.borrow_mut().read_exact(&mut self.rbuf[..message_size])?;
+            self.rpos = 0;
+            self.rcap = message_size;
+        }
+
+        // hand out as much of the buffered message as the caller has room for
+        let nread = cmp::min(b.len(), self.rcap - self.rpos);
+        b[..nread].copy_from_slice(&self.rbuf[self.rpos..self.rpos + nread]);
+        self.rpos += nread;
+
+        Ok(nread)
+    }
+}
+
+impl Write for TFramedTransport {
+    /// Append `b` to the internal write buffer.
+    ///
+    /// Nothing is sent to the wrapped transport until `flush()` is called.
+    /// The write is all-or-nothing: on success the returned count is always
+    /// `b.len()`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error, without buffering anything, if `b` is larger than
+    /// the space remaining in the internal write buffer.
+    fn write(&mut self, b: &[u8]) -> io::Result<usize> {
+        let available = self.wbuf.len() - self.wpos;
+        if b.len() > available {
+            return Err(io::Error::new(ErrorKind::Other,
+                                      format!("bytes to be written ({}) exceeds buffer \
+                                               capacity ({})",
+                                              b.len(),
+                                              available)));
+        }
+
+        let nwrite = b.len(); // never more than the available write buffer capacity
+        self.wbuf[self.wpos..(self.wpos + nwrite)].copy_from_slice(b);
+        self.wpos += nwrite;
+        Ok(nwrite)
+    }
+
+    /// Send the buffered bytes to the wrapped transport as one message and
+    /// flush the wrapped transport.
+    ///
+    /// The message is a big-endian `i32` header holding the number of
+    /// buffered bytes, followed by the bytes themselves. If nothing is
+    /// buffered this returns immediately without touching the wrapped
+    /// transport.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the buffered byte count does not fit in the `i32`
+    /// header, or if writing to or flushing the wrapped transport fails. On
+    /// error the internal write buffer is left as it was.
+    fn flush(&mut self) -> io::Result<()> {
+        let message_size = self.wpos;
+
+        if message_size == 0 {
+            return Ok(());
+        }
+
+        // the header is an i32: refuse to silently truncate a larger count
+        if message_size > i32::max_value() as usize {
+            return Err(io::Error::new(ErrorKind::Other,
+                                      format!("bytes to be flushed ({}) exceeds maximum frame \
+                                               size ({})",
+                                              message_size,
+                                              i32::max_value())));
+        }
+
+        self.inner.borrow_mut().write_i32::<BigEndian>(message_size as i32)?;
+
+        // `write_all` retries partial writes and fails with `WriteZero` if the
+        // wrapped transport stops accepting bytes, where a hand-rolled loop
+        // over `write` would spin forever on `Ok(0)`
+        self.inner.borrow_mut().write_all(&self.wbuf[..message_size])?;
+
+        self.wpos = 0;
+        self.inner.borrow_mut().flush()
+    }
+}
+
+/// Factory for creating instances of `TFramedTransport`.
+///
+/// The factory holds no state; every transport it creates uses the default
+/// read and write buffer capacities.
+#[derive(Default)]
+pub struct TFramedTransportFactory;
+
+impl TFramedTransportFactory {
+    /// Create a `TFramedTransportFactory`.
+    pub fn new() -> TFramedTransportFactory {
+        TFramedTransportFactory
+    }
+}
+
+impl TTransportFactory for TFramedTransportFactory {
+    /// Wrap `inner` in a default-sized `TFramedTransport` and return it as a
+    /// boxed `TTransport`.
+    fn create(&self, inner: Rc<RefCell<Box<TTransport>>>) -> Box<TTransport> {
+        let framed = TFramedTransport::new(inner);
+        Box::new(framed)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    // NOTE(review): placeholder only - this module contains no tests yet.
+    // The imports below are commented out, presumably as scaffolding for
+    // tests that drive the framed transport over an in-memory transport;
+    // confirm and either add the tests or drop the module.
+    //
+    //    use std::io::{Read, Write};
+    //
+    //    use super::*;
+    //    use ::transport::mem::TBufferTransport;
+}

Reply via email to