This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 63c99eb [Cache][Enhancement] Assure sql cache only one version (#5793)
63c99eb is described below
commit 63c99eb4cbb9f1a42c0c41b5c6d65c36de8825dd
Author: xinghuayu007 <[email protected]>
AuthorDate: Fri May 28 13:45:47 2021 +0800
[Cache][Enhancement] Assure sql cache only one version (#5793)
For PR #5792. This patch adds a new param `cache_type` to distinguish sql
cache and partition cache.
When updating the sql cache, we ensure that one sql key has only one cached
version.
---
be/src/runtime/cache/result_node.cpp | 46 +++++++++++++++++-
be/src/runtime/cache/result_node.h | 2 +
be/test/runtime/cache/partition_cache_test.cpp | 55 ++++++++++++++++------
.../org/apache/doris/qe/cache/RowBatchBuilder.java | 8 +++-
gensrc/proto/internal_service.proto | 6 +++
5 files changed, 99 insertions(+), 18 deletions(-)
diff --git a/be/src/runtime/cache/result_node.cpp
b/be/src/runtime/cache/result_node.cpp
index f188c89..4a56349 100644
--- a/be/src/runtime/cache/result_node.cpp
+++ b/be/src/runtime/cache/result_node.cpp
@@ -77,7 +77,50 @@ PCacheStatus ResultNode::update_partition(const
PUpdateCacheRequest* request,
//Only one thread per SQL key can update the cache
CacheWriteLock write_lock(_node_mtx);
+ if (request->cache_type() == CacheType::SQL_CACHE) {
+ return update_sql_cache(request, is_update_firstkey);
+ } else {
+ return update_partition_cache(request, is_update_firstkey);
+ }
+}
+
+PCacheStatus ResultNode::update_sql_cache(const PUpdateCacheRequest *request,
bool &is_update_firstkey) {
+ PartitionRowBatch* partition = NULL;
+ if (request->values_size() > 1) {
+ return PCacheStatus::PARAM_ERROR;
+ }
+ is_update_firstkey = true;
+ const PCacheValue& value = request->values(0);
+ PartitionKey partition_key = value.param().partition_key();
+ // no cache exist, create new cache node
+ if (_partition_map.size() == 0) {
+ partition = new PartitionRowBatch(partition_key);
+ partition->set_row_batch(value);
+ _partition_map[partition_key] = partition;
+ _partition_list.push_back(partition);
+ } else {
+ // compatible with previous version
+ for (auto it = _partition_list.begin(); it != _partition_list.end();
it++) {
+ _data_size -= (*it)->get_data_size();
+ }
+ // clear old cache, and create new cache node
+ for (auto it = _partition_list.begin(); it != _partition_list.end();) {
+ (*it)->clear();
+ SAFE_DELETE(*it);
+ it = _partition_list.erase(it);
+ }
+ _partition_map.clear();
+ partition = new PartitionRowBatch(partition_key);
+ partition->set_row_batch(value);
+ _partition_map[partition_key] = partition;
+ _partition_list.push_back(partition);
+ }
+ _data_size += partition->get_data_size();
+ VLOG(1) << "finish update sql cache batches:" << _partition_list.size();
+ return PCacheStatus::CACHE_OK;
+}
+PCacheStatus ResultNode::update_partition_cache(const PUpdateCacheRequest
*request, bool &is_update_firstkey) {
PartitionKey first_key = kint64max;
if (_partition_list.size() == 0) {
is_update_firstkey = true;
@@ -115,7 +158,7 @@ PCacheStatus ResultNode::update_partition(const
PUpdateCacheRequest* request,
_data_size += partition->get_data_size();
}
_partition_list.sort(compare_partition);
- LOG(INFO) << "finish update batches:" << _partition_list.size();
+ VLOG(1) << "finish update partition cache batches:" <<
_partition_list.size();
while (config::query_cache_max_partition_count > 0 &&
_partition_list.size() > config::query_cache_max_partition_count) {
if (prune_first() == 0) {
@@ -247,6 +290,7 @@ size_t ResultNode::prune_first() {
PartitionRowBatch* part_node = *_partition_list.begin();
size_t prune_size = part_node->get_data_size();
_partition_list.erase(_partition_list.begin());
+ _partition_map.erase(part_node->get_partition_key());
part_node->clear();
SAFE_DELETE(part_node);
_data_size -= prune_size;
diff --git a/be/src/runtime/cache/result_node.h
b/be/src/runtime/cache/result_node.h
index 2d5ca0d..d2c5d1d 100644
--- a/be/src/runtime/cache/result_node.h
+++ b/be/src/runtime/cache/result_node.h
@@ -128,6 +128,8 @@ public:
PCacheStatus update_partition(const PUpdateCacheRequest* request, bool&
is_update_firstkey);
PCacheStatus fetch_partition(const PFetchCacheRequest* request,
PartitionRowBatchList& rowBatchList, bool&
is_hit_firstkey);
+ PCacheStatus update_sql_cache(const PUpdateCacheRequest* request, bool&
is_update_firstkey);
+ PCacheStatus update_partition_cache(const PUpdateCacheRequest* request,
bool& is_update_firstkey);
size_t prune_first();
void clear();
diff --git a/be/test/runtime/cache/partition_cache_test.cpp
b/be/test/runtime/cache/partition_cache_test.cpp
index 5ae9eef..9999090 100644
--- a/be/test/runtime/cache/partition_cache_test.cpp
+++ b/be/test/runtime/cache/partition_cache_test.cpp
@@ -46,7 +46,7 @@ private:
}
void init(int max_size, int ela_size);
void clear();
- PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num);
+ PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num,
CacheType cache_type);
ResultCache* _cache;
PUpdateCacheRequest* _update_request;
PCacheResponse* _update_response;
@@ -84,7 +84,7 @@ void set_sql_key(PUniqueId* sql_key, int64 hi, int64 lo) {
sql_key->set_lo(lo);
}
-PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin,
int part_num) {
+PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin,
int part_num, CacheType cache_type) {
LOG(WARNING) << "init data, sql_num:" << sql_num << ",part_num:" <<
part_num;
PUpdateCacheRequest* up_req = NULL;
PCacheResponse* up_res = NULL;
@@ -103,6 +103,7 @@ PCacheStatus PartitionCacheTest::init_batch_data(int
sql_num, int part_begin, in
value->set_data_size(16);
value->add_rows("0123456789abcdef"); //16 byte
}
+ up_req->set_cache_type(cache_type);
_cache->update(up_req, up_res);
LOG(WARNING) << "finish update data";
st = up_res->status();
@@ -114,7 +115,7 @@ PCacheStatus PartitionCacheTest::init_batch_data(int
sql_num, int part_begin, in
TEST_F(PartitionCacheTest, update_data) {
init_default();
- PCacheStatus st = init_batch_data(1, 1, 1);
+ PCacheStatus st = init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
ASSERT_TRUE(st == PCacheStatus::CACHE_OK);
LOG(WARNING) << "clear cache";
clear();
@@ -122,14 +123,14 @@ TEST_F(PartitionCacheTest, update_data) {
TEST_F(PartitionCacheTest, update_over_partition) {
init_default();
- PCacheStatus st = init_batch_data(1, 1,
config::query_cache_max_partition_count + 1);
+ PCacheStatus st = init_batch_data(1, 1,
config::query_cache_max_partition_count + 1, CacheType::PARTITION_CACHE);
ASSERT_TRUE(st == PCacheStatus::PARAM_ERROR);
clear();
}
TEST_F(PartitionCacheTest, cache_clear) {
init_default();
- init_batch_data(1, 1, 1);
+ init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
_cache->clear(_clear_request, _clear_response);
ASSERT_EQ(_cache->get_cache_size(), 0);
clear();
@@ -137,7 +138,7 @@ TEST_F(PartitionCacheTest, cache_clear) {
TEST_F(PartitionCacheTest, fetch_simple_data) {
init_default();
- init_batch_data(1, 1, 1);
+ init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
LOG(WARNING) << "finish init\n";
set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
@@ -159,7 +160,7 @@ TEST_F(PartitionCacheTest, fetch_simple_data) {
TEST_F(PartitionCacheTest, fetch_not_sqlid) {
init_default();
- init_batch_data(1, 1, 1);
+ init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
set_sql_key(_fetch_request->mutable_sql_key(), 2, 2);
PCacheParam* p1 = _fetch_request->add_params();
@@ -174,7 +175,7 @@ TEST_F(PartitionCacheTest, fetch_not_sqlid) {
TEST_F(PartitionCacheTest, fetch_range_data) {
init_default();
- init_batch_data(1, 1, 3);
+ init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE);
set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
PCacheParam* p1 = _fetch_request->add_params();
@@ -195,7 +196,7 @@ TEST_F(PartitionCacheTest, fetch_range_data) {
TEST_F(PartitionCacheTest, fetch_invalid_right_range) {
init_default();
- init_batch_data(1, 1, 3);
+ init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE);
set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
PCacheParam* p1 = _fetch_request->add_params();
@@ -215,7 +216,7 @@ TEST_F(PartitionCacheTest, fetch_invalid_right_range) {
TEST_F(PartitionCacheTest, fetch_invalid_left_range) {
init_default();
- init_batch_data(1, 1, 3);
+ init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE);
set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
PCacheParam* p1 = _fetch_request->add_params();
@@ -231,7 +232,7 @@ TEST_F(PartitionCacheTest, fetch_invalid_left_range) {
TEST_F(PartitionCacheTest, fetch_invalid_key_range) {
init_default();
- init_batch_data(1, 2, 1);
+ init_batch_data(1, 2, 1, CacheType::PARTITION_CACHE);
set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
PCacheParam* p1 = _fetch_request->add_params();
@@ -256,7 +257,7 @@ TEST_F(PartitionCacheTest, fetch_invalid_key_range) {
TEST_F(PartitionCacheTest, fetch_data_overdue) {
init_default();
- init_batch_data(1, 1, 1);
+ init_batch_data(1, 1, 1, CacheType::PARTITION_CACHE);
set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
PCacheParam* p1 = _fetch_request->add_params();
@@ -276,15 +277,15 @@ TEST_F(PartitionCacheTest, fetch_data_overdue) {
TEST_F(PartitionCacheTest, prune_data) {
init(1, 1);
- init_batch_data(LOOP_LESS_OR_MORE(10, 129), 1, 1024); // 16*1024*128=2M
+ init_batch_data(LOOP_LESS_OR_MORE(10, 129), 1, 1024,
CacheType::PARTITION_CACHE); // 16*1024*128=2M
ASSERT_LE(_cache->get_cache_size(), 1 * 1024 * 1024); //cache_size <= 1M
clear();
}
TEST_F(PartitionCacheTest, fetch_not_continue_partition) {
init_default();
- init_batch_data(1, 1, 1);
- init_batch_data(1, 3, 1);
+ init_batch_data(1, 1, 1, CacheType::PARTITION_CACHE);
+ init_batch_data(1, 3, 1, CacheType::PARTITION_CACHE);
set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
PCacheParam* p1 = _fetch_request->add_params();
p1->set_partition_key(1);
@@ -306,6 +307,30 @@ TEST_F(PartitionCacheTest, fetch_not_continue_partition) {
clear();
}
+TEST_F(PartitionCacheTest, update_sql_cache) {
+ init_default();
+ init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
+ set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+ PCacheParam* p1 = _fetch_request->add_params();
+ p1->set_partition_key(1);
+ p1->set_last_version(1);
+ p1->set_last_version_time(1);
+ _cache->fetch(_fetch_request, _fetch_result);
+ ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK);
+ ASSERT_EQ(_fetch_result->values_size(), 1);
+ ASSERT_EQ(_fetch_result->values(0).rows(0), "0123456789abcdef");
+ // update sql cache and fetch cache again
+ init_batch_data(1, 2, 1, CacheType::SQL_CACHE);
+ set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+ p1 = _fetch_request->add_params();
+ p1->set_partition_key(1);
+ p1->set_last_version(1);
+ p1->set_last_version_time(1);
+ _cache->fetch(_fetch_request, _fetch_result);
+ ASSERT_TRUE(_fetch_result->status() == PCacheStatus::NO_PARTITION_KEY);
+ clear();
+}
+
} // namespace doris
int main(int argc, char** argv) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
index 444cef2..c468bac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
@@ -102,7 +102,9 @@ public class RowBatchBuilder {
public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(String
sql, long partitionKey, long lastVersion, long lastestTime) {
if (updateRequest == null) {
- updateRequest =
InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
+ updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
+ .setSqlKey(CacheProxy.getMd5(sql))
+ .setCacheType(InternalService.CacheType.SQL_CACHE).build();
}
updateRequest = updateRequest.toBuilder()
.addValues(InternalService.PCacheValue.newBuilder()
@@ -139,7 +141,9 @@ public class RowBatchBuilder {
*/
public InternalService.PUpdateCacheRequest
buildPartitionUpdateRequest(String sql) {
if (updateRequest == null) {
- updateRequest =
InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
+ updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
+ .setSqlKey(CacheProxy.getMd5(sql))
+
.setCacheType(InternalService.CacheType.PARTITION_CACHE).build();
}
HashMap<Long, List<byte[]>> partRowMap = new HashMap<>();
List<byte[]> partitionRowList;
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 437fc2f..6aaf40d 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -164,6 +164,11 @@ enum PCacheStatus {
EMPTY_DATA = 8;
};
+enum CacheType {
+ SQL_CACHE = 1;
+ PARTITION_CACHE = 2;
+};
+
message PCacheParam {
required int64 partition_key = 1;
optional int64 last_version = 2;
@@ -184,6 +189,7 @@ message PCacheResponse {
message PUpdateCacheRequest{
required PUniqueId sql_key = 1;
repeated PCacheValue values = 2;
+ optional CacheType cache_type = 3;
};
message PFetchCacheRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]