This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c8b990  ARROW-10801: [Rust] [Flight] Support sending FlightData for Dictionaries with that of a RecordBatch
0c8b990 is described below

commit 0c8b9903602e1cde0a20b825abf92d361af3c315
Author: Carol (Nichols || Goulding) <[email protected]>
AuthorDate: Sat Dec 12 23:48:38 2020 +0200

    ARROW-10801: [Rust] [Flight] Support sending FlightData for Dictionaries with that of a RecordBatch
    
    This PR contains a series of refactorings to extract code from the IPC
    `FileWriter` and `StreamWriter` that generates `EncodedData` instances. I
    highly recommend reviewing commit-by-commit; I tried to make the changes
    fairly mechanical and easy to understand at each step.
    
    With those refactorings, it becomes easier to reuse the code with Flight
    too, so that when we generate `FlightData` for a `RecordBatch`, we also get
    the `FlightData` for the batch's dictionaries.
    
    This will help get the Flight integration tests that include dictionaries
    to pass. I'm also having some unrelated protocol problems over in
    [this PR I'm working on](https://github.com/apache/arrow/pull/8641), but
    this work will help there.
    
    Closes #8826 from carols10cents/refactor-file-stream-writer
    
    Authored-by: Carol (Nichols || Goulding) <[email protected]>
    Signed-off-by: Neville Dipale <[email protected]>
---
 rust/arrow-flight/src/utils.rs            |  68 ++--
 rust/arrow/src/ipc/writer.rs              | 528 ++++++++++++++++--------------
 rust/datafusion/examples/flight_server.rs |  25 +-
 rust/parquet/src/arrow/schema.rs          |   3 +-
 4 files changed, 325 insertions(+), 299 deletions(-)

diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs
index ee19f34..995aa18 100644
--- a/rust/arrow-flight/src/utils.rs
+++ b/rust/arrow-flight/src/utils.rs
@@ -26,40 +26,29 @@ use arrow::error::{ArrowError, Result};
 use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions};
 use arrow::record_batch::RecordBatch;
 
