morningman commented on a change in pull request #4670:
URL: https://github.com/apache/incubator-doris/pull/4670#discussion_r495408723



##########
File path: be/src/common/config.h
##########
@@ -310,13 +306,25 @@ namespace config {
     // if compaction of a tablet failed, this tablet should not be chosen to
     // compaction until this interval passes.
     CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min
-    // Too many compaction tasks may run out of memory.
-    // This config is to limit the max concurrency of running compaction tasks.
-    // -1 means no limit, and the max concurrency will be:
-    //      C = (cumulative_compaction_num_threads_per_disk + 
base_compaction_num_threads_per_disk) * dir_num
-    // set it to larger than C will be set to equal to C.
-    // This config can be set to 0, which means to forbid any compaction, for 
some special cases.
-    CONF_Int32(max_compaction_concurrency, "-1");
+
+    // This config can be set to limit thread number in compaction thread pool.
+    CONF_mInt32(min_compaction_threads, "10");
+    CONF_mInt32(max_compaction_threads, "10");
+
+    // The upper limit of "permits" held by all compaction tasks. This config 
can be set to limit memory consumption for compaction.
+    CONF_mInt64(total_permits_for_compaction_score, "10000")
+
+    // Whether compaction task is allowed to start when compaction score of 
current tablet is out of upper limit.
+    CONF_mBool(enable_over_sold, "true");

Review comment:
       ```suggestion
       CONF_mBool(enable_compaction_permit_over_sold, "true");
   ```

##########
File path: be/src/olap/olap_server.cpp
##########
@@ -391,4 +313,70 @@ void StorageEngine::_tablet_checkpoint_callback(DataDir* 
data_dir) {
     } while 
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
 }
 
-}  // namespace doris
+void StorageEngine::_compaction_tasks_producer_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start compaction producer process!";
+
+    std::vector<DataDir*> data_dirs;
+    for (auto& tmp_store : _store_map) {
+        data_dirs.push_back(tmp_store.second);
+    }
+
+    int round = 0;
+    CompactionType compaction_type;
+    do {
+        if (!config::disable_auto_compaction) {
+            if (round < 
config::cumulative_compaction_rounds_for_each_base_compaction_round) {

Review comment:
       I also tested this with the following case:
   1. Only 1 BE with 1 data dir.
   2. Create one table with 100 buckets.
   3. Insert data into this table every 5 seconds.
   
   Compaction is triggered every 2 seconds, and each compaction task costs 
just 0.x seconds. But the average version count of the tablets stays at about 
50 and cannot go any lower.
   
   So I think generating compaction tasks through polling may not be 
appropriate. One possible alternative is to generate compaction tasks through 
triggering.
   
   With polling, currently only one task can be done in 2 seconds, whereas 
with triggering, in my case, it can be done 500 times per second (because 
the amount of data in each batch is very small in the case of high-frequency 
load).

##########
File path: be/src/olap/olap_server.cpp
##########
@@ -391,4 +313,70 @@ void StorageEngine::_tablet_checkpoint_callback(DataDir* 
data_dir) {
     } while 
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
 }
 
-}  // namespace doris
+void StorageEngine::_compaction_tasks_producer_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start compaction producer process!";
+
+    std::vector<DataDir*> data_dirs;
+    for (auto& tmp_store : _store_map) {
+        data_dirs.push_back(tmp_store.second);
+    }
+
+    int round = 0;
+    CompactionType compaction_type;
+    do {
+        if (!config::disable_auto_compaction) {
+            if (round < 
config::cumulative_compaction_rounds_for_each_base_compaction_round) {

Review comment:
       The default 
`cumulative_compaction_rounds_for_each_base_compaction_round` is 9, and the 
default `generate_compaction_tasks_interval_seconds` is 2. So generally, it 
will create a base compaction task every 18 seconds?

##########
File path: be/src/olap/olap_server.cpp
##########
@@ -391,4 +313,70 @@ void StorageEngine::_tablet_checkpoint_callback(DataDir* 
data_dir) {
     } while 
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
 }
 
-}  // namespace doris
+void StorageEngine::_compaction_tasks_producer_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start compaction producer process!";
+
+    std::vector<DataDir*> data_dirs;
+    for (auto& tmp_store : _store_map) {
+        data_dirs.push_back(tmp_store.second);
+    }
+
+    int round = 0;
+    CompactionType compaction_type;
+    do {
+        if (!config::disable_auto_compaction) {
+            if (round < 
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
+                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+                round++;
+            } else {
+                compaction_type = CompactionType::BASE_COMPACTION;
+                round = 0;
+            }
+            LOG(INFO) << "try to generate a batch of compaction tasks!";
+            vector<TabletSharedPtr> tablets_compaction =
+                    _compaction_tasks_generator(compaction_type, data_dirs);
+            for (const auto& tablet : tablets_compaction) {
+                if (tablet->data_dir()->get_disks_compaction_num() <
+                    config::compaction_task_num_per_disk) {
+                    int64_t permits = 
tablet->calc_compaction_score(compaction_type);
+                    if (_permit_limiter.request(permits)) {
+                        if (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION) {
+                            _compaction_thread_pool->submit_func([this, 
tablet, permits]() {
+                                CgroupsMgr::apply_system_cgroup();
+                                this->_perform_cumulative_compaction(tablet);
+                                this->_permit_limiter.release(permits);
+                            });
+                        } else {
+                            _compaction_thread_pool->submit_func([this, 
tablet, permits]() {
+                                CgroupsMgr::apply_system_cgroup();
+                                this->_perform_base_compaction(tablet);
+                                this->_permit_limiter.release(permits);
+                            });
+                        }
+                    }
+                }
+            }
+        }
+    } while (!_stop_background_threads_latch.wait_for(
+            
MonoDelta::FromSeconds(config::generate_compaction_tasks_interval_seconds)));
+}
+
+vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
+        CompactionType compaction_type, std::vector<DataDir*> data_dirs) {
+    vector<TabletSharedPtr> tablets_compaction;
+    std::random_shuffle(data_dirs.begin(), data_dirs.end());

Review comment:
       Why do we need to shuffle `data_dirs`?

##########
File path: be/src/olap/olap_server.cpp
##########
@@ -391,4 +313,70 @@ void StorageEngine::_tablet_checkpoint_callback(DataDir* 
data_dir) {
     } while 
(!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(interval)));
 }
 
