Copilot commented on code in PR #61385:
URL: https://github.com/apache/doris/pull/61385#discussion_r2939574984
##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -1395,6 +1396,9 @@ public enum IgnoreSplitType {
@VariableMgr.VarAttr(name = ENABLE_LEFT_SEMI_DIRECT_RETURN_OPT)
public boolean enableLeftSemiDirectReturnOpt = true;
+ @VariableMgr.VarAttr(name = ENABLE_CONDITION_CACHE)
+ public boolean enableConditionCache = true;
+
Review Comment:
   `enableConditionCache` defaults to `true`, which effectively enables
condition caching (and digest propagation) for all sessions by default. Given
this is a new execution-cache feature, this default is likely too aggressive
and can change query behavior/performance globally; consider defaulting it to
`false` and requiring explicit enablement via the session variable.
##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -2297,6 +2364,18 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
auto type = idx_to_datatype.find(idx)->second;
block->replace_by_position(idx, type->create_column());
}
+
+ if (_opts.condition_cache_digest && !_find_condition_cache) {
+ auto* condition_cache = ConditionCache::instance();
+ ConditionCache::CacheKey cache_key(_opts.rowset_id,
_segment->id(),
+
_opts.condition_cache_digest);
+ VLOG_DEBUG << "Condition cache insert, query id: "
+ << print_id(_opts.runtime_state->query_id())
+ << ", rowset id: " <<
_opts.rowset_id.to_string()
+ << ", segment id: " << _segment->id()
+ << ", cache digest: " <<
_opts.condition_cache_digest;
Review Comment:
The condition-cache insert log path unconditionally dereferences
`_opts.runtime_state` (`_opts.runtime_state->query_id()`), but
`StorageReadOptions::runtime_state` can be null in non-query contexts. This can
crash when inserting the cache entry; guard the log (and any use of
runtime_state) with a null check like the hit-path does.
##########
be/src/pipeline/exec/scan_operator.h:
##########
@@ -430,7 +431,10 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
// Record the value of the aggregate function 'count' from doris's be
int64_t _push_down_count = -1;
const int _parallel_tasks = 0;
- std::vector<int> topn_filter_source_node_ids;
+
+ int _query_parallel_instance_num = 0;
+
Review Comment:
`_query_parallel_instance_num` is introduced but not referenced anywhere in
the pipeline scan operator code. If it’s not needed for this PR, consider
removing it to avoid dead state; if it is needed, wire it into the relevant
logic (or at least add a TODO/comment explaining how it will be used).
##########
regression-test/suites/query_p0/cache/condition_cache.groovy:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.stream.Collectors
+
+suite("condition_cache") {
+ def tableName = "table_condition_cache"
+ def joinTableName = "table_join_condition_cache"
+
+ def test = {
+ sql "set enable_condition_cache=false"
+ sql "set runtime_filter_type=0"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int NULL,
+ `name` varchar(50) NULL,
+ `age` int NULL,
+ `score` double NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `name`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "min_load_replica_num" = "-1",
+ "is_being_synced" = "false",
+ "storage_medium" = "hdd",
+ "storage_format" = "V2",
+ "inverted_index_storage_format" = "V3",
+ "light_schema_change" = "true",
+ "disable_auto_compaction" = "false",
+ "enable_single_replica_compaction" = "false"
+ )
+ """
+
+ sql """
+ INSERT INTO ${tableName}(id, name, age, score)
+ VALUES
+ (1, "Alice", 25, 85.5),
+ (2, "Bob", 30, 90.0),
+ (3, "Charlie", 22, 75.5),
+ (4, "David", 28, 92.0),
+ (5, "Eve", 26, 88.0)
+ """
+
+ // Create join table
+ sql """ DROP TABLE IF EXISTS ${joinTableName} """
+ sql """
+ CREATE TABLE ${joinTableName} (
+ `id` int NULL,
+ `department` varchar(50) NULL,
+ `position` varchar(50) NULL,
+ `salary` double NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`, `department`)
+ DISTRIBUTED BY HASH(`id`) BUCKETS 10
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "min_load_replica_num" = "-1",
+ "is_being_synced" = "false",
+ "storage_medium" = "hdd",
+ "storage_format" = "V2",
+ "inverted_index_storage_format" = "V3",
+ "light_schema_change" = "true",
+ "disable_auto_compaction" = "false",
+ "enable_single_replica_compaction" = "false"
+ )
+ """
+
+ sql """
+ INSERT INTO ${joinTableName}(id, department, position, salary)
+ VALUES
+ (1, "Engineering", "Developer", 100000),
+ (2, "Marketing", "Manager", 120000),
+ (3, "HR", "Specialist", 80000),
+ (4, "Engineering", "Senior Developer", 140000),
+ (5, "Finance", "Analyst", 95000)
+ """
+
+ // First query with WHERE condition - Run without cache
+ order_qt_condition_cache1 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE age > 25 AND score > 85
+ """
+
+ // Second query with different WHERE condition - Run without cache
+ order_qt_condition_cache2 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE name LIKE 'A%' OR score < 80
+ """
+
+ // Enable condition cache
+ sql "set enable_condition_cache=true"
+
+ // Run the same first query with cache enabled
+ order_qt_condition_cache3 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE age > 25 AND score > 85
+ """
+
+ // Run the same second query with cache enabled
+ order_qt_condition_cache4 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE name LIKE 'A%' OR score < 80
+ """
+
+ // Run both queries again to test cache hit
+ order_qt_condition_cache5 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE age > 25 AND score > 85
+ """
+
+ order_qt_condition_cache6 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE name LIKE 'A%' OR score < 80
+ """
+
+ // Test delete operation impact on condition cache
+ // Delete some data
+ sql "DELETE FROM ${tableName} WHERE age = 30" // Delete Bob's record
+
+ // Run the same queries after delete to see if cache is invalidated
+ order_qt_condition_delete1 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE age > 25 AND score > 85
+ """
+
+ order_qt_condition_delete2 """
+ SELECT
+ id,
+ name,
+ age,
+ score
+ FROM ${tableName}
+ WHERE name LIKE 'A%' OR score < 80
+ """
+
+ // rebuild table to skip the delete operation
+ sql "create table temp like ${tableName}"
+ sql "insert into temp select * from ${tableName}"
+ sql "drop table ${tableName}"
+ sql "alter table temp rename ${tableName}"
Review Comment:
The temporary table name `temp` is very generic and could collide with other
tests (or with leftover tables from a failed run). Use a name derived from
`tableName` (e.g. `${tableName}_tmp`) or include the suite name in the temp
table name.
##########
gensrc/thrift/PaloInternalService.thrift:
##########
@@ -265,7 +265,7 @@ struct TQueryOptions {
91: optional bool runtime_filter_wait_infinitely = false;
- 92: optional i32 wait_full_block_schedule_times = 1; // deprecated
+ 92: optional i32 condition_cache_digest = 0;
Review Comment:
Thrift field id `92` is being reused for `condition_cache_digest`. Even if
the previous field was marked deprecated, reusing an existing id is
wire-incompatible (older FEs/BEs may still send/interpret it), and can cause
unintended behavior when mixing versions. Please add new fields with new ids
and keep deprecated ids reserved.
##########
be/src/olap/parallel_scanner_builder.cpp:
##########
@@ -192,23 +196,65 @@ Status
ParallelScannerBuilder::_build_scanners_by_segment(std::list<ScannerSPtr>
continue;
}
- // Build scanners for [i, i+1) segment range, without row-range
slicing.
- for (int64_t i = 0; i < rowset->num_segments(); ++i) {
- RowSetSplits split(reader->clone());
- split.segment_offsets.first = i;
- split.segment_offsets.second = i + 1;
- // No row-ranges slicing; scan whole segment i.
- DCHECK_GE(split.segment_offsets.second,
split.segment_offsets.first + 1);
+ int64_t segment_start = 0;
+ auto split = RowSetSplits(reader->clone());
+
+ for (size_t i = 0; i < segments_rows.size(); ++i) {
+ const size_t rows_of_segment = segments_rows[i];
- TabletReadSource partitial_read_source;
+ // Check if adding this segment would exceed rows_per_scanner
+ // 0.9: try to avoid splitting the segments into excessively
small parts.
+ if (rows_collected > 0 && (rows_collected + rows_of_segment >
_rows_per_scanner &&
+ rows_collected < _rows_per_scanner
* 9 / 10)) {
+ // Create a new scanner with collected segments
+ split.segment_offsets.first = segment_start;
+ split.segment_offsets.second =
+ i; // Range is [segment_start, i), including all
segments from segment_start to i-1
+
+ DCHECK_GT(split.segment_offsets.second,
split.segment_offsets.first);
+
+
partitial_read_source.rs_splits.emplace_back(std::move(split));
Review Comment:
In `_build_scanners_by_segment()`, the flush condition can trigger when `i
== segment_start` (e.g., at the start of a new rowset while `rows_collected`
already has previous rowsets’ rows). That sets `segment_offsets` to `[0, 0)`
and hits `DCHECK_GT`, or builds an invalid empty split in release. When
flushing an existing `partitial_read_source`, avoid appending an empty
`RowSetSplits` (flush before starting the next rowset, or only flush when `i >
segment_start`).
##########
regression-test/suites/query_p0/cache/condition_cache.groovy:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.stream.Collectors
+
+suite("condition_cache") {
+ def tableName = "table_condition_cache"
+ def joinTableName = "table_join_condition_cache"
Review Comment:
This suite uses generic table names (`table_condition_cache`,
`table_join_condition_cache`) which increases the risk of collisions when
regression suites run concurrently. Other cache suites in this folder use long
unique table names to avoid that; consider including the suite name + a unique
suffix in these table names as well.
##########
be/src/olap/rowset/segment_v2/condition_cache.cpp:
##########
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/segment_v2/condition_cache.h"
+
+#include <memory>
+
+#include "util/defer_op.h"
+
+namespace doris::segment_v2 {
+
+bool ConditionCache::lookup(const CacheKey& key, ConditionCacheHandle* handle)
{
+ if (key.encode().empty()) {
+ return false;
+ }
+ auto* lru_handle = LRUCachePolicy::lookup(key.encode());
+ if (lru_handle == nullptr) {
+ return false;
+ }
+ *handle = ConditionCacheHandle(this, lru_handle);
+ return true;
+}
+
+void ConditionCache::insert(const CacheKey& key,
std::shared_ptr<std::vector<bool>> result) {
+ if (key.encode().empty()) {
+ return;
+ }
+ std::unique_ptr<ConditionCache::CacheValue> cache_value_ptr =
+ std::make_unique<ConditionCache::CacheValue>();
+ cache_value_ptr->filter_result = result;
+
+ ConditionCacheHandle(
+ this,
+ LRUCachePolicy::insert(key.encode(),
(void*)cache_value_ptr.release(),
+ result->capacity(), result->capacity(),
CachePriority::NORMAL));
Review Comment:
`ConditionCache::insert()` uses `result->capacity()` as both `charge` and
`value_tracking_bytes`. For `std::vector<bool>`, `capacity()` is in bits (and
can exceed `size()`), so cache accounting/eviction will be wrong and may exceed
the configured memory limit. Compute the approximate bytes for the stored
bitmap (e.g. `(result->size() + 7) / 8`, or `result->capacity()/8` if you
really want capacity) and use that for `charge`/tracking.
##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -5013,6 +5018,9 @@ public TQueryOptions toThrift() {
tResult.setEnableBroadcastJoinForcePassthrough(enableBroadcastJoinForcePassthrough);
tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows);
+ if (enableConditionCache) {
+
tResult.setConditionCacheDigest(getAffectQueryResultVariableHashCode());
+ }
Review Comment:
`toThrift()` sets `conditionCacheDigest` from
`getAffectQueryResultVariableHashCode()` (an `int`). If the intent is to seed a
64-bit digest and avoid collisions across session-variable sets, consider
producing a 64-bit hash (e.g., long/xxhash) and sending it over thrift as `i64`
instead of truncating to 32 bits.
##########
gensrc/thrift/PaloInternalService.thrift:
##########
@@ -368,7 +368,7 @@ struct TQueryOptions {
144: optional bool enable_inverted_index_searcher_cache = true;
145: optional bool enable_inverted_index_query_cache = true;
- 146: optional bool fuzzy_disable_runtime_filter_in_be = false; // deprecated
+ 146: optional bool enable_condition_cache = false; //deprecated
Review Comment:
Thrift field id `146` is being repurposed from a deprecated option to
`enable_condition_cache`. Reusing ids is wire-incompatible and can break
mixed-version clusters/clients even if the old field was deprecated. Add
`enable_condition_cache` under a new unused id and keep the old id reserved (or
keep the deprecated field with its original meaning).
##########
gensrc/thrift/PaloInternalService.thrift:
##########
@@ -265,7 +265,7 @@ struct TQueryOptions {
91: optional bool runtime_filter_wait_infinitely = false;
- 92: optional i32 wait_full_block_schedule_times = 1; // deprecated
+ 92: optional i32 condition_cache_digest = 0;
Review Comment:
`condition_cache_digest` is defined as `i32`, but the BE digest computation
and cache key use `uint64_t`. A 32-bit digest increases collision risk
(potentially reusing cached filtering results across sessions/settings that
should differ). Consider switching this field to `i64` (and setting a 64-bit
digest from FE) to reduce collision probability.
##########
be/src/olap/parallel_scanner_builder.h:
##########
@@ -96,7 +96,7 @@ class ParallelScannerBuilder {
std::map<RowsetId, std::vector<size_t>> _all_segments_rows;
// Force building one scanner per segment when true.
- bool _optimize_index_scan_parallelism {false};
+ bool _scan_parallelism_by_segment {false};
Review Comment:
The API/docs in this header still say "one scanner per segment"
(`_build_scanners_by_segment` comment and `_scan_parallelism_by_segment`
comment), but the implementation now groups segments by row count. Please
update these comments to match the new behavior to avoid misleading future
changes/debugging.
##########
regression-test/suites/query_p0/cache/condition_cache.groovy:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.stream.Collectors
Review Comment:
`import java.util.stream.Collectors` is unused in this suite. Removing
unused imports helps keep regression suites minimal and avoids confusion about
expected helper usage.
##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -113,6 +112,60 @@ namespace segment_v2 {
SegmentIterator::~SegmentIterator() = default;
+void SegmentIterator::_init_row_bitmap_by_condition_cache() {
+ // Only dispose need column predicate and expr cal in condition cache
+ if (!_col_predicates.empty() ||
+ (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty())) {
+ if (_opts.condition_cache_digest) {
+ auto* condition_cache = ConditionCache::instance();
+ ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
+ _opts.condition_cache_digest);
+
+ // Increment search count when digest != 0
+
DorisMetrics::instance()->condition_cache_search_count->increment(1);
+
+ ConditionCacheHandle handle;
+ _find_condition_cache = condition_cache->lookup(cache_key,
&handle);
+
+ // Increment hit count if cache lookup is successful
+ if (_find_condition_cache) {
+
DorisMetrics::instance()->condition_cache_hit_count->increment(1);
+ if (_opts.runtime_state) {
+ VLOG_DEBUG << "Condition cache hit, query id: "
+ << print_id(_opts.runtime_state->query_id())
+ << ", segment id: " << _segment->id()
+ << ", cache digest: " <<
_opts.condition_cache_digest
+ << ", rowset id: " <<
_opts.rowset_id.to_string();
+ }
+ }
+
+ auto num_rows = _segment->num_rows();
+ if (_find_condition_cache) {
+ const auto& filter_result = *(handle.get_filter_result());
+ int64_t filtered_blocks = 0;
+ for (int i = 0; i < filter_result.size(); i++) {
+ if (!filter_result[i]) {
+ _row_bitmap.removeRange(
+ i * CONDITION_CACHE_OFFSET,
+ i * CONDITION_CACHE_OFFSET +
CONDITION_CACHE_OFFSET);
+ filtered_blocks++;
+ }
+ }
+ // Record condition_cache hit segment number
+ _opts.stats->condition_cache_hit_seg_nums++;
+ // Record rows filtered by condition cache hit
+ _opts.stats->condition_cache_filtered_rows +=
+ filtered_blocks *
SegmentIterator::CONDITION_CACHE_OFFSET;
+ } else {
+ _condition_cache = std::make_shared<std::vector<bool>>(
+ num_rows / CONDITION_CACHE_OFFSET + 1, false);
Review Comment:
`_condition_cache` is sized as `num_rows / CONDITION_CACHE_OFFSET + 1`,
which creates an extra block when `num_rows` is an exact multiple of the
offset. That extra trailing `false` inflates
`filtered_blocks`/`condition_cache_filtered_rows` stats and wastes cache space.
Use a ceiling division (e.g. `(num_rows + offset - 1) / offset`) for the number
of blocks.
##########
be/src/olap/rowset/segment_v2/segment_iterator.cpp:
##########
@@ -2297,6 +2364,18 @@ Status SegmentIterator::next_batch(vectorized::Block*
block) {
auto type = idx_to_datatype.find(idx)->second;
block->replace_by_position(idx, type->create_column());
}
Review Comment:
`next_batch()` replaces virtual columns whenever `_next_batch_internal()`
returns `END_OF_FILE`, even when `block` already contains rows. This will
overwrite valid result columns with empty columns and can corrupt the last
batch. Gate the virtual-column replacement on `block->rows() == 0` (as before)
so only the final empty EOF block is patched.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]