This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 63c99eb  [Cache][Enhancement] Assure sql cache only one version (#5793)
63c99eb is described below

commit 63c99eb4cbb9f1a42c0c41b5c6d65c36de8825dd
Author: xinghuayu007 <[email protected]>
AuthorDate: Fri May 28 13:45:47 2021 +0800

    [Cache][Enhancement] Assure sql cache only one version (#5793)
    
    For PR #5792. This patch adds a new param `cache_type` to distinguish SQL
    cache from partition cache.
    When updating the SQL cache, we ensure that one SQL key has only one cached
    version.
---
 be/src/runtime/cache/result_node.cpp               | 46 +++++++++++++++++-
 be/src/runtime/cache/result_node.h                 |  2 +
 be/test/runtime/cache/partition_cache_test.cpp     | 55 ++++++++++++++++------
 .../org/apache/doris/qe/cache/RowBatchBuilder.java |  8 +++-
 gensrc/proto/internal_service.proto                |  6 +++
 5 files changed, 99 insertions(+), 18 deletions(-)

diff --git a/be/src/runtime/cache/result_node.cpp 
b/be/src/runtime/cache/result_node.cpp
index f188c89..4a56349 100644
--- a/be/src/runtime/cache/result_node.cpp
+++ b/be/src/runtime/cache/result_node.cpp
@@ -77,7 +77,50 @@ PCacheStatus ResultNode::update_partition(const 
PUpdateCacheRequest* request,
 
     //Only one thread per SQL key can update the cache
     CacheWriteLock write_lock(_node_mtx);
+    if (request->cache_type() == CacheType::SQL_CACHE) {
+        return update_sql_cache(request, is_update_firstkey);
+    } else {
+        return update_partition_cache(request, is_update_firstkey);
+    }
+}
+
+// Replaces the SQL cache held by this node with the single value carried in
+// `request`, so that one SQL key never keeps more than one cached version.
+// Returns PARAM_ERROR unless the request carries exactly one value.
+// Does not lock: update_partition() calls it while holding the write lock on _node_mtx.
+PCacheStatus ResultNode::update_sql_cache(const PUpdateCacheRequest* request, bool& is_update_firstkey) {
+    // Reject an empty request as well as a multi-value one: values(0) below
+    // would index past the end of an empty repeated field.
+    if (request->values_size() != 1) {
+        return PCacheStatus::PARAM_ERROR;
+    }
+    is_update_firstkey = true;
+    const PCacheValue& value = request->values(0);
+    PartitionKey partition_key = value.param().partition_key();
+
+    // Drop every batch cached so far for this key and give back its size.
+    // Nodes filled before SQL and partition cache were distinguished may hold
+    // several batches, so all of them are released, not just one.
+    for (auto it = _partition_list.begin(); it != _partition_list.end();) {
+        _data_size -= (*it)->get_data_size();
+        (*it)->clear();
+        SAFE_DELETE(*it);
+        it = _partition_list.erase(it);
+    }
+    _partition_map.clear();
+
+    // Store the new (and now only) version.
+    PartitionRowBatch* partition = new PartitionRowBatch(partition_key);
+    partition->set_row_batch(value);
+    _partition_map[partition_key] = partition;
+    _partition_list.push_back(partition);
+    _data_size += partition->get_data_size();
+
+    VLOG(1) << "finish update sql cache batches:" << _partition_list.size();
+    return PCacheStatus::CACHE_OK;
+}
 
+PCacheStatus ResultNode::update_partition_cache(const PUpdateCacheRequest 
*request, bool &is_update_firstkey) {
     PartitionKey first_key = kint64max;
     if (_partition_list.size() == 0) {
         is_update_firstkey = true;
@@ -115,7 +158,7 @@ PCacheStatus ResultNode::update_partition(const 
PUpdateCacheRequest* request,
         _data_size += partition->get_data_size();
     }
     _partition_list.sort(compare_partition);
-    LOG(INFO) << "finish update batches:" << _partition_list.size();
+    VLOG(1) << "finish update partition cache batches:" << 
_partition_list.size();
     while (config::query_cache_max_partition_count > 0 &&
            _partition_list.size() > config::query_cache_max_partition_count) {
         if (prune_first() == 0) {
@@ -247,6 +290,7 @@ size_t ResultNode::prune_first() {
     PartitionRowBatch* part_node = *_partition_list.begin();
     size_t prune_size = part_node->get_data_size();
     _partition_list.erase(_partition_list.begin());
+    _partition_map.erase(part_node->get_partition_key());
     part_node->clear();
     SAFE_DELETE(part_node);
     _data_size -= prune_size;
diff --git a/be/src/runtime/cache/result_node.h 
b/be/src/runtime/cache/result_node.h
index 2d5ca0d..d2c5d1d 100644
--- a/be/src/runtime/cache/result_node.h
+++ b/be/src/runtime/cache/result_node.h
@@ -128,6 +128,8 @@ public:
     PCacheStatus update_partition(const PUpdateCacheRequest* request, bool& 
is_update_firstkey);
     PCacheStatus fetch_partition(const PFetchCacheRequest* request,
                                  PartitionRowBatchList& rowBatchList, bool& 
is_hit_firstkey);
+    PCacheStatus update_sql_cache(const PUpdateCacheRequest* request, bool& 
is_update_firstkey);
+    PCacheStatus update_partition_cache(const PUpdateCacheRequest* request, 
bool& is_update_firstkey);
 
     size_t prune_first();
     void clear();
diff --git a/be/test/runtime/cache/partition_cache_test.cpp 
b/be/test/runtime/cache/partition_cache_test.cpp
index 5ae9eef..9999090 100644
--- a/be/test/runtime/cache/partition_cache_test.cpp
+++ b/be/test/runtime/cache/partition_cache_test.cpp
@@ -46,7 +46,7 @@ private:
     }
     void init(int max_size, int ela_size);
     void clear();
-    PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num);
+    PCacheStatus init_batch_data(int sql_num, int part_begin, int part_num, 
CacheType cache_type);
     ResultCache* _cache;
     PUpdateCacheRequest* _update_request;
     PCacheResponse* _update_response;
@@ -84,7 +84,7 @@ void set_sql_key(PUniqueId* sql_key, int64 hi, int64 lo) {
     sql_key->set_lo(lo);
 }
 
-PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, 
int part_num) {
+PCacheStatus PartitionCacheTest::init_batch_data(int sql_num, int part_begin, 
int part_num, CacheType cache_type) {
     LOG(WARNING) << "init data, sql_num:" << sql_num << ",part_num:" << 
part_num;
     PUpdateCacheRequest* up_req = NULL;
     PCacheResponse* up_res = NULL;
@@ -103,6 +103,7 @@ PCacheStatus PartitionCacheTest::init_batch_data(int 
sql_num, int part_begin, in
             value->set_data_size(16);
             value->add_rows("0123456789abcdef"); //16 byte
         }
+        up_req->set_cache_type(cache_type);
         _cache->update(up_req, up_res);
         LOG(WARNING) << "finish update data";
         st = up_res->status();
@@ -114,7 +115,7 @@ PCacheStatus PartitionCacheTest::init_batch_data(int 
sql_num, int part_begin, in
 
 TEST_F(PartitionCacheTest, update_data) {
     init_default();
-    PCacheStatus st = init_batch_data(1, 1, 1);
+    PCacheStatus st = init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
     ASSERT_TRUE(st == PCacheStatus::CACHE_OK);
     LOG(WARNING) << "clear cache";
     clear();
@@ -122,14 +123,14 @@ TEST_F(PartitionCacheTest, update_data) {
 
 TEST_F(PartitionCacheTest, update_over_partition) {
     init_default();
-    PCacheStatus st = init_batch_data(1, 1, 
config::query_cache_max_partition_count + 1);
+    PCacheStatus st = init_batch_data(1, 1, 
config::query_cache_max_partition_count + 1, CacheType::PARTITION_CACHE);
     ASSERT_TRUE(st == PCacheStatus::PARAM_ERROR);
     clear();
 }
 
 TEST_F(PartitionCacheTest, cache_clear) {
     init_default();
-    init_batch_data(1, 1, 1);
+    init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
     _cache->clear(_clear_request, _clear_response);
     ASSERT_EQ(_cache->get_cache_size(), 0);
     clear();
@@ -137,7 +138,7 @@ TEST_F(PartitionCacheTest, cache_clear) {
 
 TEST_F(PartitionCacheTest, fetch_simple_data) {
     init_default();
-    init_batch_data(1, 1, 1);
+    init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
 
     LOG(WARNING) << "finish init\n";
     set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
@@ -159,7 +160,7 @@ TEST_F(PartitionCacheTest, fetch_simple_data) {
 
 TEST_F(PartitionCacheTest, fetch_not_sqlid) {
     init_default();
-    init_batch_data(1, 1, 1);
+    init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
 
     set_sql_key(_fetch_request->mutable_sql_key(), 2, 2);
     PCacheParam* p1 = _fetch_request->add_params();
@@ -174,7 +175,7 @@ TEST_F(PartitionCacheTest, fetch_not_sqlid) {
 
 TEST_F(PartitionCacheTest, fetch_range_data) {
     init_default();
-    init_batch_data(1, 1, 3);
+    init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE);
 
     set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
     PCacheParam* p1 = _fetch_request->add_params();
@@ -195,7 +196,7 @@ TEST_F(PartitionCacheTest, fetch_range_data) {
 
 TEST_F(PartitionCacheTest, fetch_invalid_right_range) {
     init_default();
-    init_batch_data(1, 1, 3);
+    init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE);
 
     set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
     PCacheParam* p1 = _fetch_request->add_params();
@@ -215,7 +216,7 @@ TEST_F(PartitionCacheTest, fetch_invalid_right_range) {
 
 TEST_F(PartitionCacheTest, fetch_invalid_left_range) {
     init_default();
-    init_batch_data(1, 1, 3);
+    init_batch_data(1, 1, 3, CacheType::PARTITION_CACHE);
 
     set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
     PCacheParam* p1 = _fetch_request->add_params();
@@ -231,7 +232,7 @@ TEST_F(PartitionCacheTest, fetch_invalid_left_range) {
 
 TEST_F(PartitionCacheTest, fetch_invalid_key_range) {
     init_default();
-    init_batch_data(1, 2, 1);
+    init_batch_data(1, 2, 1, CacheType::PARTITION_CACHE);
 
     set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
     PCacheParam* p1 = _fetch_request->add_params();
@@ -256,7 +257,7 @@ TEST_F(PartitionCacheTest, fetch_invalid_key_range) {
 
 TEST_F(PartitionCacheTest, fetch_data_overdue) {
     init_default();
-    init_batch_data(1, 1, 1);
+    init_batch_data(1, 1, 1, CacheType::PARTITION_CACHE);
 
     set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
     PCacheParam* p1 = _fetch_request->add_params();
@@ -276,15 +277,15 @@ TEST_F(PartitionCacheTest, fetch_data_overdue) {
 
 TEST_F(PartitionCacheTest, prune_data) {
     init(1, 1);
-    init_batch_data(LOOP_LESS_OR_MORE(10, 129), 1, 1024); // 16*1024*128=2M
+    init_batch_data(LOOP_LESS_OR_MORE(10, 129), 1, 1024, 
CacheType::PARTITION_CACHE); // 16*1024*128=2M
     ASSERT_LE(_cache->get_cache_size(), 1 * 1024 * 1024); //cache_size <= 1M
     clear();
 }
 
 TEST_F(PartitionCacheTest, fetch_not_continue_partition) {
     init_default();
-    init_batch_data(1, 1, 1);
-    init_batch_data(1, 3, 1);
+    init_batch_data(1, 1, 1, CacheType::PARTITION_CACHE);
+    init_batch_data(1, 3, 1, CacheType::PARTITION_CACHE);
     set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
     PCacheParam* p1 = _fetch_request->add_params();
     p1->set_partition_key(1);
@@ -306,6 +307,30 @@ TEST_F(PartitionCacheTest, fetch_not_continue_partition) {
     clear();
 }
 
+TEST_F(PartitionCacheTest, update_sql_cache) {  // SQL cache keeps one version per key
+    init_default();
+    init_batch_data(1, 1, 1, CacheType::SQL_CACHE);
+    set_sql_key(_fetch_request->mutable_sql_key(), 1, 1);
+    PCacheParam* p1 = _fetch_request->add_params();
+    p1->set_partition_key(1);
+    p1->set_last_version(1);
+    p1->set_last_version_time(1);
+    _cache->fetch(_fetch_request, _fetch_result);
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::CACHE_OK);
+    ASSERT_EQ(_fetch_result->values_size(), 1);
+    ASSERT_EQ(_fetch_result->values(0).rows(0), "0123456789abcdef");
+    // Replace the cached version with partition key 2; key 1 must then miss.
+    init_batch_data(1, 2, 1, CacheType::SQL_CACHE);
+    _fetch_request->clear_params();  // otherwise the first fetch's param is sent again
+    p1 = _fetch_request->add_params();
+    p1->set_partition_key(1);
+    p1->set_last_version(1);
+    p1->set_last_version_time(1);
+    _cache->fetch(_fetch_request, _fetch_result);
+    ASSERT_TRUE(_fetch_result->status() == PCacheStatus::NO_PARTITION_KEY);
+    clear();
+}
+
 } // namespace doris
 
 int main(int argc, char** argv) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
index 444cef2..c468bac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
@@ -102,7 +102,9 @@ public class RowBatchBuilder {
 
     public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(String 
sql, long partitionKey, long lastVersion, long lastestTime) {
         if (updateRequest == null) {
-            updateRequest = 
InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
+            updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
+                    .setSqlKey(CacheProxy.getMd5(sql))
+                    .setCacheType(InternalService.CacheType.SQL_CACHE).build();
         }
         updateRequest = updateRequest.toBuilder()
                 .addValues(InternalService.PCacheValue.newBuilder()
@@ -139,7 +141,9 @@ public class RowBatchBuilder {
      */
     public InternalService.PUpdateCacheRequest 
buildPartitionUpdateRequest(String sql) {
         if (updateRequest == null) {
-            updateRequest = 
InternalService.PUpdateCacheRequest.newBuilder().setSqlKey(CacheProxy.getMd5(sql)).build();
+            updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
+                    .setSqlKey(CacheProxy.getMd5(sql))
+                    
.setCacheType(InternalService.CacheType.PARTITION_CACHE).build();
         }
         HashMap<Long, List<byte[]>> partRowMap = new HashMap<>();
         List<byte[]> partitionRowList;
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 437fc2f..6aaf40d 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -164,6 +164,11 @@ enum PCacheStatus {
     EMPTY_DATA = 8;
 };
 
+enum CacheType {  // selects how the BE ResultNode applies a PUpdateCacheRequest
+    SQL_CACHE = 1;  // whole-query result: BE keeps only one version per SQL key
+    PARTITION_CACHE = 2;  // per-partition row batches (update_partition_cache path)
+};  // NOTE: proto2 uses the first value (SQL_CACHE) as the implicit default of an unset field
+
 message PCacheParam {
     required int64 partition_key = 1;
     optional int64 last_version = 2;
@@ -184,6 +189,7 @@ message PCacheResponse {
 message PUpdateCacheRequest{
     required PUniqueId sql_key = 1;
     repeated PCacheValue values = 2;
+    optional CacheType cache_type = 3 [default = PARTITION_CACHE];  // unset (older FE) keeps the pre-existing partition-cache update path
 };
 
 message PFetchCacheRequest {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to