garyanaplan commented on a change in pull request #377:
URL: https://github.com/apache/arrow-rs/pull/377#discussion_r657291133
##########
File path: arrow-flight/src/lib.rs
##########
@@ -15,6 +15,441 @@
// specific language governing permissions and limitations
// under the License.
-include!("arrow.flight.protocol.rs");
+use arrow::datatypes::Schema;
+use arrow::error::{ArrowError, Result as ArrowResult};
+use arrow::ipc::{
+ convert, size_prefixed_root_as_message, writer, writer::EncodedData,
+ writer::IpcWriteOptions,
+};
+
+use std::{
+ convert::{TryFrom, TryInto},
+ fmt,
+ ops::Deref,
+};
+
+// Private module holding the contents of `arrow.flight.protocol.rs`, which
+// `include!` splices in verbatim. Selected items are re-exported further down.
+mod gen {
+ include!("arrow.flight.protocol.rs");
+}
+
+/// Public home of `DescriptorType`, re-exported from the private `gen` module.
+pub mod flight_descriptor {
+    pub use super::gen::flight_descriptor::DescriptorType;
+}
+
+/// Public home of `FlightServiceClient`, re-exported from the private `gen` module.
+pub mod flight_service_client {
+    pub use super::gen::flight_service_client::FlightServiceClient;
+}
+
+/// Public home of `FlightService` and `FlightServiceServer`, re-exported from
+/// the private `gen` module.
+pub mod flight_service_server {
+    pub use super::gen::flight_service_server::{FlightService, FlightServiceServer};
+}
+
+pub use gen::Action;
+pub use gen::ActionType;
+pub use gen::BasicAuth;
+pub use gen::Criteria;
+pub use gen::Empty;
+pub use gen::FlightData;
+pub use gen::FlightDescriptor;
+pub use gen::FlightEndpoint;
+pub use gen::FlightInfo;
+pub use gen::HandshakeRequest;
+pub use gen::HandshakeResponse;
+pub use gen::Location;
+pub use gen::PutResult;
+pub use gen::Result;
+pub use gen::SchemaResult;
+pub use gen::Ticket;
pub mod utils;
+
+use flight_descriptor::DescriptorType;
+
+/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
+///
+/// Both halves are borrowed for `'a`. The `From<SchemaAsIpc>` conversions in
+/// this file turn the pair into `FlightData` or `SchemaResult`.
+pub struct SchemaAsIpc<'a> {
+ /// `(schema, write options)`; also reachable as `.0` / `.1` through `Deref`.
+ pub pair: (&'a Schema, &'a IpcWriteOptions),
+}
+
+/// IpcMessage represents a `Schema` in the format expected in
+/// `FlightInfo.schema`
+///
+/// It is a newtype over the raw message bytes and dereferences to that
+/// `Vec<u8>`.
+#[derive(Debug)]
+pub struct IpcMessage(pub Vec<u8>);
+
+// Useful conversion functions
+
+/// Encodes `arrow_schema` with a default `IpcDataGenerator`, passing along the
+/// given write `options`, and returns the resulting `EncodedData`.
+fn flight_schema_as_encoded_data(
+    arrow_schema: &Schema,
+    options: &IpcWriteOptions,
+) -> EncodedData {
+    writer::IpcDataGenerator::default().schema_to_bytes(arrow_schema, options)
+}
+
+/// Encodes `schema` and keeps only the `ipc_message` bytes of the result,
+/// wrapped in an `IpcMessage`.
+fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
+    let EncodedData { ipc_message, .. } = flight_schema_as_encoded_data(schema, options);
+    IpcMessage(ipc_message)
+}
+
+// Implement a bunch of useful traits for various conversions, displays,
+// etc...
+
+// Deref
+
+// Lets an `IpcMessage` be read as the byte vector it wraps.
+impl Deref for IpcMessage {
+    type Target = Vec<u8>;
+
+    fn deref(&self) -> &Vec<u8> {
+        let IpcMessage(bytes) = self;
+        bytes
+    }
+}
+
+// Exposes the inner `(schema, options)` tuple, so `.0` and `.1` work directly
+// on a `SchemaAsIpc`.
+impl<'a> Deref for SchemaAsIpc<'a> {
+    type Target = (&'a Schema, &'a IpcWriteOptions);
+
+    fn deref(&self) -> &(&'a Schema, &'a IpcWriteOptions) {
+        let SchemaAsIpc { pair } = self;
+        pair
+    }
+}
+
+// Display...
+
+/// Debug-prints at most the first `limit` bytes of `value`.
+///
+/// Shorter inputs are printed in full; nothing marks a truncated one.
+fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
+    // Clamp the end index so one slice expression covers both the long and the short case.
+    let shown = &value[..value.len().min(limit)];
+    write!(f, "{:?}", shown)
+}
+
+// One-line rendering of a `FlightData`: the descriptor (or `None`), then at
+// most the first 8 bytes each of the header, the app metadata and the body.
+impl fmt::Display for FlightData {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightData {{ descriptor: ")?;
+        if let Some(descriptor) = &self.flight_descriptor {
+            write!(f, "{}", descriptor)?;
+        } else {
+            f.write_str("None")?;
+        }
+
+        f.write_str(", header: ")?;
+        limited_fmt(f, &self.data_header, 8)?;
+
+        f.write_str(", metadata: ")?;
+        limited_fmt(f, &self.app_metadata, 8)?;
+
+        f.write_str(", body: ")?;
+        limited_fmt(f, &self.data_body, 8)?;
+
+        write!(f, " }}")
+    }
+}
+
+// One-line rendering of a `FlightDescriptor`. What follows `type:` depends on
+// the descriptor type: a byte-limited command, the path elements, or `unknown`.
+impl fmt::Display for FlightDescriptor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightDescriptor {{ type: ")?;
+        match self.r#type() {
+            DescriptorType::Cmd => {
+                f.write_str("cmd, value: ")?;
+                limited_fmt(f, &self.cmd, 8)?;
+            }
+            DescriptorType::Path => {
+                f.write_str("path: [")?;
+                for (idx, element) in self.path.iter().enumerate() {
+                    // Separator goes before every element except the first.
+                    if idx > 0 {
+                        f.write_str(", ")?;
+                    }
+                    write!(f, "{}", element)?;
+                }
+                f.write_str("]")?;
+            }
+            DescriptorType::Unknown => f.write_str("unknown")?,
+        }
+        write!(f, " }}")
+    }
+}
+
+// One-line rendering of a `FlightEndpoint`: its ticket (or `None`) followed by
+// the comma-separated list of locations.
+impl fmt::Display for FlightEndpoint {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightEndpoint {{")?;
+        write!(f, " ticket: ")?;
+        match &self.ticket {
+            Some(value) => write!(f, "{}", value),
+            // The label above already ends in a space, so write a bare `None`
+            // (previously " none", which produced a double space).
+            None => write!(f, "None"),
+        }?;
+        write!(f, ", location: [")?;
+        let mut sep = "";
+        for location in &self.location {
+            write!(f, "{}{}", sep, location)?;
+            sep = ", ";
+        }
+        write!(f, "]")?;
+        write!(f, " }}")
+    }
+}
+
+// One-line rendering of a `FlightInfo`: decoded schema, descriptor, endpoints
+// and the record/byte totals.
+impl fmt::Display for FlightInfo {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "FlightInfo {{")?;
+        // `try_into` consumes the message, so it gets its own copy of the schema bytes.
+        // A schema that fails to decode is shown as a placeholder rather than
+        // reported as `fmt::Error`: that error is meant for formatter failures,
+        // and returning it makes `to_string()` panic.
+        match TryInto::<Schema>::try_into(IpcMessage(self.schema.clone())) {
+            Ok(schema) => write!(f, " schema: {}", schema)?,
+            Err(_) => write!(f, " schema: <undecodable>")?,
+        }
+        write!(f, ", descriptor:")?;
+        match &self.flight_descriptor {
+            Some(d) => write!(f, " {}", d),
+            None => write!(f, " None"),
+        }?;
+        write!(f, ", endpoint: [")?;
+        let mut sep = "";
+        for endpoint in &self.endpoint {
+            write!(f, "{}{}", sep, endpoint)?;
+            sep = ", ";
+        }
+        write!(f, "], total_records: {}", self.total_records)?;
+        write!(f, ", total_bytes: {}", self.total_bytes)?;
+        write!(f, " }}")
+    }
+}
+
+// One-line rendering of a `Location` as `Location { uri: <uri> }`.
+impl fmt::Display for Location {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Location {{")?;
+        write!(f, " uri: ")?;
+        write!(f, "{}", self.uri)?;
+        // Close the brace opened above; it was previously left unbalanced.
+        write!(f, " }}")
+    }
+}
+
+// One-line rendering of a `Ticket` as `Ticket { ticket: <base64> }`; the ticket
+// bytes are base64-encoded for display.
+impl fmt::Display for Ticket {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "Ticket {{")?;
+        write!(f, " ticket: ")?;
+        write!(f, "{}", base64::encode(&self.ticket))?;
+        // Close the brace opened above; it was previously left unbalanced.
+        write!(f, " }}")
+    }
+}
+
+// From...
+
+// Builds a `FlightData` from encoded IPC data: the IPC message becomes the
+// header, the Arrow data becomes the body, and all other fields take defaults.
+impl From<EncodedData> for FlightData {
+    fn from(data: EncodedData) -> Self {
+        let EncodedData {
+            ipc_message,
+            arrow_data,
+            ..
+        } = data;
+        FlightData {
+            data_header: ipc_message,
+            data_body: arrow_data,
+            ..Default::default()
+        }
+    }
+}
+
+// Encodes the schema with its paired write options and stores the resulting
+// message bytes as the header of an otherwise default `FlightData`.
+impl From<SchemaAsIpc<'_>> for FlightData {
+    fn from(schema_ipc: SchemaAsIpc) -> Self {
+        let (schema, options) = schema_ipc.pair;
+        let message = flight_schema_as_flatbuffer(schema, options);
+        FlightData {
+            data_header: message.0,
+            ..Default::default()
+        }
+    }
+}
+
+// Encodes the schema with its paired write options and wraps the resulting
+// message bytes in a `SchemaResult`.
+impl From<SchemaAsIpc<'_>> for SchemaResult {
+    fn from(schema_ipc: SchemaAsIpc) -> Self {
+        let (schema, options) = schema_ipc.pair;
+        let message = flight_schema_as_flatbuffer(schema, options);
+        SchemaResult { schema: message.0 }
+    }
+}
+
+// TryFrom...
+
+impl TryFrom<i32> for DescriptorType {
+ type Error = ArrowError;
+
+ fn try_from(value: i32) -> ArrowResult<Self> {
+ match value {
+ 0 => Ok(DescriptorType::Unknown),
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]