EmmyMiao87 commented on a change in pull request #2093: Fix some routine load
bugs
URL: https://github.com/apache/incubator-doris/pull/2093#discussion_r340069332
##########
File path:
fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
##########
@@ -171,54 +194,63 @@ private void updateBackendSlotIfNecessary() {
|| (currentTime - lastBackendSlotUpdateTime >
BACKEND_SLOT_UPDATE_INTERVAL_MS)) {
routineLoadManager.updateBeIdToMaxConcurrentTasks();
lastBackendSlotUpdateTime = currentTime;
- if (LOG.isDebugEnabled()) {
- LOG.debug("update backend max slot for routine load task
scheduling");
- }
+ LOG.debug("update backend max slot for routine load task
scheduling. current task num per BE: {}",
+ Config.max_concurrent_task_num_per_be);
}
}
public void addTaskInQueue(RoutineLoadTaskInfo routineLoadTaskInfo) {
needScheduleTasksQueue.add(routineLoadTaskInfo);
+ LOG.debug("total tasks num in routine load task queue: {}",
needScheduleTasksQueue.size());
}
public void addTasksInQueue(List<RoutineLoadTaskInfo>
routineLoadTaskInfoList) {
needScheduleTasksQueue.addAll(routineLoadTaskInfoList);
+ LOG.debug("total tasks num in routine load task queue: {}",
needScheduleTasksQueue.size());
}
- private void submitBatchTasksIfNotEmpty(Map<Long, List<TRoutineLoadTask>>
beIdToRoutineLoadTask) {
- for (Map.Entry<Long, List<TRoutineLoadTask>> entry :
beIdToRoutineLoadTask.entrySet()) {
- Backend backend =
Catalog.getCurrentSystemInfo().getBackend(entry.getKey());
- if (backend == null) {
- LOG.warn("failed to send tasks to be {} when backend is
unavailable", entry.getKey());
- continue;
+ private boolean submitTask(long beId, TRoutineLoadTask tTask) {
+ Backend backend = Catalog.getCurrentSystemInfo().getBackend(beId);
+ if (backend == null) {
+ LOG.warn("failed to send tasks to backend {} because not exist",
beId);
+ return false;
+ }
+
+ TNetworkAddress address = new TNetworkAddress(backend.getHost(),
backend.getBePort());
+ boolean ok = false;
+ BackendService.Client client = null;
+ try {
+ client = ClientPool.backendPool.borrowObject(address);
+ TStatus tStatus =
client.submit_routine_load_task(Lists.newArrayList(tTask));
+ ok = true;
+
+ if (tStatus.getStatus_code() == TStatusCode.OK) {
+ LOG.debug("send routine load task {} to BE: {}",
DebugUtil.printId(tTask.id), beId);
+ return true;
+ } else {
+ LOG.info("failed to submit task {}, BE: {}, error code: {}",
+ DebugUtil.printId(tTask.getId()), beId,
tStatus.getStatus_code());
+ return false;
}
- TNetworkAddress address = new TNetworkAddress(backend.getHost(),
backend.getBePort());
- boolean ok = false;
- BackendService.Client client = null;
- try {
- client = ClientPool.backendPool.borrowObject(address);
- client.submit_routine_load_task(entry.getValue());
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} tasks sent to be {}",
entry.getValue().size(), entry.getKey());
- }
- ok = true;
- entry.getValue().clear();
- } catch (Exception e) {
- LOG.warn("task send error. backend[{}]", entry.getKey(), e);
- } finally {
- if (ok) {
- ClientPool.backendPool.returnObject(address, client);
- } else {
- ClientPool.backendPool.invalidateObject(address, client);
- }
+ } catch (Exception e) {
+ LOG.warn("task send error. backend[{}]", beId, e);
Review comment:
Please use the new Log Builder
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]