Copilot commented on code in PR #60989:
URL: https://github.com/apache/doris/pull/60989#discussion_r2877340664
##########
be/src/pipeline/query_cache/query_cache.h:
##########
@@ -109,27 +109,76 @@ class QueryCache : public LRUCachePolicy {
static Status build_cache_key(const std::vector<TScanRangeParams>&
scan_ranges,
const TQueryCacheParam& cache_param,
std::string* cache_key,
int64_t* version) {
- if (scan_ranges.size() > 1) {
- return Status::InternalError(
- "CacheSourceOperator only support one scan range, plan
error");
+ if (scan_ranges.empty()) {
+ return Status::InternalError("scan_ranges is empty, plan error");
}
- auto& scan_range = scan_ranges[0];
- DCHECK(scan_range.scan_range.__isset.palo_scan_range);
- auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
-
- std::from_chars(scan_range.scan_range.palo_scan_range.version.data(),
- scan_range.scan_range.palo_scan_range.version.data() +
-
scan_range.scan_range.palo_scan_range.version.size(),
- *version);
-
- auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
- if (find_tablet == cache_param.tablet_to_range.end()) {
- return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+
+ std::string digest;
+ try {
+ digest = cache_param.digest;
+ } catch (const std::exception&) {
+ return Status::InternalError("digest is invalid, plan error");
+ }
+ if (digest.empty()) {
+ return Status::InternalError("digest is empty, plan error");
+ }
+
+ if (cache_param.tablet_to_range.empty()) {
+ return Status::InternalError("tablet_to_range is empty, plan
error");
+ }
+
+ std::vector<int64_t> tablet_ids;
+ tablet_ids.reserve(scan_ranges.size());
+ for (const auto& scan_range : scan_ranges) {
+ auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
+ tablet_ids.push_back(tablet_id);
+ }
+ std::sort(tablet_ids.begin(), tablet_ids.end());
+
Review Comment:
`QueryCache::build_cache_key` now uses `std::sort` / `std::find_if` (and
`std::from_chars`) in this header, but the header doesn't include `<algorithm>`
(and should also explicitly include `<charconv>` if not guaranteed elsewhere).
Depending on include order this can break compilation; please add the required
standard headers here.
##########
be/src/pipeline/exec/cache_source_operator.cpp:
##########
@@ -60,8 +57,20 @@ Status CacheSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
// 2. build cache key by digest_tablet_id
RETURN_IF_ERROR(QueryCache::build_cache_key(scan_ranges, cache_param,
&_cache_key, &_version));
- custom_profile()->add_info_string(
- "CacheTabletId",
std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id));
+ std::vector<int64_t> cache_tablet_ids;
+ cache_tablet_ids.reserve(scan_ranges.size());
+ for (const auto& scan_range : scan_ranges) {
+
cache_tablet_ids.push_back(scan_range.scan_range.palo_scan_range.tablet_id);
+ }
+ std::sort(cache_tablet_ids.begin(), cache_tablet_ids.end());
+ std::string tablet_ids_str;
+ for (size_t i = 0; i < cache_tablet_ids.size(); ++i) {
+ tablet_ids_str += std::to_string(cache_tablet_ids[i]);
+ if (i < cache_tablet_ids.size() - 1) {
+ tablet_ids_str += ",";
+ }
+ }
+ custom_profile()->add_info_string("CacheTabletId", tablet_ids_str);
Review Comment:
`CacheTabletId` profile entry now contains a comma-separated list of
multiple tablet ids. Consider renaming the profile key to something plural
(e.g. `CacheTabletIds`) to avoid confusion when debugging profiles.
```suggestion
custom_profile()->add_info_string("CacheTabletIds", tablet_ids_str);
```
##########
be/test/pipeline/exec/query_cache_test.cpp:
##########
@@ -40,9 +40,24 @@ TEST_F(QueryCacheTest, create_global_cache) {
TEST_F(QueryCacheTest, build_cache_key) {
{
std::vector<TScanRangeParams> scan_ranges;
- scan_ranges.push_back({});
- scan_ranges.push_back({});
+ TScanRangeParams scan_range1;
+ TPaloScanRange palp_scan_range1;
+ palp_scan_range1.__set_tablet_id(1);
+ palp_scan_range1.__set_version("100");
Review Comment:
The variable name `palp_scan_range*` looks like a typo (should likely be
`palo_scan_range*` to match the Thrift type `TPaloScanRange`). Renaming would
improve readability and avoid confusion in future test edits.
##########
be/src/pipeline/query_cache/query_cache.h:
##########
@@ -109,27 +109,76 @@ class QueryCache : public LRUCachePolicy {
static Status build_cache_key(const std::vector<TScanRangeParams>&
scan_ranges,
const TQueryCacheParam& cache_param,
std::string* cache_key,
int64_t* version) {
- if (scan_ranges.size() > 1) {
- return Status::InternalError(
- "CacheSourceOperator only support one scan range, plan
error");
+ if (scan_ranges.empty()) {
+ return Status::InternalError("scan_ranges is empty, plan error");
}
- auto& scan_range = scan_ranges[0];
- DCHECK(scan_range.scan_range.__isset.palo_scan_range);
- auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
-
- std::from_chars(scan_range.scan_range.palo_scan_range.version.data(),
- scan_range.scan_range.palo_scan_range.version.data() +
-
scan_range.scan_range.palo_scan_range.version.size(),
- *version);
-
- auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
- if (find_tablet == cache_param.tablet_to_range.end()) {
- return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+
+ std::string digest;
+ try {
+ digest = cache_param.digest;
+ } catch (const std::exception&) {
+ return Status::InternalError("digest is invalid, plan error");
+ }
+ if (digest.empty()) {
+ return Status::InternalError("digest is empty, plan error");
+ }
+
+ if (cache_param.tablet_to_range.empty()) {
+ return Status::InternalError("tablet_to_range is empty, plan
error");
+ }
+
+ std::vector<int64_t> tablet_ids;
+ tablet_ids.reserve(scan_ranges.size());
+ for (const auto& scan_range : scan_ranges) {
+ auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
+ tablet_ids.push_back(tablet_id);
+ }
+ std::sort(tablet_ids.begin(), tablet_ids.end());
+
+ int64_t first_version = -1;
+ std::string first_tablet_range;
+ for (size_t i = 0; i < tablet_ids.size(); ++i) {
+ auto tablet_id = tablet_ids[i];
+
+ auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
+ if (find_tablet == cache_param.tablet_to_range.end()) {
+ return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+ }
+
+ auto scan_range_iter =
+ std::find_if(scan_ranges.begin(), scan_ranges.end(),
+ [&tablet_id](const TScanRangeParams& range) {
+ return
range.scan_range.palo_scan_range.tablet_id == tablet_id;
+ });
+ int64_t current_version = -1;
+
std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(),
+
scan_range_iter->scan_range.palo_scan_range.version.data() +
+
scan_range_iter->scan_range.palo_scan_range.version.size(),
+ current_version);
Review Comment:
The result of `std::from_chars(...)` is ignored. If `version` is empty,
non-numeric, or only partially numeric, `current_version` can remain -1 (or hold
a partially parsed value) and still be treated as valid, leading to wrong cache
versioning. Please check the returned `std::from_chars_result` (`ec` and `ptr`)
and fail fast on parse errors.
```suggestion
const auto& version_str = scan_range_iter->scan_range.palo_scan_range.version;
const char* version_begin = version_str.data();
const char* version_end = version_begin + version_str.size();
auto parse_result = std::from_chars(version_begin, version_end, current_version);
if (parse_result.ec != std::errc() || parse_result.ptr != version_end) {
    return Status::InternalError("tablet version is invalid, plan error");
}
```
##########
fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanSingleOlapTableJobTest.java:
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.catalog.LocalTablet;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.thrift.TPaloScanRange;
+import org.apache.doris.thrift.TQueryCacheParam;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeParams;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class UnassignedScanSingleOlapTableJobTest {
+ @Test
+ public void testQueryCacheAssignByPartition() {
+ ConnectContext connectContext = new ConnectContext();
+ connectContext.setThreadLocalInfo();
+ connectContext.setQueryId(new TUniqueId(1, 1));
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+ StatementContext statementContext = new StatementContext(
Review Comment:
These tests set the thread-local `ConnectContext` via `setThreadLocalInfo()`
but never clear it (e.g. `ConnectContext.remove()`). Many other FE tests clean
up the thread-local to avoid cross-test interference; please add an
`@AfterEach` (or finally block) to remove the context.
##########
be/src/pipeline/query_cache/query_cache.h:
##########
@@ -109,27 +109,76 @@ class QueryCache : public LRUCachePolicy {
static Status build_cache_key(const std::vector<TScanRangeParams>&
scan_ranges,
const TQueryCacheParam& cache_param,
std::string* cache_key,
int64_t* version) {
- if (scan_ranges.size() > 1) {
- return Status::InternalError(
- "CacheSourceOperator only support one scan range, plan
error");
+ if (scan_ranges.empty()) {
+ return Status::InternalError("scan_ranges is empty, plan error");
}
- auto& scan_range = scan_ranges[0];
- DCHECK(scan_range.scan_range.__isset.palo_scan_range);
- auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
-
- std::from_chars(scan_range.scan_range.palo_scan_range.version.data(),
- scan_range.scan_range.palo_scan_range.version.data() +
-
scan_range.scan_range.palo_scan_range.version.size(),
- *version);
-
- auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
- if (find_tablet == cache_param.tablet_to_range.end()) {
- return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+
+ std::string digest;
+ try {
+ digest = cache_param.digest;
+ } catch (const std::exception&) {
+ return Status::InternalError("digest is invalid, plan error");
+ }
+ if (digest.empty()) {
+ return Status::InternalError("digest is empty, plan error");
+ }
+
+ if (cache_param.tablet_to_range.empty()) {
+ return Status::InternalError("tablet_to_range is empty, plan
error");
+ }
+
+ std::vector<int64_t> tablet_ids;
+ tablet_ids.reserve(scan_ranges.size());
+ for (const auto& scan_range : scan_ranges) {
+ auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
+ tablet_ids.push_back(tablet_id);
+ }
Review Comment:
`QueryCache::build_cache_key` reads
`scan_range.scan_range.palo_scan_range.*` without checking
`scan_range.scan_range.__isset.palo_scan_range` (the previous DCHECK is gone).
If a non-palo scan range reaches here this will build an invalid key / version;
please validate the field is set for every range and return a clear error
instead of assuming it.
##########
be/src/pipeline/query_cache/query_cache.h:
##########
@@ -109,27 +109,76 @@ class QueryCache : public LRUCachePolicy {
static Status build_cache_key(const std::vector<TScanRangeParams>&
scan_ranges,
const TQueryCacheParam& cache_param,
std::string* cache_key,
int64_t* version) {
- if (scan_ranges.size() > 1) {
- return Status::InternalError(
- "CacheSourceOperator only support one scan range, plan
error");
+ if (scan_ranges.empty()) {
+ return Status::InternalError("scan_ranges is empty, plan error");
}
- auto& scan_range = scan_ranges[0];
- DCHECK(scan_range.scan_range.__isset.palo_scan_range);
- auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
-
- std::from_chars(scan_range.scan_range.palo_scan_range.version.data(),
- scan_range.scan_range.palo_scan_range.version.data() +
-
scan_range.scan_range.palo_scan_range.version.size(),
- *version);
-
- auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
- if (find_tablet == cache_param.tablet_to_range.end()) {
- return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+
+ std::string digest;
+ try {
+ digest = cache_param.digest;
+ } catch (const std::exception&) {
+ return Status::InternalError("digest is invalid, plan error");
+ }
+ if (digest.empty()) {
+ return Status::InternalError("digest is empty, plan error");
+ }
+
+ if (cache_param.tablet_to_range.empty()) {
+ return Status::InternalError("tablet_to_range is empty, plan
error");
+ }
+
+ std::vector<int64_t> tablet_ids;
+ tablet_ids.reserve(scan_ranges.size());
+ for (const auto& scan_range : scan_ranges) {
+ auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
+ tablet_ids.push_back(tablet_id);
+ }
+ std::sort(tablet_ids.begin(), tablet_ids.end());
+
+ int64_t first_version = -1;
+ std::string first_tablet_range;
+ for (size_t i = 0; i < tablet_ids.size(); ++i) {
+ auto tablet_id = tablet_ids[i];
+
+ auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
+ if (find_tablet == cache_param.tablet_to_range.end()) {
+ return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
Review Comment:
The error message says `partition_to_tablets`, but the code is actually
checking `cache_param.tablet_to_range`. Please update the message to reference
the correct field to make diagnosing plan issues easier.
```suggestion
return Status::InternalError("Not find tablet in tablet_to_range, plan error");
```
##########
be/src/pipeline/query_cache/query_cache.h:
##########
@@ -109,27 +109,76 @@ class QueryCache : public LRUCachePolicy {
static Status build_cache_key(const std::vector<TScanRangeParams>&
scan_ranges,
const TQueryCacheParam& cache_param,
std::string* cache_key,
int64_t* version) {
- if (scan_ranges.size() > 1) {
- return Status::InternalError(
- "CacheSourceOperator only support one scan range, plan
error");
+ if (scan_ranges.empty()) {
+ return Status::InternalError("scan_ranges is empty, plan error");
}
- auto& scan_range = scan_ranges[0];
- DCHECK(scan_range.scan_range.__isset.palo_scan_range);
- auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
-
- std::from_chars(scan_range.scan_range.palo_scan_range.version.data(),
- scan_range.scan_range.palo_scan_range.version.data() +
-
scan_range.scan_range.palo_scan_range.version.size(),
- *version);
-
- auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
- if (find_tablet == cache_param.tablet_to_range.end()) {
- return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+
+ std::string digest;
+ try {
+ digest = cache_param.digest;
+ } catch (const std::exception&) {
+ return Status::InternalError("digest is invalid, plan error");
+ }
+ if (digest.empty()) {
+ return Status::InternalError("digest is empty, plan error");
+ }
+
+ if (cache_param.tablet_to_range.empty()) {
+ return Status::InternalError("tablet_to_range is empty, plan
error");
+ }
+
+ std::vector<int64_t> tablet_ids;
+ tablet_ids.reserve(scan_ranges.size());
+ for (const auto& scan_range : scan_ranges) {
+ auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
+ tablet_ids.push_back(tablet_id);
+ }
+ std::sort(tablet_ids.begin(), tablet_ids.end());
+
+ int64_t first_version = -1;
+ std::string first_tablet_range;
+ for (size_t i = 0; i < tablet_ids.size(); ++i) {
+ auto tablet_id = tablet_ids[i];
+
+ auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
+ if (find_tablet == cache_param.tablet_to_range.end()) {
+ return Status::InternalError("Not find tablet in
partition_to_tablets, plan error");
+ }
+
+ auto scan_range_iter =
+ std::find_if(scan_ranges.begin(), scan_ranges.end(),
+ [&tablet_id](const TScanRangeParams& range) {
+ return
range.scan_range.palo_scan_range.tablet_id == tablet_id;
+ });
Review Comment:
`build_cache_key` does a `std::find_if(scan_ranges.begin(),
scan_ranges.end(), ...)` inside a loop over `tablet_ids`, making it O(n^2) per
instance. For large instances this adds avoidable CPU overhead; consider a
single pass that builds a map (tablet_id -> parsed version) or collects
(tablet_id, version, range) pairs and then sorts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]