Rich-T-kid commented on code in PR #10122:
URL: https://github.com/apache/arrow-rs/pull/10122#discussion_r3398258973


##########
arrow-ipc/benches/ipc_writer.rs:
##########
@@ -69,6 +72,119 @@ fn criterion_benchmark(c: &mut Criterion) {
             writer.finish().unwrap();
         })
     });
+
+    group.bench_function("StreamWriter/write_10/dict", |b| {
+        let batches = create_unique_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        b.iter(move || {
+            buffer.clear();
+            let mut writer = StreamWriter::try_new(&mut buffer, 
schema.as_ref()).unwrap();
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+            writer.finish().unwrap();
+        })
+    });
+
+    group.bench_function("StreamWriter/write_10/dict/delta", |b| {
+        let batches = create_delta_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        let options =
+            
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+        b.iter(move || {
+            buffer.clear();
+
+            let mut writer =
+                StreamWriter::try_new_with_options(&mut buffer, 
schema.as_ref(), options.clone())
+                    .unwrap();
+
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+
+            writer.finish().unwrap();
+        })
+    });
+
+    // The file writer rejects dictionary replacement, so only the delta case 
is
+    // exercised here (growing dictionaries that are prefixes of one another).
+    group.bench_function("FileWriter/write_10/dict/delta", |b| {
+        let batches = create_delta_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        let options =
+            
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+        b.iter(move || {
+            buffer.clear();
+
+            let mut writer =
+                FileWriter::try_new_with_options(&mut buffer, schema.as_ref(), 
options.clone())
+                    .unwrap();
+
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+
+            writer.finish().unwrap();
+        })
+    });
+}
+
+/// Build `n` record batches with a single dictionary column whose dictionary
+/// grows across batches. A single builder is reused with 
`finish_preserve_values`
+/// so each batch's dictionary has the previous batch's as a prefix which 
allows
+/// us to emit deltas.
+fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "d0",
+        DataType::Dictionary(Box::new(DataType::UInt32), 
Box::new(DataType::Utf8)),
+        false,
+    )]));
+    let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+
+    let mut batches = Vec::with_capacity(n);
+    for i in 0..n {
+        // 3/4 of the rows reuse values shared by every batch, the other 1/4
+        // introduce values unique to this batch which extends the dictionary.
+        for r in 0..num_rows {
+            if r < num_rows / 4 {
+                builder.append_value(format!("batch {i} value {}", r));
+            } else {
+                builder.append_value(format!("shared {r}"));
+            }
+        }
+
+        // Preserve the values builder so the dictionary accumulates across 
batches.
+        let dict = builder.finish_preserve_values();
+        batches.push(RecordBatch::try_new(schema.clone(), 
vec![Arc::new(dict)]).unwrap());
+    }
+
+    batches
+}
+
+/// Build `n` record batches each with a completely distinct dictionary for 
each batch.
+fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "d0",
+        DataType::Dictionary(Box::new(DataType::UInt32), 
Box::new(DataType::Utf8)),
+        false,
+    )]));
+
+    let mut batches = Vec::with_capacity(n);
+    for i in 0..n {
+        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+        for r in 0..num_rows {
+            builder.append_value(format!("batch {i} value {}", r % (num_rows / 
2)));

Review Comment:
   For an 8k batch size there will be 4k unique values, and for 64k there will 
be 32k unique values. I don't think this alone is the right way to benchmark 
dictionaries. Dictionary encoding is intended for low-cardinality data, so it 
makes more sense to parameterize the benchmarks by target cardinality, for 
example 5%, 10%, 25%, and 50% unique values relative to the batch size.
   The writer's work should also scale with the number of unique values 
(rather than with the number of rows), since that's the point of the [IPC 
format in delta 
mode](https://arrow.apache.org/docs/format/Columnar.html#dictionary-messages):
   
   > A dictionary batch with isDelta set indicates that its vector should be 
concatenated with those of any previous batches with the same id.
   
   Varying cardinality helps detect whether the encoder is doing O(N) work 
(proportional to total rows) when it should be doing O(K) work (proportional to 
unique values).



##########
arrow-ipc/benches/ipc_writer.rs:
##########
@@ -69,6 +72,119 @@ fn criterion_benchmark(c: &mut Criterion) {
             writer.finish().unwrap();
         })
     });
+
+    group.bench_function("StreamWriter/write_10/dict", |b| {
+        let batches = create_unique_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        b.iter(move || {
+            buffer.clear();
+            let mut writer = StreamWriter::try_new(&mut buffer, 
schema.as_ref()).unwrap();
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+            writer.finish().unwrap();
+        })
+    });
+
+    group.bench_function("StreamWriter/write_10/dict/delta", |b| {
+        let batches = create_delta_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        let options =
+            
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+        b.iter(move || {
+            buffer.clear();
+
+            let mut writer =
+                StreamWriter::try_new_with_options(&mut buffer, 
schema.as_ref(), options.clone())
+                    .unwrap();
+
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+
+            writer.finish().unwrap();
+        })
+    });
+
+    // The file writer rejects dictionary replacement, so only the delta case 
is
+    // exercised here (growing dictionaries that are prefixes of one another).
+    group.bench_function("FileWriter/write_10/dict/delta", |b| {
+        let batches = create_delta_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        let options =
+            
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+        b.iter(move || {
+            buffer.clear();
+
+            let mut writer =
+                FileWriter::try_new_with_options(&mut buffer, schema.as_ref(), 
options.clone())
+                    .unwrap();
+
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+
+            writer.finish().unwrap();
+        })
+    });
+}
+
+/// Build `n` record batches with a single dictionary column whose dictionary
+/// grows across batches. A single builder is reused with 
`finish_preserve_values`
+/// so each batch's dictionary has the previous batch's as a prefix which 
allows
+/// us to emit deltas.
+fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "d0",
+        DataType::Dictionary(Box::new(DataType::UInt32), 
Box::new(DataType::Utf8)),
+        false,
+    )]));
+    let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+
+    let mut batches = Vec::with_capacity(n);
+    for i in 0..n {
+        // 3/4 of the rows reuse values shared by every batch, the other 1/4
+        // introduce values unique to this batch which extends the dictionary.
+        for r in 0..num_rows {

Review Comment:
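   Same comment as on `create_unique_dict_batches` above: please parameterize 
this by target cardinality instead of hard-coding the fraction of new values 
added per batch.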



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to