This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 647cbae400 [DSIP-32][Master] Add command fetcher strategy for master 
fetch command (#15900)
647cbae400 is described below

commit 647cbae4002c0ab3758d57827460ad7125a2c853
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon Apr 29 16:14:23 2024 +0800

    [DSIP-32][Master] Add command fetcher strategy for master fetch command 
(#15900)
---
 .../dolphinscheduler_env.sh                        |  1 -
 .../dolphinscheduler_env.sh                        |  1 -
 .../dolphinscheduler_env.sh                        |  1 -
 .../dolphinscheduler_env.sh                        |  1 -
 docs/docs/en/architecture/configuration.md         |  4 +-
 docs/docs/en/guide/installation/pseudo-cluster.md  |  1 -
 docs/docs/zh/architecture/configuration.md         | 47 ++++++------
 docs/docs/zh/guide/installation/pseudo-cluster.md  |  1 -
 .../dolphinscheduler/dao/mapper/CommandMapper.java | 12 +--
 .../dolphinscheduler/dao/repository/BaseDao.java   |  5 ++
 .../dao/repository/CommandDao.java                 | 39 ++++++++++
 .../dolphinscheduler/dao/repository/IDao.java      |  5 ++
 .../dao/repository/impl/CommandDaoImpl.java        | 41 ++++++++++
 .../dolphinscheduler/dao/mapper/CommandMapper.xml  |  6 +-
 .../dao/mapper/CommandMapperTest.java              | 13 +++-
 .../dao/repository/impl/CommandDaoImplTest.java    | 88 ++++++++++++++++++++++
 .../command/CommandFetcherConfiguration.java       | 49 ++++++++++++
 .../server/master/command/ICommandFetcher.java     | 36 +++++++++
 .../master/command/IdSlotBasedCommandFetcher.java  | 73 ++++++++++++++++++
 .../server/master/config/CommandFetchStrategy.java | 63 ++++++++++++++++
 .../server/master/config/MasterConfig.java         | 12 +--
 .../master/runner/MasterSchedulerBootstrap.java    | 38 ++--------
 .../src/main/resources/application.yaml            |  9 ++-
 .../server/master/config/MasterConfigTest.java     | 12 +++
 .../src/test/resources/application.yaml            |  9 ++-
 .../service/command/CommandService.java            | 11 ---
 .../service/command/CommandServiceImpl.java        |  9 ---
 .../service/command/MessageServiceImplTest.java    | 10 ---
 .../src/main/resources/application.yaml            |  9 ++-
 29 files changed, 484 insertions(+), 122 deletions(-)

diff --git 
a/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh
 
b/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh
index 58937e740c..8536eb0905 100755
--- 
a/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh
+++ 
b/.github/workflows/cluster-test/mysql_with_mysql_registry/dolphinscheduler_env.sh
@@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=123456
 # DolphinScheduler server related configuration
 export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
 export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
-export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
 
 # Registry center configuration, determines the type and link of the registry 
center
 export REGISTRY_TYPE=${REGISTRY_TYPE:-jdbc}
diff --git 
a/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh
 
b/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh
index 671c70a5bb..f64e59b768 100755
--- 
a/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh
+++ 
b/.github/workflows/cluster-test/mysql_with_zookeeper_registry/dolphinscheduler_env.sh
@@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=123456
 # DolphinScheduler server related configuration
 export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
 export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
-export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
 
 # Registry center configuration, determines the type and link of the registry 
center
 export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
diff --git 
a/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh
 
b/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh
index e7fd1b7204..29f8570319 100644
--- 
a/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh
+++ 
b/.github/workflows/cluster-test/postgresql_with_postgresql_registry/dolphinscheduler_env.sh
@@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=postgres
 # DolphinScheduler server related configuration
 export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
 export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
-export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
 
 # Registry center configuration, determines the type and link of the registry 
center
 export REGISTRY_TYPE=jdbc
diff --git 
a/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh
 
b/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh
index 1dbd63254e..6851716058 100644
--- 
a/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh
+++ 
b/.github/workflows/cluster-test/postgresql_with_zookeeper_registry/dolphinscheduler_env.sh
@@ -28,7 +28,6 @@ export SPRING_DATASOURCE_PASSWORD=postgres
 # DolphinScheduler server related configuration
 export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
 export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
-export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
 
 # Registry center configuration, determines the type and link of the registry 
center
 export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
diff --git a/docs/docs/en/architecture/configuration.md 
b/docs/docs/en/architecture/configuration.md
index 13d8932943..fe0b7851ba 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -286,7 +286,6 @@ Location: `master-server/conf/application.yaml`
 |                                 Parameters                                  
| Default value |                                                               
                                                                                
          Description                                                           
                                                                                
               |
 
|-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | master.listen-port                                                          
| 5678          | master listen port                                            
                                                                                
                                                                                
                                                                                
               |
-| master.fetch-command-num                                                    
| 10            | the number of commands fetched by master                      
                                                                                
                                                                                
                                                                                
               |
 | master.pre-exec-threads                                                     
| 10            | master prepare execute thread number to limit handle commands 
in parallel                                                                     
                                                                                
                                                                                
               |
 | master.exec-threads                                                         
| 100           | master execute thread number to limit process instances in 
parallel                                                                        
                                                                                
                                                                                
                  |
 | master.dispatch-task-number                                                 
| 3             | master dispatch task number per batch                         
                                                                                
                                                                                
                                                                                
               |
@@ -305,6 +304,9 @@ Location: `master-server/conf/application.yaml`
 | master.registry-disconnect-strategy.strategy                                
| stop          | Used when the master disconnect from registry, default value: 
stop. Optional values include stop, waiting                                     
                                                                                
                                                                                
               |
 | master.registry-disconnect-strategy.max-waiting-time                        
| 100s          | Used when the master disconnect from registry, and the 
disconnect strategy is waiting, this config means the master will waiting to 
reconnect to registry in given times, and after the waiting times, if the 
master still cannot connect to registry, will stop itself, if the value is 0s, 
the Master will wait infinitely |
 | master.worker-group-refresh-interval                                        
| 10s           | The interval to refresh worker group from db to memory        
                                                                                
                                                                                
                                                                                
               |
+| master.command-fetch-strategy.type                                          
| ID_SLOT_BASED | The command fetch strategy, only support `ID_SLOT_BASED`      
                                                                                
                                                                                
                                                                                
               |
+| master.command-fetch-strategy.config.id-step                                
| 1             | The id auto incremental step of t_ds_command in db            
                                                                                
                                                                                
                                                                                
               |
+| master.command-fetch-strategy.config.fetch-size                             
| 10            | The number of commands fetched by master                      
                                                                                
                                                                                
                                                                                
               |
 
 ### Worker Server related configuration
 
diff --git a/docs/docs/en/guide/installation/pseudo-cluster.md 
b/docs/docs/en/guide/installation/pseudo-cluster.md
index e63436f203..7a3b43b00e 100644
--- a/docs/docs/en/guide/installation/pseudo-cluster.md
+++ b/docs/docs/en/guide/installation/pseudo-cluster.md
@@ -123,7 +123,6 @@ export SPRING_DATASOURCE_PASSWORD={password}
 # DolphinScheduler server related configuration
 export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
 export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
-export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
 
 # Registry center configuration, determines the type and link of the registry 
center
 export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
diff --git a/docs/docs/zh/architecture/configuration.md 
b/docs/docs/zh/architecture/configuration.md
index 08fded19e0..d8d1d42d1e 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -281,29 +281,30 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn/applicationId
 
 位置:`master-server/conf/application.yaml`
 
-|                                     参数                                      
|     默认值      |                                           描述                   
                         |
-|-----------------------------------------------------------------------------|--------------|-----------------------------------------------------------------------------------------|
-| master.listen-port                                                          
| 5678         | master监听端口                                                     
                         |
-| master.fetch-command-num                                                    
| 10           | master拉取command数量                                              
                         |
-| master.pre-exec-threads                                                     
| 10           | master准备执行任务的数量,用于限制并行的command                                 
                         |
-| master.exec-threads                                                         
| 100          | master工作线程数量,用于限制并行的流程实例数量                                     
                         |
-| master.dispatch-task-number                                                 
| 3            | master每个批次的派发任务数量                                              
                         |
-| master.host-selector                                                        
| lower_weight | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, 
lower_weight                 |
-| master.max-heartbeat-interval                                               
| 10s          | master最大心跳间隔                                                   
                         |
-| master.task-commit-retry-times                                              
| 5            | 任务重试次数                                                         
                         |
-| master.task-commit-interval                                                 
| 1000         | 任务提交间隔,单位为毫秒                                                   
                         |
-| master.state-wheel-interval                                                 
| 5            | 轮询检查状态时间                                                       
                         |
-| master.server-load-protection.enabled                                       
| true         | 是否开启系统保护策略                                                     
                         |
-| master.server-load-protection.max-system-cpu-usage-percentage-thresholds    
| 0.7          | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 
默认值为0.7: 会使用70%的操作系统CPU       |
-| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds       
| 0.7          | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM 
cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU |
-| master.server-load-protection.max-system-memory-usage-percentage-thresholds 
| 0.7          | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 
默认值为0.7: 会使用70%的操作系统内存          |
-| master.server-load-protection.max-disk-usage-percentage-thresholds          
| 0.7          | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 
默认值为0.7: 会使用70%的操作系统磁盘空间         |
-| master.failover-interval                                                    
| 10           | failover间隔,单位为分钟                                               
                         |
-| master.kill-application-when-task-failover                                  
| true         | 当任务实例failover时,是否kill掉yarn或k8s application                     
                         |
-| master.registry-disconnect-strategy.strategy                                
| stop         | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting        
                         |
-| master.registry-disconnect-strategy.max-waiting-time                        
| 100s         | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 
该值表示当Master与注册中心失联时会在给定时间之内进行重连,       |
-| 在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待                      |
-| master.master.worker-group-refresh-interval                                 
| 10s          | 定期将workerGroup从数据库中同步到内存的时间间隔                                  
                         |
+|                                     参数                                      
|      默认值      |                                                               
     描述                                                                    |
+|-----------------------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------|
+| master.listen-port                                                          
| 5678          | master监听端口                                                    
                                                                           |
+| master.pre-exec-threads                                                     
| 10            | master准备执行任务的数量,用于限制并行的command                                
                                                                           |
+| master.exec-threads                                                         
| 100           | master工作线程数量,用于限制并行的流程实例数量                                    
                                                                           |
+| master.dispatch-task-number                                                 
| 3             | master每个批次的派发任务数量                                             
                                                                           |
+| master.host-selector                                                        
| lower_weight  | master host选择器,用于选择合适的worker执行任务,可选值: random, round_robin, 
lower_weight                                                                  |
+| master.max-heartbeat-interval                                               
| 10s           | master最大心跳间隔                                                  
                                                                           |
+| master.task-commit-retry-times                                              
| 5             | 任务重试次数                                                        
                                                                           |
+| master.task-commit-interval                                                 
| 1000          | 任务提交间隔,单位为毫秒                                                  
                                                                           |
+| master.state-wheel-interval                                                 
| 5             | 轮询检查状态时间                                                      
                                                                           |
+| master.server-load-protection.enabled                                       
| true          | 是否开启系统保护策略                                                    
                                                                           |
+| master.server-load-protection.max-system-cpu-usage-percentage-thresholds    
| 0.7           | master最大系统cpu使用值,只有当前系统cpu使用值低于最大系统cpu使用值,master服务才能调度任务. 
默认值为0.7: 会使用70%的操作系统CPU                                                        |
+| master.server-load-protection.max-jvm-cpu-usage-percentage-thresholds       
| 0.7           | master最大JVM cpu使用值,只有当前JVM cpu使用值低于最大JVM 
cpu使用值,master服务才能调度任务. 默认值为0.7: 会使用70%的JVM CPU                                  
                |
+| master.server-load-protection.max-system-memory-usage-percentage-thresholds 
| 0.7           | master最大系统 内存使用值,只有当前系统内存使用值低于最大系统内存使用值,master服务才能调度任务. 
默认值为0.7: 会使用70%的操作系统内存                                                          
 |
+| master.server-load-protection.max-disk-usage-percentage-thresholds          
| 0.7           | master最大系统磁盘使用值,只有当前系统磁盘使用值低于最大系统磁盘使用值,master服务才能调度任务. 
默认值为0.7: 会使用70%的操作系统磁盘空间                                                        
  |
+| master.failover-interval                                                    
| 10            | failover间隔,单位为分钟                                              
                                                                           |
+| master.kill-application-when-task-failover                                  
| true          | 当任务实例failover时,是否kill掉yarn或k8s application                    
                                                                           |
+| master.registry-disconnect-strategy.strategy                                
| stop          | 当Master与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting       
                                                                           |
+| master.registry-disconnect-strategy.max-waiting-time                        
| 100s          | 当Master与注册中心失联之后重连时间, 之后当strategy为waiting时,该值生效。 
该值表示当Master与注册中心失联时会在给定时间之内进行重连, 
在给定时间之内重连失败将会停止自己,在重连时,Master会丢弃目前正在执行的工作流,值为0表示会无限期等待 |
+| master.master.worker-group-refresh-interval                                 
| 10s           | 定期将workerGroup从数据库中同步到内存的时间间隔                                 
                                                                           |
+| master.command-fetch-strategy.type                                          
| ID_SLOT_BASED | Command拉取策略, 目前仅支持 `ID_SLOT_BASED`                            
                                                                           |
+| master.command-fetch-strategy.config.id-step                                
| 1             | 数据库中t_ds_command的id自增步长                                       
                                                                           |
+| master.command-fetch-strategy.config.fetch-size                             
| 10            | master拉取command数量                                             
                                                                           |
 
 ## Worker Server相关配置
 
diff --git a/docs/docs/zh/guide/installation/pseudo-cluster.md 
b/docs/docs/zh/guide/installation/pseudo-cluster.md
index 13479e0d9e..a199167e04 100644
--- a/docs/docs/zh/guide/installation/pseudo-cluster.md
+++ b/docs/docs/zh/guide/installation/pseudo-cluster.md
@@ -118,7 +118,6 @@ export SPRING_DATASOURCE_PASSWORD={password}
 # DolphinScheduler server related configuration
 export SPRING_CACHE_TYPE=${SPRING_CACHE_TYPE:-none}
 export SPRING_JACKSON_TIME_ZONE=${SPRING_JACKSON_TIME_ZONE:-UTC}
-export MASTER_FETCH_COMMAND_NUM=${MASTER_FETCH_COMMAND_NUM:-10}
 
 # Registry center configuration, determines the type and link of the registry 
center
 export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
index 9fb6643227..8c8314e7cc 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/CommandMapper.java
@@ -52,14 +52,10 @@ public interface CommandMapper extends BaseMapper<Command> {
      */
     List<Command> queryCommandPage(@Param("limit") int limit, @Param("offset") 
int offset);
 
-    /**
-     * query command page by slot
-     *
-     * @return command list
-     */
-    List<Command> queryCommandPageBySlot(@Param("limit") int limit,
-                                         @Param("masterCount") int masterCount,
-                                         @Param("thisMasterSlot") int 
thisMasterSlot);
+    List<Command> queryCommandByIdSlot(@Param("currentSlotIndex") int 
currentSlotIndex,
+                                       @Param("totalSlot") int totalSlot,
+                                       @Param("idStep") int idStep,
+                                       @Param("fetchNumber") int fetchNum);
 
     void deleteByWorkflowInstanceIds(@Param("workflowInstanceIds") 
List<Integer> workflowInstanceIds);
 }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java
index 2937957dbd..664b56ee47 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/BaseDao.java
@@ -56,6 +56,11 @@ public abstract class BaseDao<ENTITY, MYBATIS_MAPPER extends 
BaseMapper<ENTITY>>
         return mybatisMapper.selectBatchIds(ids);
     }
 
+    @Override
+    public List<ENTITY> queryAll() {
+        return mybatisMapper.selectList(null);
+    }
+
     @Override
     public List<ENTITY> queryByCondition(ENTITY queryCondition) {
         if (queryCondition == null) {
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
new file mode 100644
index 0000000000..daa52b8318
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/CommandDao.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+
+import java.util.List;
+
+public interface CommandDao extends IDao<Command> {
+
+    /**
+     * Query command by command id and server slot, return the command which 
match (commandId / step) %s totalSlot = currentSlotIndex
+     *
+     * @param currentSlotIndex current slot index
+     * @param totalSlot        total slot number
+     * @param idStep           id step in db
+     * @param fetchNum         fetch number
+     * @return command list
+     */
+    List<Command> queryCommandByIdSlot(int currentSlotIndex,
+                                       int totalSlot,
+                                       int idStep,
+                                       int fetchNum);
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java
index c566d9b904..ab77419600 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/IDao.java
@@ -41,6 +41,11 @@ public interface IDao<Entity> {
      */
     List<Entity> queryByIds(Collection<? extends Serializable> ids);
 
+    /**
+     * Query all entities.
+     */
+    List<Entity> queryAll();
+
     /**
      * Query the entity by condition.
      */
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
new file mode 100644
index 0000000000..0b510d15b5
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
+import org.apache.dolphinscheduler.dao.repository.BaseDao;
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+
+import java.util.List;
+
+import org.springframework.stereotype.Repository;
+
+@Repository
+public class CommandDaoImpl extends BaseDao<Command, CommandMapper> implements 
CommandDao {
+
+    public CommandDaoImpl(CommandMapper commandMapper) {
+        super(commandMapper);
+    }
+
+    @Override
+    public List<Command> queryCommandByIdSlot(int currentSlotIndex, int 
totalSlot, int idStep, int fetchNum) {
+        return mybatisMapper.queryCommandByIdSlot(currentSlotIndex, totalSlot, 
idStep, fetchNum);
+    }
+
+}
diff --git 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
index 56db890ef0..16f7c05f25 100644
--- 
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
+++ 
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/CommandMapper.xml
@@ -40,12 +40,12 @@
         limit #{limit} offset #{offset}
     </select>
 
-    <select id="queryCommandPageBySlot" 
resultType="org.apache.dolphinscheduler.dao.entity.Command">
+    <select id="queryCommandByIdSlot" 
resultType="org.apache.dolphinscheduler.dao.entity.Command">
         select *
         from t_ds_command
-        where id % #{masterCount} = #{thisMasterSlot}
+        where (id / #{idStep}) % #{totalSlot} = #{currentSlotIndex}
         order by process_instance_priority, id asc
-            limit #{limit}
+            limit #{fetchNumber}
     </select>
     <delete id="deleteByWorkflowInstanceIds" >
         delete from t_ds_command
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
index 2d367e46e4..560b68754a 100644
--- 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/CommandMapperTest.java
@@ -187,7 +187,7 @@ public class CommandMapperTest extends BaseDaoTest {
         Command command = createCommand();
         Integer id = command.getId();
         boolean hit = id % masterCount == thisMasterSlot;
-        List<Command> commandList = commandMapper.queryCommandPageBySlot(1, 
masterCount, thisMasterSlot);
+        List<Command> commandList = 
commandMapper.queryCommandByIdSlot(thisMasterSlot, masterCount, 1, 1);
         if (hit) {
             Assertions.assertEquals(id, commandList.get(0).getId());
         } else {
@@ -201,8 +201,9 @@ public class CommandMapperTest extends BaseDaoTest {
 
     /**
      * create command map
-     * @param count map count
-     * @param commandType comman type
+     *
+     * @param count                 map count
+     * @param commandType           comman type
      * @param processDefinitionCode process definition code
      * @return command map
      */
@@ -223,7 +224,8 @@ public class CommandMapperTest extends BaseDaoTest {
     }
 
     /**
-     *  create process definition
+     * create process definition
+     *
      * @return process definition
      */
     private ProcessDefinition createProcessDefinition() {
@@ -243,6 +245,7 @@ public class CommandMapperTest extends BaseDaoTest {
 
     /**
      * create command map
+     *
      * @param count map count
      * @return command map
      */
@@ -258,6 +261,7 @@ public class CommandMapperTest extends BaseDaoTest {
 
     /**
      * create command
+     *
      * @return
      */
     private Command createCommand() {
@@ -266,6 +270,7 @@ public class CommandMapperTest extends BaseDaoTest {
 
     /**
      * create command
+     *
      * @return Command
      */
     private Command createCommand(CommandType commandType, long 
processDefinitionCode) {
diff --git 
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
new file mode 100644
index 0000000000..85867ef3b5
--- /dev/null
+++ 
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/CommandDaoImplTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.dao.repository.impl;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.FailureStrategy;
+import org.apache.dolphinscheduler.common.enums.Priority;
+import org.apache.dolphinscheduler.common.enums.TaskDependType;
+import org.apache.dolphinscheduler.common.enums.WarningType;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.dao.BaseDaoTest;
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+
+import org.apache.commons.lang3.RandomUtils;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+class CommandDaoImplTest extends BaseDaoTest {
+
+    @Autowired
+    private CommandDao commandDao;
+
+    @Test
+    void fetchCommandByIdSlot() {
+        int commandSize = RandomUtils.nextInt(1, 1000);
+        for (int i = 0; i < commandSize; i++) {
+            createCommand(CommandType.START_PROCESS, 0);
+        }
+        int totalSlot = RandomUtils.nextInt(1, 10);
+        int currentSlotIndex = RandomUtils.nextInt(0, totalSlot);
+        int fetchSize = RandomUtils.nextInt(10, 100);
+        for (int i = 1; i < 5; i++) {
+            int idStep = i;
+            List<Command> commands = 
commandDao.queryCommandByIdSlot(currentSlotIndex, totalSlot, idStep, fetchSize);
+            assertThat(commands.size()).isGreaterThan(0);
+            assertThat(commands.size())
+                    .isEqualTo(commandDao.queryAll()
+                            .stream()
+                            .filter(command -> (command.getId() / idStep) % 
totalSlot == currentSlotIndex)
+                            .limit(fetchSize)
+                            .count());
+
+        }
+
+    }
+
+    private void createCommand(CommandType commandType, int 
processDefinitionCode) {
+        Command command = new Command();
+        command.setCommandType(commandType);
+        command.setProcessDefinitionCode(processDefinitionCode);
+        command.setExecutorId(4);
+        command.setCommandParam("test command param");
+        command.setTaskDependType(TaskDependType.TASK_ONLY);
+        command.setFailureStrategy(FailureStrategy.CONTINUE);
+        command.setWarningType(WarningType.ALL);
+        command.setWarningGroupId(1);
+        command.setScheduleTime(DateUtils.stringToDate("2019-12-29 12:10:00"));
+        command.setProcessInstancePriority(Priority.MEDIUM);
+        command.setStartTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
+        command.setUpdateTime(DateUtils.stringToDate("2019-12-29 10:10:00"));
+        command.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
+        command.setProcessInstanceId(0);
+        command.setProcessDefinitionVersion(0);
+        commandDao.insert(command);
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java
new file mode 100644
index 0000000000..4a4d3c1efc
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/CommandFetcherConfiguration.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.command;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.server.master.config.CommandFetchStrategy;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class CommandFetcherConfiguration {
+
+    @Bean
+    public ICommandFetcher commandFetcher(MasterConfig masterConfig,
+                                          MasterSlotManager masterSlotManager,
+                                          CommandDao commandDao) {
+        CommandFetchStrategy commandFetchStrategy =
+                checkNotNull(masterConfig.getCommandFetchStrategy(), "command 
fetch strategy is null");
+        switch (commandFetchStrategy.getType()) {
+            case ID_SLOT_BASED:
+                CommandFetchStrategy.IdSlotBasedFetchConfig 
idSlotBasedFetchConfig =
+                        (CommandFetchStrategy.IdSlotBasedFetchConfig) 
commandFetchStrategy.getConfig();
+                return new IdSlotBasedCommandFetcher(idSlotBasedFetchConfig, 
masterSlotManager, commandDao);
+            default:
+                throw new IllegalArgumentException(
+                        "unsupported command fetch strategy type: " + 
commandFetchStrategy.getType());
+        }
+    }
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java
new file mode 100644
index 0000000000..c315a9b294
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/ICommandFetcher.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.command;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+
+import java.util.List;
+
+/**
+ * The command fetcher used to fetch commands
+ */
+public interface ICommandFetcher {
+
+    /**
+     * Fetch commands
+     *
+     * @return command list which need to be handled
+     */
+    List<Command> fetchCommands();
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java
new file mode 100644
index 0000000000..a417820093
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/command/IdSlotBasedCommandFetcher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.command;
+
+import org.apache.dolphinscheduler.dao.entity.Command;
+import org.apache.dolphinscheduler.dao.repository.CommandDao;
+import org.apache.dolphinscheduler.server.master.config.CommandFetchStrategy;
+import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
+import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
+
+import java.util.Collections;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * The command fetcher which is fetch commands by command id and slot.
+ */
+@Slf4j
+public class IdSlotBasedCommandFetcher implements ICommandFetcher {
+
+    private final CommandFetchStrategy.IdSlotBasedFetchConfig 
idSlotBasedFetchConfig;
+
+    private final CommandDao commandDao;
+
+    private final MasterSlotManager masterSlotManager;
+
+    public 
IdSlotBasedCommandFetcher(CommandFetchStrategy.IdSlotBasedFetchConfig 
idSlotBasedFetchConfig,
+                                     MasterSlotManager masterSlotManager,
+                                     CommandDao commandDao) {
+        this.idSlotBasedFetchConfig = idSlotBasedFetchConfig;
+        this.masterSlotManager = masterSlotManager;
+        this.commandDao = commandDao;
+    }
+
+    @Override
+    public List<Command> fetchCommands() {
+        long scheduleStartTime = System.currentTimeMillis();
+        int currentSlotIndex = masterSlotManager.getSlot();
+        int totalSlot = masterSlotManager.getMasterSize();
+        if (totalSlot <= 0 || currentSlotIndex < 0) {
+            log.warn("Slot is validated, current master slots: {}, the current 
slot index is {}", totalSlot,
+                    currentSlotIndex);
+            return Collections.emptyList();
+        }
+        List<Command> commands = commandDao.queryCommandByIdSlot(
+                currentSlotIndex,
+                totalSlot,
+                idSlotBasedFetchConfig.getIdStep(),
+                idSlotBasedFetchConfig.getFetchSize());
+        long cost = System.currentTimeMillis() - scheduleStartTime;
+        log.info("Fetch commands: {} success, cost: {}ms, totalSlot: {}, 
currentSlotIndex: {}", commands.size(), cost,
+                totalSlot, currentSlotIndex);
+        ProcessInstanceMetrics.recordCommandQueryTime(cost);
+        return commands;
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java
new file mode 100644
index 0000000000..e61941677c
--- /dev/null
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/CommandFetchStrategy.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.config;
+
+import lombok.Data;
+
+import org.springframework.validation.Errors;
+
+@Data
+public class CommandFetchStrategy {
+
+    private CommandFetchStrategyType type = 
CommandFetchStrategyType.ID_SLOT_BASED;
+
+    private CommandFetchConfig config = new IdSlotBasedFetchConfig();
+
+    public void validate(Errors errors) {
+        config.validate(errors);
+    }
+
+    public enum CommandFetchStrategyType {
+        ID_SLOT_BASED,
+        ;
+    }
+
+    public interface CommandFetchConfig {
+
+        void validate(Errors errors);
+
+    }
+
+    @Data
+    public static class IdSlotBasedFetchConfig implements CommandFetchConfig {
+
+        private int idStep = 1;
+        private int fetchSize = 10;
+
+        @Override
+        public void validate(Errors errors) {
+            if (idStep <= 0) {
+                errors.rejectValue("step", null, "step must be greater than 
0");
+            }
+            if (fetchSize <= 0) {
+                errors.rejectValue("fetchSize", null, "fetchSize must be 
greater than 0");
+            }
+        }
+    }
+
+}
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 02c0dcb819..20d3cccef3 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -48,10 +48,6 @@ public class MasterConfig implements Validator {
      * The master RPC server listen port.
      */
     private int listenPort = 5678;
-    /**
-     * The max batch size used to fetch command from database.
-     */
-    private int fetchCommandNum = 10;
     /**
      * The thread number used to prepare processInstance. This number 
shouldn't bigger than fetchCommandNum.
      */
@@ -98,6 +94,8 @@ public class MasterConfig implements Validator {
 
     private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
 
+    private CommandFetchStrategy commandFetchStrategy = new 
CommandFetchStrategy();
+
     // ip:listenPort
     private String masterAddress;
 
@@ -115,9 +113,6 @@ public class MasterConfig implements Validator {
         if (masterConfig.getListenPort() <= 0) {
             errors.rejectValue("listen-port", null, "is invalidated");
         }
-        if (masterConfig.getFetchCommandNum() <= 0) {
-            errors.rejectValue("fetch-command-num", null, "should be a 
positive value");
-        }
         if (masterConfig.getPreExecThreads() <= 0) {
             errors.rejectValue("per-exec-threads", null, "should be a positive 
value");
         }
@@ -149,6 +144,7 @@ public class MasterConfig implements Validator {
         if (StringUtils.isEmpty(masterConfig.getMasterAddress())) {
             
masterConfig.setMasterAddress(NetUtils.getAddr(masterConfig.getListenPort()));
         }
+        commandFetchStrategy.validate(errors);
 
         masterConfig.setMasterRegistryPath(
                 RegistryNodeType.MASTER.getRegistryPath() + "/" + 
masterConfig.getMasterAddress());
@@ -159,7 +155,6 @@ public class MasterConfig implements Validator {
         String config =
                 "\n****************************Master 
Configuration**************************************" +
                         "\n  listen-port -> " + listenPort +
-                        "\n  fetch-command-num -> " + fetchCommandNum +
                         "\n  pre-exec-threads -> " + preExecThreads +
                         "\n  exec-threads -> " + execThreads +
                         "\n  dispatch-task-number -> " + dispatchTaskNumber +
@@ -175,6 +170,7 @@ public class MasterConfig implements Validator {
                         "\n  master-address -> " + masterAddress +
                         "\n  master-registry-path: " + masterRegistryPath +
                         "\n  worker-group-refresh-interval: " + 
workerGroupRefreshInterval +
+                        "\n  command-fetch-strategy: " + commandFetchStrategy +
                         "\n****************************Master 
Configuration**************************************";
         log.info(config);
     }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
index 2fddd94384..c1b5d0ffab 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
@@ -26,21 +26,18 @@ import 
org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
 import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
 import 
org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
+import org.apache.dolphinscheduler.server.master.command.ICommandFetcher;
 import org.apache.dolphinscheduler.server.master.config.MasterConfig;
 import 
org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
 import org.apache.dolphinscheduler.server.master.event.WorkflowEvent;
 import org.apache.dolphinscheduler.server.master.event.WorkflowEventQueue;
 import org.apache.dolphinscheduler.server.master.event.WorkflowEventType;
-import org.apache.dolphinscheduler.server.master.exception.MasterException;
 import 
org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
 import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
-import 
org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
-import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
 import org.apache.dolphinscheduler.service.command.CommandService;
 
 import org.apache.commons.collections4.CollectionUtils;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
@@ -56,6 +53,9 @@ import org.springframework.stereotype.Service;
 @Slf4j
 public class MasterSchedulerBootstrap extends BaseDaemonThread implements 
AutoCloseable {
 
+    @Autowired
+    private ICommandFetcher commandFetcher;
+
     @Autowired
     private CommandService commandService;
 
@@ -74,9 +74,6 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread implements AutoCl
     @Autowired
     private WorkflowEventLooper workflowEventLooper;
 
-    @Autowired
-    private MasterSlotManager masterSlotManager;
-
     @Autowired
     private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap;
 
@@ -125,7 +122,7 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread implements AutoCl
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                     continue;
                 }
-                List<Command> commands = findCommands();
+                List<Command> commands = commandFetcher.fetchCommands();
                 if (CollectionUtils.isEmpty(commands)) {
                     // indicate that no command ,sleep for 1s
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
@@ -170,29 +167,4 @@ public class MasterSchedulerBootstrap extends 
BaseDaemonThread implements AutoCl
         }
     }
 
-    private List<Command> findCommands() throws MasterException {
-        try {
-            long scheduleStartTime = System.currentTimeMillis();
-            int thisMasterSlot = masterSlotManager.getSlot();
-            int masterCount = masterSlotManager.getMasterSize();
-            if (masterCount <= 0) {
-                log.warn("Master count: {} is invalid, the current slot: {}", 
masterCount, thisMasterSlot);
-                return Collections.emptyList();
-            }
-            int pageSize = masterConfig.getFetchCommandNum();
-            final List<Command> result =
-                    commandService.findCommandPageBySlot(pageSize, 
masterCount, thisMasterSlot);
-            if (CollectionUtils.isNotEmpty(result)) {
-                long cost = System.currentTimeMillis() - scheduleStartTime;
-                log.info(
-                        "Master schedule bootstrap loop command success, fetch 
command size: {}, cost: {}ms, current slot: {}, total slot size: {}",
-                        result.size(), cost, thisMasterSlot, masterCount);
-                ProcessInstanceMetrics.recordCommandQueryTime(cost);
-            }
-            return result;
-        } catch (Exception ex) {
-            throw new MasterException("Master loop command from database 
error", ex);
-        }
-    }
-
 }
diff --git a/dolphinscheduler-master/src/main/resources/application.yaml 
b/dolphinscheduler-master/src/main/resources/application.yaml
index f18c6ef61d..17b1e41a71 100644
--- a/dolphinscheduler-master/src/main/resources/application.yaml
+++ b/dolphinscheduler-master/src/main/resources/application.yaml
@@ -83,8 +83,6 @@ registry:
 
 master:
   listen-port: 5678
-  # master fetch command num
-  fetch-command-num: 10
   # master prepare execute thread number to limit handle commands in parallel
   pre-exec-threads: 10
   # master execute thread number to limit process instances in parallel
@@ -121,6 +119,13 @@ master:
     # The max waiting time to reconnect to registry if you set the strategy to 
waiting
     max-waiting-time: 100s
   worker-group-refresh-interval: 10s
+  command-fetch-strategy:
+    type: ID_SLOT_BASED
+    config:
+      # The incremental id step
+      id-step: 1
+      # master fetch command num
+      fetch-size: 10
 
 server:
   port: 5679
diff --git 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
index faab44cf85..9d26aa81f4 100644
--- 
a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
+++ 
b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/config/MasterConfigTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.dolphinscheduler.server.master.config;
 
+import static com.google.common.truth.Truth.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -47,6 +48,17 @@ public class MasterConfigTest {
         assertEquals(0.77, 
serverLoadProtection.getMaxJvmCpuUsagePercentageThresholds());
         assertEquals(0.77, 
serverLoadProtection.getMaxSystemMemoryUsagePercentageThresholds());
         assertEquals(0.77, 
serverLoadProtection.getMaxDiskUsagePercentageThresholds());
+    }
+
+    @Test
+    public void getCommandFetchStrategy() {
+        CommandFetchStrategy commandFetchStrategy = 
masterConfig.getCommandFetchStrategy();
+        assertThat(commandFetchStrategy.getType())
+                
.isEqualTo(CommandFetchStrategy.CommandFetchStrategyType.ID_SLOT_BASED);
 
+        CommandFetchStrategy.IdSlotBasedFetchConfig idSlotBasedFetchConfig =
+                (CommandFetchStrategy.IdSlotBasedFetchConfig) 
commandFetchStrategy.getConfig();
+        assertThat(idSlotBasedFetchConfig.getIdStep()).isEqualTo(3);
+        assertThat(idSlotBasedFetchConfig.getFetchSize()).isEqualTo(11);
     }
 }
diff --git a/dolphinscheduler-master/src/test/resources/application.yaml 
b/dolphinscheduler-master/src/test/resources/application.yaml
index f4827d4b3c..15f9199609 100644
--- a/dolphinscheduler-master/src/test/resources/application.yaml
+++ b/dolphinscheduler-master/src/test/resources/application.yaml
@@ -89,8 +89,6 @@ registry:
 
 master:
   listen-port: 5678
-  # master fetch command num
-  fetch-command-num: 10
   # master prepare execute thread number to limit handle commands in parallel
   pre-exec-threads: 10
   # master execute thread number to limit process instances in parallel
@@ -127,6 +125,13 @@ master:
     # The max waiting time to reconnect to registry if you set the strategy to 
waiting
     max-waiting-time: 100s
   worker-group-refresh-interval: 10s
+  command-fetch-strategy:
+    type: ID_SLOT_BASED
+    config:
+      # The incremental id step
+      id-step: 3
+      # master fetch command num
+      fetch-size: 11
 
 server:
   port: 5679
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
index cff73c503f..43b81c4e5c 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandService.java
@@ -22,8 +22,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstanceMap;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 
-import java.util.List;
-
 /**
  * Command Service
  */
@@ -44,15 +42,6 @@ public interface CommandService {
      */
     int createCommand(Command command);
 
-    /**
-     * Get command page
-     * @param pageSize page size
-     * @param masterCount master count
-     * @param thisMasterSlot master slot
-     * @return command page
-     */
-    List<Command> findCommandPageBySlot(int pageSize, int masterCount, int 
thisMasterSlot);
-
     /**
      * check the input command exists in queue list
      *
diff --git 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
index 483899446b..ee833a80b0 100644
--- 
a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
+++ 
b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/command/CommandServiceImpl.java
@@ -57,7 +57,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import io.micrometer.core.annotation.Counted;
 
 /**
@@ -107,14 +106,6 @@ public class CommandServiceImpl implements CommandService {
         return result;
     }
 
-    @Override
-    public List<Command> findCommandPageBySlot(int pageSize, int masterCount, 
int thisMasterSlot) {
-        if (masterCount <= 0) {
-            return Lists.newArrayList();
-        }
-        return commandMapper.queryCommandPageBySlot(pageSize, masterCount, 
thisMasterSlot);
-    }
-
     @Override
     public boolean verifyIsNeedCreateCommand(Command command) {
         boolean isNeedCreate = true;
diff --git 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
index 0cde76bdfe..f60320fc63 100644
--- 
a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
+++ 
b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/command/MessageServiceImplTest.java
@@ -214,14 +214,4 @@ class MessageServiceImplTest {
         Mockito.verify(commandMapper, Mockito.times(1)).insert(command);
     }
 
-    @Test
-    public void testFindCommandPageBySlot() {
-        int pageSize = 1;
-        int masterCount = 0;
-        int thisMasterSlot = 2;
-        List<Command> commandList =
-                commandService.findCommandPageBySlot(pageSize, masterCount, 
thisMasterSlot);
-        Assertions.assertEquals(0, commandList.size());
-    }
-
 }
diff --git 
a/dolphinscheduler-standalone-server/src/main/resources/application.yaml 
b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
index 5122eea2a1..6757718929 100644
--- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml
+++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml
@@ -160,8 +160,6 @@ casdoor:
 
 master:
   listen-port: 5678
-  # master fetch command num
-  fetch-command-num: 10
   # master prepare execute thread number to limit handle commands in parallel
   pre-exec-threads: 10
   # master execute thread number to limit process instances in parallel
@@ -192,6 +190,13 @@ master:
   # kill yarn/k8s application when failover taskInstance, default true
   kill-application-when-task-failover: true
   worker-group-refresh-interval: 10s
+  command-fetch-strategy:
+    type: ID_SLOT_BASED
+    config:
+      # The incremental id step
+      id-step: 1
+      # master fetch command num
+      fetch-size: 10
 
 worker:
   # worker listener port

Reply via email to