This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b581ef51b0 Add safe zero-copy conversion from bytes::Bytes (#4254) 
(#4260)
b581ef51b0 is described below

commit b581ef51b07f416374955f4b3ebbcd1ff8b1fc48
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Aug 16 13:10:48 2023 +0100

    Add safe zero-copy conversion from bytes::Bytes (#4254) (#4260)
---
 arrow-buffer/Cargo.toml        |  1 +
 arrow-buffer/src/bytes.rs      | 28 ++++++++++++++++++++++++++++
 arrow-flight/src/decode.rs     |  3 ++-
 arrow-flight/src/sql/client.rs |  2 +-
 4 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/arrow-buffer/Cargo.toml b/arrow-buffer/Cargo.toml
index 1db388db83..746045cc8d 100644
--- a/arrow-buffer/Cargo.toml
+++ b/arrow-buffer/Cargo.toml
@@ -34,6 +34,7 @@ path = "src/lib.rs"
 bench = false
 
 [dependencies]
+bytes = { version = "1.4" }
 num = { version = "0.4", default-features = false, features = ["std"] }
 half = { version = "2.1", default-features = false }
 
diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs
index b3105ed5a3..8f5019d5a4 100644
--- a/arrow-buffer/src/bytes.rs
+++ b/arrow-buffer/src/bytes.rs
@@ -148,3 +148,31 @@ impl Debug for Bytes {
         write!(f, " }}")
     }
 }
+
+impl From<bytes::Bytes> for Bytes {
+    fn from(value: bytes::Bytes) -> Self {
+        Self {
+            len: value.len(),
+            ptr: NonNull::new(value.as_ptr() as _).unwrap(),
+            deallocation: Deallocation::Custom(std::sync::Arc::new(value)),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_from_bytes() {
+        let bytes = bytes::Bytes::from(vec![1, 2, 3, 4]);
+        let arrow_bytes: Bytes = bytes.clone().into();
+
+        assert_eq!(bytes.as_ptr(), arrow_bytes.as_ptr());
+
+        drop(bytes);
+        drop(arrow_bytes);
+
+        let _ = Bytes::from(bytes::Bytes::new());
+    }
+}
diff --git a/arrow-flight/src/decode.rs b/arrow-flight/src/decode.rs
index fe132e3e84..df74923332 100644
--- a/arrow-flight/src/decode.rs
+++ b/arrow-flight/src/decode.rs
@@ -17,6 +17,7 @@
 
 use crate::{utils::flight_data_to_arrow_batch, FlightData};
 use arrow_array::{ArrayRef, RecordBatch};
+use arrow_buffer::Buffer;
 use arrow_schema::{Schema, SchemaRef};
 use bytes::Bytes;
 use futures::{ready, stream::BoxStream, Stream, StreamExt};
@@ -258,7 +259,7 @@ impl FlightDataDecoder {
                     ));
                 };
 
-                let buffer: arrow_buffer::Buffer = data.data_body.into();
+                let buffer = Buffer::from_bytes(data.data_body.into());
                 let dictionary_batch =
                     message.header_as_dictionary_batch().ok_or_else(|| {
                         FlightError::protocol(
diff --git a/arrow-flight/src/sql/client.rs b/arrow-flight/src/sql/client.rs
index c9adc2b98b..d661c96409 100644
--- a/arrow-flight/src/sql/client.rs
+++ b/arrow-flight/src/sql/client.rs
@@ -538,7 +538,7 @@ pub fn arrow_data_from_flight_data(
 
             let dictionaries_by_field = HashMap::new();
             let record_batch = read_record_batch(
-                &Buffer::from(&flight_data.data_body),
+                &Buffer::from_bytes(flight_data.data_body.into()),
                 ipc_record_batch,
                 arrow_schema_ref.clone(),
                 &dictionaries_by_field,

Reply via email to