924060929 commented on PR #62942: URL: https://github.com/apache/doris/pull/62942#issuecomment-4666633154
Thanks for the work here; the low-memory load idea is valuable. One note for reviewers: the diff actually contains **three** independent mechanisms aimed at the same goal (reducing peak load memory), but the current description documents only the first. Since ② is on by default and makes up a large part of the change, it would help a lot to cover all three. Here is a suggested description in a **Background → Approach → Implementation** structure; feel free to use or adapt it:

> ## Overview
>
> This PR reduces peak memory usage during load for random-distribution, wide, or duplicate-without-key tables. It bundles three independent mechanisms:
>
> | Mechanism | Switch | Default | Scope |
> |---|---|---|---|
> | ① Low-memory load (direct rowset write + early column-page flush) | table property `enable_low_memory_load` + BE `column_writer_page_flush_threshold` (4MB) | off (per-table opt-in, DUP-without-key only) | V1 tablet-writer path |
> | ② Adaptive random bucket (receiver-side) | FE `enable_adaptive_random_bucket_load` + BE `enable_adaptive_random_bucket_load_bucket_rotation` | on | cloud + random-distribution tables only |
> | ③ Memtable flush-queue backpressure | BE `enable_memtable_flush_queue_backpressure` | off | all memtable-based loads |
>
> ---
>
> ### ① `enable_low_memory_load`: direct rowset write + early column-page flush
>
> **Background.** Segment V2 writes column by column. Each column writer first builds data pages in memory and keeps them in its internal page list until the column is finalized. For wide, duplicate-without-key tables, this means that while column N is being written, the already-materialized pages of columns 1..N-1 stay resident; peak memory grows with column count × buffered pages per column × page size, and is worst for large variable-length (`STRING`/`VARCHAR`) columns and large batches.
> In addition, the normal path first buffers rows in a memtable for batching and sorting, but a duplicate-without-key table needs neither sorting nor aggregation, so for such tables the memtable layer is pure overhead.
>
> **Approach.** Add an opt-in table property `enable_low_memory_load` for duplicate-without-key tables. When it is enabled:
>
> 1. Skip the memtable and write blocks directly into the rowset writer / segment. This is safe only because there is no sort or aggregation, which is why the property is strictly limited to DUP-without-key tables.
> 2. Inside segment writing, once a column's buffered pages reach `column_writer_page_flush_threshold` (default 4MB) **and** memory has hit the hard limit, flush those pages to the file writer immediately instead of holding them until finalize.
>
> Only the V1 tablet-writer path is covered; the memtable-on-sink (V2) path keeps its existing behavior.
>
> **Implementation.**
> - FE: `PropertyAnalyzer.analyzeEnableLowMemoryLoad`; validation in `InternalCatalog.createOlapTable` / `SchemaChangeHandler` (must be `DUP_KEYS` with no key columns); `OlapTable`/`TableProperty` get/set/build; `Env` for SHOW CREATE TABLE; `OlapTableSink.init` sets `TOlapTableSink.enable_low_memory_load`.
> - BE propagation: `VNodeChannel::_open_internal` → `PTabletWriterOpenRequest.enable_low_memory_load` → `WriteRequest` → `RowsetWriterContext` → `ColumnWriterOptions`.
> - BE direct write: `BaseDeltaWriter` skips creating `MemTableWriter`; `DeltaWriter::write` / `CloudDeltaWriter::write` → `_write_directly_to_rowset()` (clone selected columns + `add_rows` + `rowset_writer()->add_block()`); `close()` → `rowset_writer()->flush()`.
> - BE early flush: `ScalarColumnWriter::_flush_pages_if_needed()`, called at the end of `finish_current_page()`.
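
For reviewers, here is a minimal C++ sketch of the ① early-flush rule (an illustration, not code from the PR). `FileWriterSketch`, `ColumnWriterSketch`, and the injected `hard_limit_reached` callback are hypothetical stand-ins for the segment file writer, `ScalarColumnWriter`, and the limiter's hard-limit check; encoding, page footers, and indexes are left out. It models only the condition described above: sealed pages stay buffered until the column holds at least the threshold **and** the hard limit is reached.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the segment file writer: it only counts bytes.
struct FileWriterSketch {
    std::size_t bytes_written = 0;
    void append(const std::string& page) { bytes_written += page.size(); }
};

// Hypothetical, heavily simplified column writer: only the decision of when
// buffered pages leave memory is modeled.
class ColumnWriterSketch {
public:
    ColumnWriterSketch(FileWriterSketch* file, std::size_t flush_threshold,
                       std::function<bool()> hard_limit_reached)
            : _file(file), _threshold(flush_threshold),
              _hard_limit_reached(std::move(hard_limit_reached)) {
        assert(file != nullptr);
    }

    // A page was sealed: buffer it, then check for an early flush (the PR
    // does this check at the end of finish_current_page()).
    void finish_current_page(std::string page) {
        _buffered_bytes += page.size();
        _pages.push_back(std::move(page));
        flush_pages_if_needed();
    }

    // Finalize: whatever is still buffered is written out.
    void finish() { flush_all(); }

    std::size_t buffered_bytes() const { return _buffered_bytes; }

private:
    // Flush early only when BOTH hold: this column buffers >= threshold bytes
    // AND the process is at its memory hard limit.
    void flush_pages_if_needed() {
        if (_buffered_bytes >= _threshold && _hard_limit_reached()) {
            flush_all();
        }
    }

    void flush_all() {
        for (const auto& p : _pages) _file->append(p);
        _pages.clear();
        _buffered_bytes = 0;
    }

    FileWriterSketch* _file;
    std::size_t _threshold;
    std::function<bool()> _hard_limit_reached;
    std::vector<std::string> _pages;
    std::size_t _buffered_bytes = 0;
};
```

Because both conditions must hold, a column that crosses the threshold while memory is fine keeps buffering exactly as before in this sketch; the early flush only kicks in under memory pressure.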
>
> ---
>
> ### ② `enable_adaptive_random_bucket_load`: receiver-side random bucket
>
> **Background.** For random-distribution tables, the **sender** (the post-planning `VTabletWriter`) currently round-robins each batch onto a tablet (`FIND_TABLET_EVERY_BATCH`), so a partition's data is scattered across all of its buckets. As a result, each BE concurrently holds active writers/memtables for many buckets of that partition (high memory), and each batch fans out to many BEs (network amplification). In cloud mode we would rather let the BE where the data lands decide which bucket to write.
>
> **Approach.** Move the bucket decision from sender to receiver: the sender sends only `partition_id` (no `tablet_id`), and each receiver BE writes to one bucket it owns (holds the primary replica of) and rotates to the next local bucket after a memtable flush, so each BE has only one active writer per partition at a time. For each (sink BE, partition) pair, FE computes the starting bucket and the ordered list of local buckets and ships them down; the receiver maintains a small state machine for the current tablet and flush-driven rotation. Partitions created at runtime by auto-partition get the same computation on the fly inside the create/replace-partition RPC. It is on by default, but applies only to cloud + random-distribution tables.
>
> **Implementation.**
> - FE flag: `OlapTableSink.init` sets `enable_adaptive_random_bucket` (cloud + `Config.enable_adaptive_random_bucket_load` + `RandomDistributionInfo`).
> - FE bucket-assignment algorithm (new static methods in `OlapTableSink`): `buildBeToBucketSeqs` derives each bucket's owner BE from tablet locations; `targetBucketNum = min(min(#sinkBE, #bucket), max(planFragmentNum, 1))`; balancing via `load_tablet_idx` rotation + `selectLeastUsedBucketSeq`; `applyAdaptiveRandomBucketAssignments` writes `load_tablet_idx`/`bucket_be_id`/`local_bucket_seqs` onto a per-BE deep-copied sink.
>   Entry points: legacy `Coordinator.assignAdaptiveRandomBucketForFragment` and Nereids `ThriftPlansBuilder.assignAdaptiveRandomBucketForSinkParams`.
> - BE sender: `VTabletWriter::_init` adds `FIND_TABLET_RANDOM_BUCKET` (highest priority); in that mode `OlapTabletFinder`/`VRowDistribution` skip tablet-id computation; `Payload` carries `RowPartTabletIds*` (partition_ids); `IndexChannel` builds `_channels_by_partition` and routes payloads to the owner channel; `VNodeChannel` sends per-row `partition_ids` plus, at open, the per-partition ordered tablet list.
> - BE receiver: new proto fields (`is_receiver_side_random_bucket`, `random_bucket_partitions`, `PRandomBucketPartitionParam`); `_init_receiver_side_random_bucket_state` builds `AdaptiveRandomBucketState`; `add_batch` groups rows by partition, then `_write_block_data_for_receiver_side_random_bucket` takes a per-partition lock, writes to `current_tablet()`, and calls `rotate_by_tablet()` when `memtable_flushed` is set (gated by `enable_adaptive_random_bucket_load_bucket_rotation`).
> - Auto-partition: `FrontendServiceImpl.createPartition/replacePartition` set `load_tablet_idx` for RANDOM partitions and re-run the assignment via `collectAdaptiveBucketSinkContext` (which looks up the coordinator by `query_id` to get the set of sink BEs).
> - **Cross-cutting change (affects every load path, not just the new modes):** a `memtable_flushed` out-param is threaded through `MemTableWriter::write` → `DeltaWriter::write` → `TabletsChannel`, and `MemTableWriter` is switched to lazy memtable creation (a memtable is created on first write or after a flush instead of in `init()`). Called out separately because it is on the hot path for all loads.
>
> ---
>
> ### ③ `enable_memtable_flush_queue_backpressure`: flush-queue backpressure
>
> **Background.** `MemTableMemoryLimiter` currently throttles only on memory (soft/hard limit).
> But the flush thread pool can back up: memory may still be under the limit while the flush queue is already long, and continuing to admit writes only grows the backlog. A memory-only view cannot see this.
>
> **Approach.** Add the workload group's flush-pool queue length as a second backpressure signal: when the queue length exceeds a threshold, hang/flush even if the memory limits have not been reached. Off by default, as a conservative first step.
>
> **Implementation.** BE config `enable_memtable_flush_queue_backpressure` (default false). `MemTableMemoryLimiter::handle_memtable_flush(cancel_check, WorkloadGroup* wg)` adds a `check_queue_overloaded` check (`wg->get_memtable_flush_pool()->get_queue_size() > kQueueThreshold`) to both the entry condition and the wait loop; `hard_limit_reached()` is made public (used by ①'s `_flush_pages_if_needed`). Callers in `load_channel_mgr.cpp` and `vtablet_writer_v2.cpp` pass the workload group.
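
For ②, it may also help reviewers to see the receiver-side rotation in isolation. The C++ sketch below is an illustration under assumptions, not the PR's `AdaptiveRandomBucketState`: `RandomBucketStateSketch` and `write_batch_sketch` are hypothetical names, one instance is assumed per (BE, partition), and the rule that `rotate_by_tablet()` advances only when the flushed tablet is still the current one is a guess at the intended semantics.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical per-partition state on one receiver BE: the ordered list of
// local buckets (tablets) shipped down by FE, plus the active index.
class RandomBucketStateSketch {
public:
    RandomBucketStateSketch(std::vector<int64_t> local_tablets, std::size_t start_idx)
            : _tablets(std::move(local_tablets)) {
        assert(!_tablets.empty());
        _idx = start_idx % _tablets.size();  // FE-computed starting bucket
    }

    int64_t current_tablet() const {
        std::lock_guard<std::mutex> guard(_lock);
        return _tablets[_idx];
    }

    // Assumed semantics: advance (with wrap-around) only if the tablet whose
    // memtable just flushed is still the active one, so a stale notification
    // for an older bucket cannot skip a bucket.
    void rotate_by_tablet(int64_t flushed_tablet) {
        std::lock_guard<std::mutex> guard(_lock);
        if (_tablets[_idx] == flushed_tablet) {
            _idx = (_idx + 1) % _tablets.size();
        }
    }

private:
    mutable std::mutex _lock;
    std::vector<int64_t> _tablets;
    std::size_t _idx = 0;
};

// Hypothetical write step: the whole batch for this partition goes to the
// current bucket. `memtable_flushed` stands in for the out-param reported by
// the write; `rotation_enabled` stands in for the BE rotation switch.
int64_t write_batch_sketch(RandomBucketStateSketch& state, bool memtable_flushed,
                           bool rotation_enabled) {
    const int64_t tablet = state.current_tablet();
    // ... rows would be appended to `tablet`'s delta writer here ...
    if (memtable_flushed && rotation_enabled) {
        state.rotate_by_tablet(tablet);
    }
    return tablet;
}
```

The equality check makes a late or repeated flush notification harmless even though reading the current tablet and rotating are separate critical sections here; per the description above, the PR instead holds a per-partition lock across the whole write.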
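
A minimal C++ sketch of the ③ admission predicate, for illustration only. `LimiterSnapshot`, `queue_overloaded`, `needs_backpressure`, `hold_writer`, and the threshold value 32 are hypothetical (the value of the PR's `kQueueThreshold` is not stated here), soft and hard memory limits are collapsed into one flag, and the loop re-polls where the real limiter would wait/flush between checks. It shows the structural point of the change: one memory-or-queue predicate guards both the entry check and the wait loop, and the queue signal is ignored while the flag is off.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Hypothetical snapshot of what the limiter observes. The real limiter
// distinguishes soft and hard limits; here they are one flag.
struct LimiterSnapshot {
    bool memory_limit_reached = false;
    std::size_t flush_queue_size = 0;  // workload group's memtable flush-pool queue size
};

// Hypothetical threshold value for illustration.
constexpr std::size_t kQueueThresholdSketch = 32;

// Second signal, consulted only when the BE flag is on (default off).
bool queue_overloaded(const LimiterSnapshot& s, bool backpressure_enabled) {
    return backpressure_enabled && s.flush_queue_size > kQueueThresholdSketch;
}

bool needs_backpressure(const LimiterSnapshot& s, bool backpressure_enabled) {
    return s.memory_limit_reached || queue_overloaded(s, backpressure_enabled);
}

// The same predicate is the entry condition and the wait-loop condition.
// `poll` re-samples state each round; returns how many rounds the writer
// was held back before being admitted or cancelled.
int hold_writer(const std::function<LimiterSnapshot()>& poll,
                const std::function<bool()>& cancelled, bool backpressure_enabled) {
    assert(poll && cancelled);
    int rounds = 0;
    while (needs_backpressure(poll(), backpressure_enabled) && !cancelled()) {
        ++rounds;  // the real limiter would wait and/or trigger flushes here
    }
    return rounds;
}
```

With `backpressure_enabled == false`, `queue_overloaded()` is always false and the predicate reduces to the existing memory-only check, which is what keeps the default-off rollout low-risk.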