-/// Convert a `RecordBatch` to `FlightData` by converting the header and body 
to bytes
-///
-/// Note: This implicitly uses the default `IpcWriteOptions`. To configure 
options,
-/// use `flight_data_from_arrow_batch()`
-impl From<&RecordBatch> for FlightData {
-    fn from(batch: &RecordBatch) -> Self {
-        let options = IpcWriteOptions::default();
-        flight_data_from_arrow_batch(batch, &options)
-    }
-}
-
-/// Convert a `RecordBatch` to `FlightData` by converting the header and body 
to bytes
+/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes 
of the dictionaries
+/// and values
 pub fn flight_data_from_arrow_batch(
     batch: &RecordBatch,
     options: &IpcWriteOptions,
-) -> FlightData {
-    let data = writer::record_batch_to_bytes(batch, &options);
-    FlightData {
-        flight_descriptor: None,
-        app_metadata: vec![],
-        data_header: data.ipc_message,
-        data_body: data.arrow_data,
-    }
-}
+) -> Vec<FlightData> {
+    let data_gen = writer::IpcDataGenerator::default();
+    let mut dictionary_tracker = writer::DictionaryTracker::new(false);
 
-/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
-///
-/// Note: This implicitly uses the default `IpcWriteOptions`. To configure 
options,
-/// use `flight_schema_from_arrow_schema()`
-impl From<&Schema> for SchemaResult {
-    fn from(schema: &Schema) -> Self {
-        let options = IpcWriteOptions::default();
-        flight_schema_from_arrow_schema(schema, &options)
-    }
+    let (encoded_dictionaries, encoded_batch) = data_gen
+        .encoded_batch(batch, &mut dictionary_tracker, &options)
+        .expect("DictionaryTracker configured above to not error on 
replacement");
+
+    encoded_dictionaries
+        .into_iter()
+        .chain(std::iter::once(encoded_batch))
+        .map(|data| FlightData {
+            flight_descriptor: None,
+            app_metadata: vec![],
+            data_header: data.ipc_message,
+            data_body: data.arrow_data,
+        })
+        .collect()
 }
 
 /// Convert a `Schema` to `SchemaResult` by converting to an IPC message
@@ -67,19 +56,11 @@ pub fn flight_schema_from_arrow_schema(
     schema: &Schema,
     options: &IpcWriteOptions,
 ) -> SchemaResult {
-    SchemaResult {
-        schema: writer::schema_to_bytes(schema, &options).ipc_message,
-    }
-}
+    let data_gen = writer::IpcDataGenerator::default();
+    let schema_bytes = data_gen.schema_to_bytes(schema, &options);
 
-/// Convert a `Schema` to `FlightData` by converting to an IPC message
-///
-/// Note: This implicitly uses the default `IpcWriteOptions`. To configure 
options,
-/// use `flight_data_from_arrow_schema()`
-impl From<&Schema> for FlightData {
-    fn from(schema: &Schema) -> Self {
-        let options = writer::IpcWriteOptions::default();
-        flight_data_from_arrow_schema(schema, &options)
+    SchemaResult {
+        schema: schema_bytes.ipc_message,
     }
 }
 
@@ -88,7 +69,8 @@ pub fn flight_data_from_arrow_schema(
     schema: &Schema,
     options: &IpcWriteOptions,
 ) -> FlightData {
-    let schema = writer::schema_to_bytes(schema, &options);
+    let data_gen = writer::IpcDataGenerator::default();
+    let schema = data_gen.schema_to_bytes(schema, &options);
     FlightData {
         flight_descriptor: None,
         app_metadata: vec![],
diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs
index e07e2be..e7983ee 100644
--- a/rust/arrow/src/ipc/writer.rs
+++ b/rust/arrow/src/ipc/writer.rs
@@ -98,6 +98,239 @@ impl Default for IpcWriteOptions {
     }
 }
 
+#[derive(Debug, Default)]
+pub struct IpcDataGenerator {}
+
+impl IpcDataGenerator {
+    pub fn schema_to_bytes(
+        &self,
+        schema: &Schema,
+        write_options: &IpcWriteOptions,
+    ) -> EncodedData {
+        let mut fbb = FlatBufferBuilder::new();
+        let schema = {
+            let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
+            fb.as_union_value()
+        };
+
+        let mut message = ipc::MessageBuilder::new(&mut fbb);
+        message.add_version(write_options.metadata_version);
+        message.add_header_type(ipc::MessageHeader::Schema);
+        message.add_bodyLength(0);
+        message.add_header(schema);
+        // TODO: custom metadata
+        let data = message.finish();
+        fbb.finish(data, None);
+
+        let data = fbb.finished_data();
+        EncodedData {
+            ipc_message: data.to_vec(),
+            arrow_data: vec![],
+        }
+    }
+
+    pub fn encoded_batch(
+        &self,
+        batch: &RecordBatch,
+        dictionary_tracker: &mut DictionaryTracker,
+        write_options: &IpcWriteOptions,
+    ) -> Result<(Vec<EncodedData>, EncodedData)> {
+        // TODO: handle nested dictionaries
+        let schema = batch.schema();
+        let mut encoded_dictionaries = 
Vec::with_capacity(schema.fields().len());
+
+        for (i, field) in schema.fields().iter().enumerate() {
+            let column = batch.column(i);
+
+            if let DataType::Dictionary(_key_type, _value_type) = 
column.data_type() {
+                let dict_id = field
+                    .dict_id()
+                    .expect("All Dictionary types have `dict_id`");
+                let dict_data = column.data();
+                let dict_values = &dict_data.child_data()[0];
+
+                let emit = dictionary_tracker.insert(dict_id, column)?;
+
+                if emit {
+                    encoded_dictionaries.push(self.dictionary_batch_to_bytes(
+                        dict_id,
+                        dict_values,
+                        write_options,
+                    ));
+                }
+            }
+        }
+
+        let encoded_message = self.record_batch_to_bytes(batch, write_options);
+
+        Ok((encoded_dictionaries, encoded_message))
+    }
+
+    /// Write a `RecordBatch` into two sets of bytes, one for the header 
(ipc::Message) and the
+    /// other for the batch's data
+    fn record_batch_to_bytes(
+        &self,
+        batch: &RecordBatch,
+        write_options: &IpcWriteOptions,
+    ) -> EncodedData {
+        let mut fbb = FlatBufferBuilder::new();
+
+        let mut nodes: Vec<ipc::FieldNode> = vec![];
+        let mut buffers: Vec<ipc::Buffer> = vec![];
+        let mut arrow_data: Vec<u8> = vec![];
+        let mut offset = 0;
+        for array in batch.columns() {
+            let array_data = array.data();
+            offset = write_array_data(
+                &array_data,
+                &mut buffers,
+                &mut arrow_data,
+                &mut nodes,
+                offset,
+                array.len(),
+                array.null_count(),
+            );
+        }
+
+        // write data
+        let buffers = fbb.create_vector(&buffers);
+        let nodes = fbb.create_vector(&nodes);
+
+        let root = {
+            let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+            batch_builder.add_length(batch.num_rows() as i64);
+            batch_builder.add_nodes(nodes);
+            batch_builder.add_buffers(buffers);
+            let b = batch_builder.finish();
+            b.as_union_value()
+        };
+        // create an ipc::Message
+        let mut message = ipc::MessageBuilder::new(&mut fbb);
+        message.add_version(write_options.metadata_version);
+        message.add_header_type(ipc::MessageHeader::RecordBatch);
+        message.add_bodyLength(arrow_data.len() as i64);
+        message.add_header(root);
+        let root = message.finish();
+        fbb.finish(root, None);
+        let finished_data = fbb.finished_data();
+
+        EncodedData {
+            ipc_message: finished_data.to_vec(),
+            arrow_data,
+        }
+    }
+
+    /// Write dictionary values into two sets of bytes, one for the header 
(ipc::Message) and the
+    /// other for the data
+    fn dictionary_batch_to_bytes(
+        &self,
+        dict_id: i64,
+        array_data: &ArrayDataRef,
+        write_options: &IpcWriteOptions,
+    ) -> EncodedData {
+        let mut fbb = FlatBufferBuilder::new();
+
+        let mut nodes: Vec<ipc::FieldNode> = vec![];
+        let mut buffers: Vec<ipc::Buffer> = vec![];
+        let mut arrow_data: Vec<u8> = vec![];
+
+        write_array_data(
+            &array_data,
+            &mut buffers,
+            &mut arrow_data,
+            &mut nodes,
+            0,
+            array_data.len(),
+            array_data.null_count(),
+        );
+
+        // write data
+        let buffers = fbb.create_vector(&buffers);
+        let nodes = fbb.create_vector(&nodes);
+
+        let root = {
+            let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
+            batch_builder.add_length(array_data.len() as i64);
+            batch_builder.add_nodes(nodes);
+            batch_builder.add_buffers(buffers);
+            batch_builder.finish()
+        };
+
+        let root = {
+            let mut batch_builder = ipc::DictionaryBatchBuilder::new(&mut fbb);
+            batch_builder.add_id(dict_id);
+            batch_builder.add_data(root);
+            batch_builder.finish().as_union_value()
+        };
+
+        let root = {
+            let mut message_builder = ipc::MessageBuilder::new(&mut fbb);
+            message_builder.add_version(write_options.metadata_version);
+            
message_builder.add_header_type(ipc::MessageHeader::DictionaryBatch);
+            message_builder.add_bodyLength(arrow_data.len() as i64);
+            message_builder.add_header(root);
+            message_builder.finish()
+        };
+
+        fbb.finish(root, None);
+        let finished_data = fbb.finished_data();
+
+        EncodedData {
+            ipc_message: finished_data.to_vec(),
+            arrow_data,
+        }
+    }
+}
+
+/// Keeps track of dictionaries that have been written, to avoid emitting the 
same dictionary
+/// multiple times. Can optionally error if an update to an existing 
dictionary is attempted, which
+/// isn't allowed in the `FileWriter`.
+pub struct DictionaryTracker {
+    written: HashMap<i64, ArrayRef>,
+    error_on_replacement: bool,
+}
+
+impl DictionaryTracker {
+    pub fn new(error_on_replacement: bool) -> Self {
+        Self {
+            written: HashMap::new(),
+            error_on_replacement,
+        }
+    }
+
+    /// Keep track of the dictionary with the given ID and values. Behavior:
+    ///
+    /// * If this ID has been written already and has the same data, return 
`Ok(false)` to indicate
+    ///   that the dictionary was not actually inserted (because it's already 
been seen).
+    /// * If this ID has been written already but with different data, and 
this tracker is
+    ///   configured to return an error, return an error.
+    /// * If the tracker has not been configured to error on replacement or 
this dictionary
+    ///   has never been seen before, return `Ok(true)` to indicate that the 
dictionary was just
+    ///   inserted.
+    pub fn insert(&mut self, dict_id: i64, column: &ArrayRef) -> Result<bool> {
+        let dict_data = column.data();
+        let dict_values = &dict_data.child_data()[0];
+
+        // If a dictionary with this id was already emitted, check if it was 
the same.
+        if let Some(last) = self.written.get(&dict_id) {
+            if last.data().child_data()[0] == *dict_values {
+                // Same dictionary values => no need to emit it again
+                return Ok(false);
+            } else if self.error_on_replacement {
+                return Err(ArrowError::InvalidArgumentError(
+                    "Dictionary replacement detected when writing IPC file 
format. \
+                     Arrow IPC files only support a single dictionary for a 
given field \
+                     across all batches."
+                        .to_string(),
+                ));
+            }
+        }
+
+        self.written.insert(dict_id, column.clone());
+        Ok(true)
+    }
+}
+
 pub struct FileWriter<W: Write> {
     /// The object to write to
     writer: BufWriter<W>,
@@ -114,7 +347,9 @@ pub struct FileWriter<W: Write> {
     /// Whether the writer footer has been written, and the writer is finished
     finished: bool,
     /// Keeps track of dictionaries that have been written
-    last_written_dictionaries: HashMap<i64, ArrayRef>,
+    dictionary_tracker: DictionaryTracker,
+
+    data_gen: IpcDataGenerator,
 }
 
 impl<W: Write> FileWriter<W> {
@@ -130,14 +365,15 @@ impl<W: Write> FileWriter<W> {
         schema: &Schema,
         write_options: IpcWriteOptions,
     ) -> Result<Self> {
+        let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
         // write magic to header
         writer.write_all(&super::ARROW_MAGIC[..])?;
         // create an 8-byte boundary after the header
         writer.write_all(&[0, 0])?;
         // write the schema, set the written bytes to the schema + header
-        let message = Message::Schema(schema, &write_options);
-        let (meta, data) = write_message(&mut writer, &message, 
&write_options)?;
+        let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
+        let (meta, data) = write_message(&mut writer, encoded_message, 
&write_options)?;
         Ok(Self {
             writer,
             write_options,
@@ -146,7 +382,8 @@ impl<W: Write> FileWriter<W> {
             dictionary_blocks: vec![],
             record_blocks: vec![],
             finished: false,
-            last_written_dictionaries: HashMap::new(),
+            dictionary_tracker: DictionaryTracker::new(true),
+            data_gen,
         })
     }
 
@@ -157,10 +394,25 @@ impl<W: Write> FileWriter<W> {
                 "Cannot write record batch to file writer as it is 
closed".to_string(),
             ));
         }
-        self.write_dictionaries(&batch)?;
-        let message = Message::RecordBatch(batch, &self.write_options);
+
+        let (encoded_dictionaries, encoded_message) = 
self.data_gen.encoded_batch(
+            batch,
+            &mut self.dictionary_tracker,
+            &self.write_options,
+        )?;
+
+        for encoded_dictionary in encoded_dictionaries {
+            let (meta, data) =
+                write_message(&mut self.writer, encoded_dictionary, 
&self.write_options)?;
+
+            let block =
+                ipc::Block::new(self.block_offsets as i64, meta as i32, data 
as i64);
+            self.dictionary_blocks.push(block);
+            self.block_offsets += meta + data;
+        }
+
         let (meta, data) =
-            write_message(&mut self.writer, &message, &self.write_options)?;
+            write_message(&mut self.writer, encoded_message, 
&self.write_options)?;
         // add a record block for the footer
         let block = ipc::Block::new(
             self.block_offsets as i64,
@@ -172,53 +424,6 @@ impl<W: Write> FileWriter<W> {
         Ok(())
     }
 
-    fn write_dictionaries(&mut self, batch: &RecordBatch) -> Result<()> {
-        // TODO: handle nested dictionaries
-
-        let schema = batch.schema();
-        for (i, field) in schema.fields().iter().enumerate() {
-            let column = batch.column(i);
-
-            if let DataType::Dictionary(_key_type, _value_type) = 
column.data_type() {
-                let dict_id = field
-                    .dict_id()
-                    .expect("All Dictionary types have `dict_id`");
-                let dict_data = column.data();
-                let dict_values = &dict_data.child_data()[0];
-
-                // If a dictionary with this id was already emitted, check if 
it was the same.
-                if let Some(last_dictionary) =
-                    self.last_written_dictionaries.get(&dict_id)
-                {
-                    if last_dictionary.data().child_data()[0] == *dict_values {
-                        // Same dictionary values => no need to emit it again
-                        continue;
-                    } else {
-                        return Err(ArrowError::InvalidArgumentError(
-                            "Dictionary replacement detected when writing IPC 
file format. \
-                             Arrow IPC files only support a single dictionary 
for a given field \
-                             across all batches.".to_string()));
-                    }
-                }
-
-                self.last_written_dictionaries
-                    .insert(dict_id, column.clone());
-
-                let message =
-                    Message::DictionaryBatch(dict_id, dict_values, 
&self.write_options);
-
-                let (meta, data) =
-                    write_message(&mut self.writer, &message, 
&self.write_options)?;
-
-                let block =
-                    ipc::Block::new(self.block_offsets as i64, meta as i32, 
data as i64);
-                self.dictionary_blocks.push(block);
-                self.block_offsets += meta + data;
-            }
-        }
-        Ok(())
-    }
-
     /// Write footer and closing tag, then mark the writer as done
     pub fn finish(&mut self) -> Result<()> {
         // write EOS
@@ -269,7 +474,9 @@ pub struct StreamWriter<W: Write> {
     /// Whether the writer footer has been written, and the writer is finished
     finished: bool,
     /// Keeps track of dictionaries that have been written
-    last_written_dictionaries: HashMap<i64, ArrayRef>,
+    dictionary_tracker: DictionaryTracker,
+
+    data_gen: IpcDataGenerator,
 }
 
 impl<W: Write> StreamWriter<W> {
@@ -284,16 +491,18 @@ impl<W: Write> StreamWriter<W> {
         schema: &Schema,
         write_options: IpcWriteOptions,
     ) -> Result<Self> {
+        let data_gen = IpcDataGenerator::default();
         let mut writer = BufWriter::new(writer);
         // write the schema, set the written bytes to the schema
-        let message = Message::Schema(schema, &write_options);
-        write_message(&mut writer, &message, &write_options)?;
+        let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
+        write_message(&mut writer, encoded_message, &write_options)?;
         Ok(Self {
             writer,
             write_options,
             schema: schema.clone(),
             finished: false,
-            last_written_dictionaries: HashMap::new(),
+            dictionary_tracker: DictionaryTracker::new(false),
+            data_gen,
         })
     }
 
@@ -304,46 +513,17 @@ impl<W: Write> StreamWriter<W> {
                 "Cannot write record batch to stream writer as it is 
closed".to_string(),
             ));
         }
-        self.write_dictionaries(&batch)?;
-
-        let message = Message::RecordBatch(batch, &self.write_options);
-        write_message(&mut self.writer, &message, &self.write_options)?;
-        Ok(())
-    }
-
-    fn write_dictionaries(&mut self, batch: &RecordBatch) -> Result<()> {
-        // TODO: handle nested dictionaries
-
-        let schema = batch.schema();
-        for (i, field) in schema.fields().iter().enumerate() {
-            let column = batch.column(i);
-
-            if let DataType::Dictionary(_key_type, _value_type) = 
column.data_type() {
-                let dict_id = field
-                    .dict_id()
-                    .expect("All Dictionary types have `dict_id`");
-                let dict_data = column.data();
-                let dict_values = &dict_data.child_data()[0];
-
-                // If a dictionary with this id was already emitted, check if 
it was the same.
-                if let Some(last_dictionary) =
-                    self.last_written_dictionaries.get(&dict_id)
-                {
-                    if last_dictionary.data().child_data()[0] == *dict_values {
-                        // Same dictionary values => no need to emit it again
-                        continue;
-                    }
-                }
 
-                self.last_written_dictionaries
-                    .insert(dict_id, column.clone());
+        let (encoded_dictionaries, encoded_message) = self
+            .data_gen
+            .encoded_batch(batch, &mut self.dictionary_tracker, 
&self.write_options)
+            .expect("StreamWriter is configured to not error on dictionary 
replacement");
 
-                let message =
-                    Message::DictionaryBatch(dict_id, dict_values, 
&self.write_options);
-
-                write_message(&mut self.writer, &message, 
&self.write_options)?;
-            }
+        for encoded_dictionary in encoded_dictionaries {
+            write_message(&mut self.writer, encoded_dictionary, 
&self.write_options)?;
         }
+
+        write_message(&mut self.writer, encoded_message, &self.write_options)?;
         Ok(())
     }
 
@@ -374,57 +554,12 @@ pub struct EncodedData {
     pub arrow_data: Vec<u8>,
 }
 
-pub fn schema_to_bytes(schema: &Schema, write_options: &IpcWriteOptions) -> 
EncodedData {
-    let mut fbb = FlatBufferBuilder::new();
-    let schema = {
-        let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
-        fb.as_union_value()
-    };
-
-    let mut message = ipc::MessageBuilder::new(&mut fbb);
-    message.add_version(write_options.metadata_version);
-    message.add_header_type(ipc::MessageHeader::Schema);
-    message.add_bodyLength(0);
-    message.add_header(schema);
-    // TODO: custom metadata
-    let data = message.finish();
-    fbb.finish(data, None);
-
-    let data = fbb.finished_data();
-    EncodedData {
-        ipc_message: data.to_vec(),
-        arrow_data: vec![],
-    }
-}
-
-enum Message<'a> {
-    Schema(&'a Schema, &'a IpcWriteOptions),
-    RecordBatch(&'a RecordBatch, &'a IpcWriteOptions),
-    DictionaryBatch(i64, &'a ArrayDataRef, &'a IpcWriteOptions),
-}
-
-impl<'a> Message<'a> {
-    /// Encode message to a ipc::Message and return data as bytes
-    fn encode(&'a self) -> EncodedData {
-        match self {
-            Message::Schema(schema, options) => schema_to_bytes(*schema, 
*options),
-            Message::RecordBatch(batch, options) => {
-                record_batch_to_bytes(*batch, *options)
-            }
-            Message::DictionaryBatch(dict_id, array_data, options) => {
-                dictionary_batch_to_bytes(*dict_id, *array_data, *options)
-            }
-        }
-    }
-}
-
 /// Write a message's IPC data and buffers, returning metadata and buffer data 
lengths written
 fn write_message<W: Write>(
     mut writer: &mut BufWriter<W>,
-    message: &Message,
+    encoded: EncodedData,
     write_options: &IpcWriteOptions,
 ) -> Result<(usize, usize)> {
-    let encoded = message.encode();
     let arrow_data_len = encoded.arrow_data.len();
     if arrow_data_len % 8 != 0 {
         return Err(ArrowError::MemoryError(
@@ -481,117 +616,6 @@ fn write_body_buffers<W: Write>(writer: &mut 
BufWriter<W>, data: &[u8]) -> Resul
     Ok(total_len as usize)
 }
 
-/// Write a `RecordBatch` into a tuple of bytes, one for the header 
(ipc::Message) and the other for the batch's data
-pub fn record_batch_to_bytes(
-    batch: &RecordBatch,
-    write_options: &IpcWriteOptions,
-) -> EncodedData {
-    let mut fbb = FlatBufferBuilder::new();
-
-    let mut nodes: Vec<ipc::FieldNode> = vec![];
-    let mut buffers: Vec<ipc::Buffer> = vec![];
-    let mut arrow_data: Vec<u8> = vec![];
-    let mut offset = 0;
-    for array in batch.columns() {
-        let array_data = array.data();
-        offset = write_array_data(
-            &array_data,
-            &mut buffers,
-            &mut arrow_data,
-            &mut nodes,
-            offset,
-            array.len(),
-            array.null_count(),
-        );
-    }
-
-    // write data
-    let buffers = fbb.create_vector(&buffers);
-    let nodes = fbb.create_vector(&nodes);
-
-    let root = {
-        let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
-        batch_builder.add_length(batch.num_rows() as i64);
-        batch_builder.add_nodes(nodes);
-        batch_builder.add_buffers(buffers);
-        let b = batch_builder.finish();
-        b.as_union_value()
-    };
-    // create an ipc::Message
-    let mut message = ipc::MessageBuilder::new(&mut fbb);
-    message.add_version(write_options.metadata_version);
-    message.add_header_type(ipc::MessageHeader::RecordBatch);
-    message.add_bodyLength(arrow_data.len() as i64);
-    message.add_header(root);
-    let root = message.finish();
-    fbb.finish(root, None);
-    let finished_data = fbb.finished_data();
-
-    EncodedData {
-        ipc_message: finished_data.to_vec(),
-        arrow_data,
-    }
-}
-
-/// Write dictionary values into a tuple of bytes, one for the header 
(ipc::Message) and the other for the data
-pub fn dictionary_batch_to_bytes(
-    dict_id: i64,
-    array_data: &ArrayDataRef,
-    write_options: &IpcWriteOptions,
-) -> EncodedData {
-    let mut fbb = FlatBufferBuilder::new();
-
-    let mut nodes: Vec<ipc::FieldNode> = vec![];
-    let mut buffers: Vec<ipc::Buffer> = vec![];
-    let mut arrow_data: Vec<u8> = vec![];
-
-    write_array_data(
-        &array_data,
-        &mut buffers,
-        &mut arrow_data,
-        &mut nodes,
-        0,
-        array_data.len(),
-        array_data.null_count(),
-    );
-
-    // write data
-    let buffers = fbb.create_vector(&buffers);
-    let nodes = fbb.create_vector(&nodes);
-
-    let root = {
-        let mut batch_builder = ipc::RecordBatchBuilder::new(&mut fbb);
-        batch_builder.add_length(array_data.len() as i64);
-        batch_builder.add_nodes(nodes);
-        batch_builder.add_buffers(buffers);
-        batch_builder.finish()
-    };
-
-    let root = {
-        let mut batch_builder = ipc::DictionaryBatchBuilder::new(&mut fbb);
-        batch_builder.add_id(dict_id);
-        batch_builder.add_data(root);
-        batch_builder.finish().as_union_value()
-    };
-
-    let root = {
-        let mut message_builder = ipc::MessageBuilder::new(&mut fbb);
-        message_builder.add_version(write_options.metadata_version);
-        message_builder.add_header_type(ipc::MessageHeader::DictionaryBatch);
-        message_builder.add_bodyLength(arrow_data.len() as i64);
-        message_builder.add_header(root);
-        message_builder.finish()
-    };
-
-    fbb.finish(root, None);
-    let finished_data = fbb.finished_data();
-
-    EncodedData {
-        ipc_message: finished_data.to_vec(),
-        arrow_data,
-    }
-}
-
 /// Write a record batch to the writer, writing the message size before the 
message
 /// if the record batch is being written to a stream
 fn write_continuation<W: Write>(
diff --git a/rust/datafusion/examples/flight_server.rs 
b/rust/datafusion/examples/flight_server.rs
index a601b7c..d835ab0 100644
--- a/rust/datafusion/examples/flight_server.rs
+++ b/rust/datafusion/examples/flight_server.rs
@@ -66,7 +66,13 @@ impl FlightService for FlightServiceImpl {
 
         let table = ParquetTable::try_new(&request.path[0]).unwrap();
 
-        Ok(Response::new(SchemaResult::from(table.schema().as_ref())))
+        let options = arrow::ipc::writer::IpcWriteOptions::default();
+        let schema_result = 
arrow_flight::utils::flight_schema_from_arrow_schema(
+            table.schema().as_ref(),
+            &options,
+        );
+
+        Ok(Response::new(schema_result))
     }
 
     async fn do_get(
@@ -108,13 +114,26 @@ impl FlightService for FlightServiceImpl {
                 }
 
                 // add an initial FlightData message that sends schema
+                let options = arrow::ipc::writer::IpcWriteOptions::default();
                 let schema = plan.schema();
+                let schema_flight_data =
+                    arrow_flight::utils::flight_data_from_arrow_schema(
+                        schema.as_ref(),
+                        &options,
+                    );
+
                 let mut flights: Vec<Result<FlightData, Status>> =
-                    vec![Ok(FlightData::from(schema.as_ref()))];
+                    vec![Ok(schema_flight_data)];
 
                 let mut batches: Vec<Result<FlightData, Status>> = results
                     .iter()
-                    .map(|batch| Ok(FlightData::from(batch)))
+                    .flat_map(|batch| {
+                        let flight_data =
+                            arrow_flight::utils::flight_data_from_arrow_batch(
+                                batch, &options,
+                            );
+                        flight_data.into_iter().map(Ok)
+                    })
                     .collect();
 
                 // append batch vector to schema vector, so that the first 
message sent is the schema
diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs
index 8674dfa..163d675 100644
--- a/rust/parquet/src/arrow/schema.rs
+++ b/rust/parquet/src/arrow/schema.rs
@@ -205,7 +205,8 @@ fn get_arrow_schema_from_metadata(encoded_meta: &str) -> 
Option<Schema> {
 /// Encodes the Arrow schema into the IPC format, and base64 encodes it
 fn encode_arrow_schema(schema: &Schema) -> String {
     let options = writer::IpcWriteOptions::default();
-    let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema, 
&options);
+    let data_gen = arrow::ipc::writer::IpcDataGenerator::default();
+    let mut serialized_schema = data_gen.schema_to_bytes(&schema, &options);
 
     // manually prepending the length to the schema as arrow uses the legacy 
IPC format
     // TODO: change after addressing ARROW-9777

Reply via email to