morningman commented on a change in pull request #333: Add distributor which schedule task to be fairly
URL: https://github.com/apache/incubator-doris/pull/333#discussion_r235689363
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
 ##########
 @@ -228,173 +170,65 @@ public long getMinTaskBeId() {
                     maxIdleSlotNum = Math.max(maxIdleSlotNum, idelTaskNum);
                 }
             }
+            if (result < 0) {
+                throw new LoadException("There is no empty slot in cluster");
+            }
             return result;
         } finally {
             readUnlock();
         }
     }
 
-    public Queue<RoutineLoadTaskInfo> getNeedSchedulerRoutineLoadTasks() {
-        readLock();
-        try {
-            return needSchedulerRoutineLoadTasks;
-        } finally {
-            readUnlock();
+    public List<RoutineLoadTaskInfo> getNeedSchedulerRoutineLoadTasks() {
+        List<RoutineLoadTaskInfo> routineLoadTaskInfoList = new ArrayList<>();
+        for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
+            
routineLoadTaskInfoList.addAll(routineLoadJob.getNeedSchedulerTaskInfoList());
         }
+        return routineLoadTaskInfoList;
     }
 
-    public RoutineLoadJob getJobByTaskId(long taskId) {
-        readLock();
-        try {
-            return idToRoutineLoadJob.get(taskIdToJobId.get(taskId));
-        } finally {
-            readUnlock();
-        }
+    public RoutineLoadJob getJob(String jobId) {
+        return idToRoutineLoadJob.get(jobId);
     }
 
     public List<RoutineLoadJob> 
getRoutineLoadJobByState(RoutineLoadJob.JobState jobState) throws LoadException 
{
         List<RoutineLoadJob> jobs = new ArrayList<>();
         Collection<RoutineLoadJob> stateJobs = null;
-        readLock();
         LOG.debug("begin to get routine load job by state {}", 
jobState.name());
-        try {
-            switch (jobState) {
-                case NEED_SCHEDULER:
-                    stateJobs = idToNeedSchedulerRoutineLoadJob.values();
-                    break;
-                case PAUSED:
-                    throw new LoadException("not support getting paused 
routine load jobs");
-                case RUNNING:
-                    stateJobs = idToRunningRoutineLoadJob.values();
-                    break;
-                case STOPPED:
-                    throw new LoadException("not support getting stopped 
routine load jobs");
-                default:
-                    break;
-            }
-            if (stateJobs != null) {
-                jobs.addAll(stateJobs);
-                LOG.info("got {} routine load jobs by state {}", jobs.size(), 
jobState.name());
-            }
-        } finally {
-            readUnlock();
+        switch (jobState) {
+            case NEED_SCHEDULER:
 
 Review comment:
   Why not just:
   stateJobs = idToRoutineLoadJob.values().stream()
                           .filter(entity -> entity.getState() == jobState)
                           .collect(Collectors.toList());

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to