charlesdong1991 commented on code in PR #532:
URL: https://github.com/apache/fluss-rust/pull/532#discussion_r3204279272
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -307,28 +308,32 @@ impl RecordAccumulator {
None
};
- let (dq, compression_ratio_estimator) = {
- let mut binding =
- self.write_batches
- .entry(Arc::clone(physical_table_path))
- .or_insert_with(|| BucketAndWriteBatches {
- table_id: table_info.table_id,
+ let (dq, compression_ratio_estimator, dynamic_target) = {
+ let mut binding = self
+ .write_batches
+ .entry(Arc::clone(physical_table_path))
+ .or_insert_with(|| {
+ BucketAndWriteBatches::new(
+ table_info.table_id,
is_partitioned_table,
partition_id,
- batches: Default::default(),
- compression_ratio_estimator: Arc::new(
- ArrowCompressionRatioEstimator::default(),
- ),
- });
+ &self.config,
+ )
+ });
let bucket_and_batches = binding.value_mut();
let dq = bucket_and_batches
.batches
.entry(bucket_id)
.or_insert_with(|| Arc::new(Mutex::new(VecDeque::new())))
.clone();
+ let dynamic_target = self
+ .config
+ .writer_dynamic_batch_size_enabled
+ .then(|| bucket_and_batches.dynamic_batch_size.lock().current());
Review Comment:
Given how we use `dynamic_batch_size` here, I wonder if an `AtomicUsize` would
be a better option than a `Mutex`, to avoid locking?
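For illustration, here is a minimal sketch of the `AtomicUsize` alternative. It assumes the estimator's only mutable state is the current target plus fixed min/max bounds. `AtomicBatchSizeEstimator` and its grow/shrink rule are hypothetical and do not reflect the PR's `DynamicWriteBatchSizeEstimator`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical lock-free variant of the dynamic batch-size estimator.
/// The grow/shrink rule in `update` is an illustrative assumption only.
pub struct AtomicBatchSizeEstimator {
    current: AtomicUsize,
    min: usize,
    max: usize,
}

impl AtomicBatchSizeEstimator {
    pub fn new(initial: usize, min: usize, max: usize) -> Self {
        assert!(min <= max, "min must not exceed max");
        Self {
            current: AtomicUsize::new(initial.clamp(min, max)),
            min,
            max,
        }
    }

    /// Read path: a single atomic load, no lock. Relaxed is enough because the
    /// value is a heuristic and does not guard any other memory.
    pub fn current(&self) -> usize {
        self.current.load(Ordering::Relaxed)
    }

    /// Update path: `fetch_update` re-runs the closure if another thread
    /// changed the value in between, so no concurrent update is lost.
    pub fn update(&self, actual: usize) {
        let (min, max) = (self.min, self.max);
        let _ = self
            .current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                // Assumed rule: double when the drained batch filled more than
                // 80% of the target, otherwise move halfway toward it.
                let next = if actual.saturating_mul(5) > cur.saturating_mul(4) {
                    cur.saturating_mul(2)
                } else {
                    cur.saturating_add(actual) / 2
                };
                Some(next.clamp(min, max))
            });
    }
}
```

If the real estimator keeps more than one number of state (for example a moving average plus a sample count), a single atomic may not be enough, and the `Mutex` stays the simpler option.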
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -938,6 +950,29 @@ struct BucketAndWriteBatches {
batches: HashMap<BucketId, Arc<Mutex<VecDeque<WriteBatch>>>>,
/// Compression ratio estimator shared across Arrow log batches for this table.
compression_ratio_estimator: Arc<ArrowCompressionRatioEstimator>,
+ dynamic_batch_size: Mutex<DynamicWriteBatchSizeEstimator>,
+}
+
+impl BucketAndWriteBatches {
+ fn new(
+ table_id: TableId,
+ is_partitioned_table: bool,
+ partition_id: Option<PartitionId>,
+ config: &Config,
+ ) -> Self {
+ let estimator = DynamicWriteBatchSizeEstimator::new(
Review Comment:
So this estimator is always constructed, regardless of whether dynamic sizing
is enabled or disabled?
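One way to avoid that is to store the estimator as an `Option` and build it only when the flag is on. This is a minimal sketch: `Config`, `Estimator`, and `TableBatches` are hypothetical stand-ins for the PR's types. It also uses `std::sync::Mutex` (hence `.unwrap()`), whereas the PR's `.lock()` without `unwrap` suggests a different mutex type:

```rust
use std::sync::Mutex;

/// Hypothetical stand-in for the writer config.
pub struct Config {
    pub writer_batch_size: usize,
    pub writer_dynamic_batch_size_enabled: bool,
}

/// Hypothetical stand-in for the dynamic batch-size estimator.
pub struct Estimator {
    current: usize,
}

impl Estimator {
    pub fn new(initial: usize) -> Self {
        Self { current: initial }
    }
    pub fn current(&self) -> usize {
        self.current
    }
}

/// Hypothetical stand-in for `BucketAndWriteBatches`.
pub struct TableBatches {
    /// `None` when dynamic sizing is disabled, so no estimator is allocated.
    pub dynamic_batch_size: Option<Mutex<Estimator>>,
}

impl TableBatches {
    pub fn new(config: &Config) -> Self {
        // `bool::then` only runs the closure when the flag is true.
        let dynamic_batch_size = config
            .writer_dynamic_batch_size_enabled
            .then(|| Mutex::new(Estimator::new(config.writer_batch_size)));
        Self { dynamic_batch_size }
    }

    /// Target for the next batch: the estimate when enabled, else the static size.
    pub fn target_batch_size(&self, config: &Config) -> usize {
        self.dynamic_batch_size
            .as_ref()
            .map(|m| m.lock().unwrap().current())
            .unwrap_or(config.writer_batch_size)
    }
}
```

With this shape, `record_actual_batch_size` could also rely on `if let Some(..)` over the option instead of checking `writer_dynamic_batch_size_enabled` separately.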
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -690,6 +692,15 @@ impl RecordAccumulator {
self.incomplete_batches.write().remove(&batch_id);
}
+ fn record_actual_batch_size(&self, table_path: &Arc<PhysicalTablePath>, actual: usize) {
+ if !self.config.writer_dynamic_batch_size_enabled {
+ return;
+ }
+ if let Some(entry) = self.write_batches.get(table_path) {
+ entry.dynamic_batch_size.lock().update(actual);
Review Comment:
Nit: I wonder whether, in a production environment, it would be helpful to
emit a tracing/debug message when the value changes, since that can be an
important change that impacts performance?
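A minimal sketch of logging only on an actual change. It assumes `update` can be made to report whether the target moved. `Estimator`, its update rule, and the `(old, new)` return value are hypothetical. `eprintln!` is a std-only placeholder for whatever logging macro the crate uses (for example `tracing::debug!`, if the crate depends on `tracing`):

```rust
/// Hypothetical estimator; the update rule is an illustrative assumption.
pub struct Estimator {
    current: usize,
    min: usize,
    max: usize,
}

impl Estimator {
    pub fn new(initial: usize, min: usize, max: usize) -> Self {
        assert!(min <= max, "min must not exceed max");
        Self { current: initial.clamp(min, max), min, max }
    }

    pub fn current(&self) -> usize {
        self.current
    }

    /// Applies one observation and returns `Some((old, new))` only when the
    /// target actually moved, so callers log changes, not every update.
    pub fn update(&mut self, actual: usize) -> Option<(usize, usize)> {
        let old = self.current;
        let next = if actual.saturating_mul(5) > old.saturating_mul(4) {
            old.saturating_mul(2) // batch was >80% full: grow
        } else {
            old.saturating_add(actual) / 2 // otherwise: move halfway toward it
        };
        self.current = next.clamp(self.min, self.max);
        (self.current != old).then_some((old, self.current))
    }
}

/// Caller side, e.g. inside `record_actual_batch_size`.
pub fn record_actual_batch_size(table: &str, estimator: &mut Estimator, actual: usize) {
    if let Some((old, new)) = estimator.update(actual) {
        // Placeholder for a debug-level log line in the real crate.
        eprintln!(
            "dynamic batch size for {table} changed: {old} -> {new} bytes \
             (last drained batch: {actual} bytes)"
        );
    }
}
```

Since this path presumably runs once per drained batch, logging only on change keeps the output proportional to actual adjustments rather than to write volume.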
##########
crates/fluss/src/client/write/accumulator.rs:
##########
@@ -1563,4 +1598,77 @@ mod tests {
.await
.expect("notified should complete after wakeup_sender");
}
+
+ #[test]
+ fn dynamic_batch_size_shrinks_after_small_drained_batch() {
Review Comment:
Nit: can we add a complementary test that verifies the grow direction end to
end? That is, first shrink the estimate by draining small batches, then append
so that a drained batch exceeds 80% of the target, and verify that the next
allocation increases.
This is very minor, so feel free to ignore it.
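A sketch of the test's shape. It is written against a hypothetical `Estimator` rather than `RecordAccumulator`, because the accumulator's test helpers are not shown in this hunk. The update rule is assumed. Only the sequence comes from the comment above: shrink via small drained batches, then drain a batch over 80% full, then expect a larger next allocation.

```rust
/// Hypothetical stand-in for the per-table estimator. Assumed rule: double
/// above 80% fill, otherwise move halfway toward the observed size.
struct Estimator {
    current: usize,
    min: usize,
    max: usize,
}

impl Estimator {
    /// Starts at `max`, like a fresh table using `writer_batch_size`.
    fn new(min: usize, max: usize) -> Self {
        Self { current: max, min, max }
    }
    fn current(&self) -> usize {
        self.current
    }
    fn update(&mut self, actual: usize) {
        let next = if actual.saturating_mul(5) > self.current.saturating_mul(4) {
            self.current.saturating_mul(2)
        } else {
            self.current.saturating_add(actual) / 2
        };
        self.current = next.clamp(self.min, self.max);
    }
}

// In the crate this would sit in `mod tests` under `#[test]`; it is a plain
// function here so the sketch runs standalone.
pub fn dynamic_batch_size_grows_after_large_drained_batch() {
    const MAX: usize = 2 * 1024 * 1024;
    const MIN: usize = 64 * 1024;
    let mut est = Estimator::new(MIN, MAX);

    // 1. Shrink: drain several small batches.
    for _ in 0..5 {
        est.update(16 * 1024);
    }
    let shrunk = est.current();
    assert!(shrunk < MAX, "estimate should shrink after small batches");

    // 2. Drain a batch that filled 90% of the current (shrunk) target.
    est.update(shrunk * 9 / 10);

    // 3. The next allocation should use a larger target than before.
    let next_alloc = est.current();
    assert!(next_alloc > shrunk, "expected growth: {shrunk} -> {next_alloc}");
}
```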
##########
website/docs/user-guide/cpp/api-reference.md:
##########
@@ -21,7 +21,9 @@ Complete API reference for the Fluss C++ client.
| `writer_request_max_size` | `int32_t` | `10485760` (10 MB) | Maximum request size in bytes |
| `writer_acks` | `std::string` | `"all"` | Acknowledgment setting (`"all"`, `"0"`, `"1"`, or `"-1"`) |
| `writer_retries` | `int32_t` | `INT32_MAX` | Number of retries on failure |
-| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes |
+| `writer_batch_size` | `int32_t` | `2097152` (2 MB) | Batch size for writes in bytes (also the upper bound when dynamic sizing is enabled) |
Review Comment:
It feels a little counterintuitive from a user's perspective to have
`writer_batch_size` as the max while `writer_batch_size_min` is the min 😅, but
that's a very personal feeling.
Nit: on the doc itself, maybe it would be clearer to mention that this is the
default batch size when dynamic sizing is disabled?
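To make the suggested doc wording concrete, here is a sketch of the semantics as described in this thread. `WriterConfig` and `effective_batch_size` are hypothetical and are not the client's actual API:

```rust
/// Hypothetical view of the writer options discussed above.
pub struct WriterConfig {
    pub writer_batch_size: usize,
    pub writer_batch_size_min: usize,
    pub writer_dynamic_batch_size_enabled: bool,
}

impl WriterConfig {
    /// Batch size for the next allocation:
    /// - dynamic sizing disabled: always `writer_batch_size` (the plain default);
    /// - dynamic sizing enabled: the estimate, kept within
    ///   `[writer_batch_size_min, writer_batch_size]`.
    pub fn effective_batch_size(&self, estimate: usize) -> usize {
        if self.writer_dynamic_batch_size_enabled {
            // `clamp` panics if min > max, so config validation should
            // reject `writer_batch_size_min > writer_batch_size`.
            estimate.clamp(self.writer_batch_size_min, self.writer_batch_size)
        } else {
            self.writer_batch_size
        }
    }
}
```

Under these semantics, the doc entry could read along the lines of: "Batch size for writes in bytes; the fixed batch size when dynamic sizing is disabled, and the upper bound when it is enabled."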
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.