-}  // namespace doris
+void StorageEngine::_compaction_tasks_producer_callback() {
+#ifdef GOOGLE_PROFILER
+    ProfilerRegisterThread();
+#endif
+    LOG(INFO) << "try to start compaction producer process!";
+
+    std::vector<DataDir*> data_dirs;
+    for (auto& tmp_store : _store_map) {
+        data_dirs.push_back(tmp_store.second);
+    }
+
+    int round = 0;
+    CompactionType compaction_type;
+    do {
+        if (!config::disable_auto_compaction) {
+            if (round < 
config::cumulative_compaction_rounds_for_each_base_compaction_round) {
+                compaction_type = CompactionType::CUMULATIVE_COMPACTION;
+                round++;
+            } else {
+                compaction_type = CompactionType::BASE_COMPACTION;
+                round = 0;
+            }
+            LOG(INFO) << "try to generate a batch of compaction tasks!";
+            vector<TabletSharedPtr> tablets_compaction =
+                    _compaction_tasks_generator(compaction_type, data_dirs);
+            for (const auto& tablet : tablets_compaction) {
+                if (tablet->data_dir()->get_disks_compaction_num() <
+                    config::compaction_task_num_per_disk) {
+                    int64_t permits = 
tablet->calc_compaction_score(compaction_type);
+                    if (_permit_limiter.request(permits)) {
+                        if (compaction_type == 
CompactionType::CUMULATIVE_COMPACTION) {
+                            _compaction_thread_pool->submit_func([this, 
tablet, permits]() {
+                                CgroupsMgr::apply_system_cgroup();
+                                this->_perform_cumulative_compaction(tablet);
+                                this->_permit_limiter.release(permits);
+                            });
+                        } else {
+                            _compaction_thread_pool->submit_func([this, 
tablet, permits]() {
+                                CgroupsMgr::apply_system_cgroup();
+                                this->_perform_base_compaction(tablet);
+                                this->_permit_limiter.release(permits);
+                            });
+                        }
+                    }
+                }
+            }
+        }
+    } while (!_stop_background_threads_latch.wait_for(
+            
MonoDelta::FromSeconds(config::generate_compaction_tasks_interval_seconds)));
+}
+
+vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
+        CompactionType compaction_type, std::vector<DataDir*> data_dirs) {
+    vector<TabletSharedPtr> tablets_compaction;
+    std::random_shuffle(data_dirs.begin(), data_dirs.end());
+    for (auto data_dir : data_dirs) {

Review comment:
       I think we can find more than one tablet for each data dir at this time.
   The number of tablets found here can be `compaction_task_num_per_disk`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to