tustvold commented on code in PR #3481:
URL: https://github.com/apache/arrow-rs/pull/3481#discussion_r1063693157


##########
arrow-flight/src/encode.rs:
##########
@@ -556,4 +559,215 @@ mod tests {
 
     // test sending record batches
     // test sending record batches with multiple different dictionaries
+
+    #[tokio::test]
+    async fn flight_data_size_even() {
+        let s1 =
+            StringArray::from_iter_values(std::iter::repeat(".10 
bytes.").take(1024));
+        let i1 = Int16Array::from_iter_values(0..1024);
+        let s2 = 
StringArray::from_iter_values(std::iter::repeat("6bytes").take(1024));
+        let i2 = Int64Array::from_iter_values(0..1024);
+
+        let batch = RecordBatch::try_from_iter(vec![
+            ("s1", Arc::new(s1) as _),
+            ("i1", Arc::new(i1) as _),
+            ("s2", Arc::new(s2) as _),
+            ("i2", Arc::new(i2) as _),
+        ])
+        .unwrap();
+
+        verify_encoded_split(batch, 112).await;
+    }
+
+    #[tokio::test]
+    async fn flight_data_size_uneven_variable_lengths() {
+        // each row has a longer string than the last with increasing lengths 
0 --> 1024
+        let array = StringArray::from_iter_values((0..1024).map(|i| 
"*".repeat(i)));
+        let batch =
+            RecordBatch::try_from_iter(vec![("data", Arc::new(array) as 
_)]).unwrap();
+
+        verify_encoded_split(batch, 4304).await;
+    }
+
+    #[tokio::test]
+    async fn flight_data_size_large_row() {
+        // batch with individual rows that can each exceed the batch size
+        let array1 = StringArray::from_iter_values(vec![
+            "*".repeat(500),
+            "*".repeat(500),
+            "*".repeat(500),
+            "*".repeat(500),
+        ]);
+        let array2 = StringArray::from_iter_values(vec![
+            "*".to_string(),
+            "*".repeat(1000),
+            "*".repeat(2000),
+            "*".repeat(4000),
+        ]);
+
+        let array3 = StringArray::from_iter_values(vec![
+            "*".to_string(),
+            "*".to_string(),
+            "*".repeat(1000),
+            "*".repeat(2000),
+        ]);
+
+        let batch = RecordBatch::try_from_iter(vec![
+            ("a1", Arc::new(array1) as _),
+            ("a2", Arc::new(array2) as _),
+            ("a3", Arc::new(array3) as _),
+        ])
+        .unwrap();
+
+        // 5k over limit (which is 2x larger than limit of 5k) -- not great :(
+        verify_encoded_split(batch, 5800).await;
+    }
+
+    #[tokio::test]
+    async fn flight_data_size_string_dictionary() {
+        // Small dictionary (only 2 distinct values ==> 2 entries in 
dictionary)
+        let array: DictionaryArray<Int32Type> = (1..1024)
+            .map(|i| match i % 3 {
+                0 => Some("value0"),
+                1 => Some("value1"),
+                _ => None,
+            })
+            .collect();
+
+        let batch =
+            RecordBatch::try_from_iter(vec![("a1", Arc::new(array) as 
_)]).unwrap();
+
+        verify_encoded_split(batch, 160).await;
+    }
+
+    #[tokio::test]
+    async fn flight_data_size_large_dictionary() {
+        // large dictionary (all distinct values ==> 1023 entries in 
dictionary)
+        let values: Vec<_> = (1..1024).map(|i| "**".repeat(i)).collect();
+
+        let array: DictionaryArray<Int32Type> =
+            values.iter().map(|s| Some(s.as_str())).collect();
+
+        let batch =
+            RecordBatch::try_from_iter(vec![("a1", Arc::new(array) as 
_)]).unwrap();
+
+        verify_encoded_split(batch, 3328).await;
+    }
+
+    #[tokio::test]
+    async fn flight_data_size_large_dictionary_repeated_non_uniform() {
+        // large dictionary (1024 distinct values) that are used throughout 
the array
+        let values = StringArray::from_iter_values((0..1024).map(|i| 
"******".repeat(i)));
+        let keys = Int32Array::from_iter_values((0..3000).map(|i| (3000 - i) % 
1024));
+        let array = DictionaryArray::<Int32Type>::try_new(&keys, 
&values).unwrap();
+
+        let batch =
+            RecordBatch::try_from_iter(vec![("a1", Arc::new(array) as 
_)]).unwrap();
+
+        // Almost twice as large as limit
+        verify_encoded_split(batch, 5280).await;
+    }
+
+    #[tokio::test]
+    async fn flight_data_size_multiple_dictionaries() {
+        // high cardinality
+        let values1: Vec<_> = (1..1024).map(|i| "**".repeat(i)).collect();
+        // highish cardinality
+        let values2: Vec<_> = (1..1024).map(|i| "**".repeat(i % 10)).collect();
+        // medium cardinality
+        let values3: Vec<_> = (1..1024).map(|i| "**".repeat(i % 
100)).collect();
+
+        let array1: DictionaryArray<Int32Type> =
+            values1.iter().map(|s| Some(s.as_str())).collect();
+        let array2: DictionaryArray<Int32Type> =
+            values2.iter().map(|s| Some(s.as_str())).collect();
+        let array3: DictionaryArray<Int32Type> =
+            values3.iter().map(|s| Some(s.as_str())).collect();
+
+        let batch = RecordBatch::try_from_iter(vec![
+            ("a1", Arc::new(array1) as _),
+            ("a2", Arc::new(array2) as _),
+            ("a3", Arc::new(array3) as _),
+        ])
+        .unwrap();
+
+        verify_encoded_split(batch, 4128).await;
+    }
+
+    /// Return the size, in memory, of the flight data
+    fn flight_data_size(d: &FlightData) -> usize {
+        let flight_descriptor_size = d
+            .flight_descriptor
+            .as_ref()
+            .map(|descriptor| {
+                let path_len: usize =
+                    descriptor.path.iter().map(|p| p.as_bytes().len()).sum();
+
+                std::mem::size_of_val(descriptor) + descriptor.cmd.len() + 
path_len
+            })
+            .unwrap_or(0);
+
+        flight_descriptor_size
+            + d.app_metadata.len()
+            + d.data_body.len()
+            + d.data_header.len()
+    }
+
+    /// Coverage for <https://github.com/apache/arrow-rs/issues/3478>
+    ///
+    /// Encodes the specified batch using several values of
+    /// `max_flight_data_size` between 1K and 5K and ensures that the
+    /// resulting size of the flight data stays within the limit
+    /// + `allowed_overage`
+    ///
+    /// `allowed_overage` is how far off the actual data encoding is
+    /// from the target limit that was set. It is an improvement when
+    /// the allowed_overage decreases.
+    ///
+    /// Note this overhead will likely always be greater than zero to
+    /// account for encoding overhead such as IPC headers and padding.
+    ///
+    ///
+    async fn verify_encoded_split(batch: RecordBatch, allowed_overage: usize) {
+        let num_rows = batch.num_rows();
+
+        // Track the overall required maximum overage
+        let mut max_overage_seen = 0;
+
+        for max_flight_data_size in [1024, 2021, 5000] {
+            println!("Encoding {num_rows} with a maximum size of 
{max_flight_data_size}");
+
+            let mut stream = FlightDataEncoderBuilder::new()
+                .with_max_flight_data_size(max_flight_data_size)
+                .build(futures::stream::iter([Ok(batch.clone())]));
+
+            let mut i = 0;
+            while let Some(data) = stream.next().await.transpose().unwrap() {
+                let actual_data_size = flight_data_size(&data);
+
+                let actual_overage = if actual_data_size > 
max_flight_data_size {
+                    actual_data_size - max_flight_data_size
+                } else {
+                    0
+                };
+
+                assert!(
+                    actual_overage <= allowed_overage,
+                    "encoded data[{i}]: actual size {actual_data_size}, \
+                         actual_overage: {actual_overage} \
+                         allowed_overage: {allowed_overage}"
+                );
+
+                i += 1;
+
+                max_overage_seen = max_overage_seen.max(actual_overage)
+            }
+        }
+
+        // ensure that the specified overage is exactly the maxmium than 
necessary

Review Comment:
   This comment is a little funky



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to