imay commented on a change in pull request #1003: Support kafka routine load
URL: https://github.com/apache/incubator-doris/pull/1003#discussion_r277599086
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 ##########
 @@ -301,332 +388,771 @@ public int getSizeOfRoutineLoadTaskInfoList() {
         } finally {
             readUnlock();
         }
-
-    }
-
-    public List<RoutineLoadTaskInfo> getNeedScheduleTaskInfoList() {
-        return needScheduleTaskInfoList;
     }
 
-    public void updateState(JobState jobState) {
-        writeLock();
-        try {
-            state = jobState;
-        } finally {
-            writeUnlock();
-        }
-    }
-
-    public List<RoutineLoadTaskInfo> processTimeoutTasks() {
-        List<RoutineLoadTaskInfo> result = new ArrayList<>();
+    // only check loading task
+    public void processTimeoutTasks() {
         writeLock();
         try {
             List<RoutineLoadTaskInfo> runningTasks = new ArrayList<>(routineLoadTaskInfoList);
-            runningTasks.removeAll(needScheduleTaskInfoList);
-
             for (RoutineLoadTaskInfo routineLoadTaskInfo : runningTasks) {
-                if ((System.currentTimeMillis() - routineLoadTaskInfo.getLoadStartTimeMs())
-                        > DEFAULT_TASK_TIMEOUT_SECONDS * 1000) {
-                    String oldSignature = routineLoadTaskInfo.getId();
-                    // abort txn if not committed
-                    try {
-                        Catalog.getCurrentGlobalTransactionMgr()
-                                .abortTransaction(routineLoadTaskInfo.getTxnId(), "routine load task of txn was timeout");
-                    } catch (UserException e) {
-                        if (e.getMessage().contains("committed")) {
-                            LOG.debug("txn of task {} has been committed, timeout task has been ignored", oldSignature);
-                            continue;
-                        }
-                    }
-
-                    try {
-                        result.add(reNewTask(routineLoadTaskInfo));
-                        LOG.debug("Task {} was ran more then {} minutes. It was removed and rescheduled",
-                                  oldSignature, DEFAULT_TASK_TIMEOUT_SECONDS);
-                    } catch (UserException e) {
-                        state = JobState.CANCELLED;
-                        // TODO(ml): edit log
-                        LOG.warn("failed to renew a routine load task in job {} with error message {}", id, e.getMessage());
-                    }
+                if (routineLoadTaskInfo.isRunning()
+                        && ((System.currentTimeMillis() - routineLoadTaskInfo.getExecuteStartTimeMs())
+                        > maxBatchIntervalS * 2 * 1000)) {
+                    RoutineLoadTaskInfo newTask = unprotectRenewTask(routineLoadTaskInfo);
+                    Catalog.getCurrentCatalog().getRoutineLoadTaskScheduler().addTaskInQueue(newTask);
                 }
             }
         } finally {
             writeUnlock();
         }
-        return result;
     }
 
-    abstract List<RoutineLoadTaskInfo> divideRoutineLoadJob(int currentConcurrentTaskNum);
+    abstract void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException;
 
     public int calculateCurrentConcurrentTaskNum() throws MetaNotFoundException {
         return 0;
     }
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        // TODO(ml)
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-        // TODO(ml)
-    }
-
-
-    public void removeNeedScheduleTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
-        writeLock();
+    public Map<Long, Integer> getBeIdToConcurrentTaskNum() {
+        Map<Long, Integer> beIdConcurrentTasksNum = Maps.newHashMap();
+        readLock();
         try {
-            needScheduleTaskInfoList.remove(routineLoadTaskInfo);
+            for (RoutineLoadTaskInfo routineLoadTaskInfo : routineLoadTaskInfoList) {
+                if (routineLoadTaskInfo.getBeId() != -1L) {
+                    long beId = routineLoadTaskInfo.getBeId();
+                    if (beIdConcurrentTasksNum.containsKey(beId)) {
+                        beIdConcurrentTasksNum.put(beId, beIdConcurrentTasksNum.get(beId) + 1);
+                    } else {
+                        beIdConcurrentTasksNum.put(beId, 1);
+                    }
+                }
+            }
+            return beIdConcurrentTasksNum;
         } finally {
-            writeUnlock();
+            readUnlock();
         }
     }
 
-    abstract void updateProgress(RoutineLoadProgress progress);
-
-    public boolean containsTask(String taskId) {
+    public boolean containsTask(UUID taskId) {
         readLock();
         try {
-            return routineLoadTaskInfoList.parallelStream()
+            return routineLoadTaskInfoList.stream()
                     .anyMatch(entity -> entity.getId().equals(taskId));
         } finally {
             readUnlock();
         }
     }
 
     // All of private method could not be call without lock
-    private void checkStateTransform(RoutineLoadJob.JobState desireState)
-            throws UnsupportedOperationException {
+    private void checkStateTransform(RoutineLoadJob.JobState desireState) throws UserException {
         switch (state) {
             case RUNNING:
                 if (desireState == JobState.NEED_SCHEDULE) {
-                    throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState);
+                    throw new DdlException("Could not transform " + state + " to " + desireState);
                 }
                 break;
             case PAUSED:
                 if (desireState == JobState.PAUSED) {
-                    throw new UnsupportedOperationException("Could not transform " + state + " to " + desireState);
+                    throw new DdlException("Could not transform " + state + " to " + desireState);
                 }
                 break;
             case STOPPED:
             case CANCELLED:
-                throw new UnsupportedOperationException("Could not transfrom " + state + " to " + desireState);
+                throw new DdlException("Could not transform " + state + " to " + desireState);
             default:
                 break;
         }
     }
 
-    private void loadTxnCommit(TLoadTxnCommitRequest request) throws TException {
-        FrontendServiceImpl frontendService = new FrontendServiceImpl(ExecuteEnv.getInstance());
-        frontendService.loadTxnCommit(request);
+    // if rate of error data is more then max_filter_ratio, pause job
+    protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
+        updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
+                attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(),
+                false /* not replay */);
     }
 
-    private void updateNumOfData(int numOfErrorData, int numOfTotalData) {
-        currentErrorNum += numOfErrorData;
-        currentTotalNum += numOfTotalData;
-        if (currentTotalNum > BASE_OF_ERROR_RATE) {
-            if (currentErrorNum > maxErrorNum) {
-                LOG.info("current error num {} of job {} is more then max error num {}. begin to pause job",
-                         currentErrorNum, id, maxErrorNum);
-                // remove all of task in jobs and change job state to paused
-                executePause("current error num of job is more then max error num");
+    private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
+            long taskExecutionTime, boolean isReplay) throws UserException {
+        this.totalRows += numOfTotalRows;
+        this.errorRows += numOfErrorRows;
+        this.unselectedRows += unselectedRows;
+        this.receivedBytes += receivedBytes;
+        this.totalTaskExcutionTimeMs += taskExecutionTime;
+
+        if (MetricRepo.isInit.get()) {
+            MetricRepo.COUNTER_ROUTINE_LOAD_ROWS.increase(numOfTotalRows);
+            MetricRepo.COUNTER_ROUTINE_LOAD_ERROR_ROWS.increase(numOfErrorRows);
+            MetricRepo.COUNTER_ROUTINE_LOAD_RECEIVED_BYTES.increase(receivedBytes);
+        }
+
+        // check error rate
+        currentErrorRows += numOfErrorRows;
+        currentTotalRows += numOfTotalRows;
+        if (currentTotalRows > maxBatchRows * 10) {
+            if (currentErrorRows > maxErrorNum) {
+                LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                                 .add("current_total_rows", currentTotalRows)
+                                 .add("current_error_rows", currentErrorRows)
+                                 .add("max_error_num", maxErrorNum)
+                                 .add("msg", "current error rows is more then max error num, begin to pause job")
+                                 .build());
+                // if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB
+                if (!isReplay) {
+                    // remove all of task in jobs and change job state to paused
+                    updateState(JobState.PAUSED, "current error rows of job is more then max error num", isReplay);
+                }
             }
 
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                                  .add("current_total_rows", currentTotalRows)
+                                  .add("current_error_rows", currentErrorRows)
+                                  .add("max_error_num", maxErrorNum)
+                                  .add("msg", "reset current total rows and current error rows "
+                                          + "when current total rows is more then base")
+                                  .build());
+            }
             // reset currentTotalNum and currentErrorNum
-            currentErrorNum = 0;
-            currentTotalNum = 0;
-        } else if (currentErrorNum > maxErrorNum) {
-            LOG.info("current error num {} of job {} is more then max error num {}. begin to pause job",
-                     currentErrorNum, id, maxErrorNum);
-            // remove all of task in jobs and change job state to paused
-            executePause("current error num is more then max error num");
+            currentErrorRows = 0;
+            currentTotalRows = 0;
+        } else if (currentErrorRows > maxErrorNum) {
+            LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
+                             .add("current_total_rows", currentTotalRows)
+                             .add("current_error_rows", currentErrorRows)
+                             .add("max_error_num", maxErrorNum)
+                             .add("msg", "current error rows is more then max error rows, begin to pause job")
+                             .build());
+            if (!isReplay) {
+                // remove all of task in jobs and change job state to paused
+                updateState(JobState.PAUSED, "current error rows is more then max error num", isReplay);
+            }
             // reset currentTotalNum and currentErrorNum
-            currentErrorNum = 0;
-            currentTotalNum = 0;
+            currentErrorRows = 0;
+            currentTotalRows = 0;
         }
     }
 
-    abstract RoutineLoadTaskInfo reNewTask(RoutineLoadTaskInfo routineLoadTaskInfo) throws AnalysisException,
-            LabelAlreadyUsedException, BeginTransactionException;
+    protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
+        try {
+            updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
+                    attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(), true /* is replay */);
+        } catch (UserException e) {
+            LOG.error("should not happen", e);
+        }
+    }
 
-    @Override
-    public void beforeAborted(TransactionState txnState, TransactionState.TxnStatusChangeReason txnStatusChangeReason)
-            throws AbortTransactionException {
-        readLock();
+    abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo);
+
+    public void initPlanner() throws UserException {
+        StreamLoadTask streamLoadTask = StreamLoadTask.fromRoutineLoadJob(this);
+        Database db = Catalog.getCurrentCatalog().getDb(dbId);
+        if (db == null) {
+            throw new MetaNotFoundException("db " + dbId + " does not exist");
+        }
+        planner = new StreamLoadPlanner(db, (OlapTable) db.getTable(this.tableId), streamLoadTask);
+    }
+
+    public TExecPlanFragmentParams plan() throws UserException {
+        Preconditions.checkNotNull(planner);
+        Database db = Catalog.getCurrentCatalog().getDb(dbId);
+        if (db == null) {
+            throw new MetaNotFoundException("db " + dbId + " does not exist");
+        }
+        db.readLock();
         try {
-            if (txnStatusChangeReason != null) {
-                switch (txnStatusChangeReason) {
-                    case TIMEOUT:
-                        String taskId = txnState.getLabel();
-                        if (routineLoadTaskInfoList.parallelStream().anyMatch(entity -> entity.getId().equals(taskId))) {
-                            throw new AbortTransactionException(
-                                    "there are task " + taskId + " related to this txn, "
-                                            + "txn could not be abort", txnState.getTransactionId());
-                        }
-                        break;
-                }
-            }
+            return planner.plan();
         } finally {
-            readUnlock();
+            db.readUnlock();
         }
     }
 
+    // if task not exists, before aborted will throw exception
+    // if task pass the checker, job lock will be locked
+    // if tryLock timeout, txn will be aborted by timeout progress.
+    // *** Please do not call before individually. It must be combined use with after ***
     @Override
-    public void onCommitted(TransactionState txnState) {
-        // step0: get progress from transaction state
-        RLTaskTxnCommitAttachment rlTaskTxnCommitAttachment = (RLTaskTxnCommitAttachment) txnState.getTxnCommitAttachment();
-
-        writeLock();
-        try {
-            // step1: find task in job
-            Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
-                    routineLoadTaskInfoList.parallelStream()
-                            .filter(entity -> entity.getId().equals(txnState.getLabel())).findFirst();
-            RoutineLoadTaskInfo routineLoadTaskInfo = routineLoadTaskInfoOptional.get();
+    public void beforeAborted(TransactionState txnState) throws TransactionException {
+        LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
+                          .add("txn_state", txnState)
+                          .add("msg", "task before aborted")
+                          .build());
+        executeBeforeCheck(txnState, TransactionStatus.ABORTED);
+    }
 
-            // step2: update job progress
-            updateProgress(rlTaskTxnCommitAttachment.getProgress());
+    // if task not exists, before aborted will throw exception
+    // if task pass the checker, lock job will be locked
+    // if tryLock timeout, txn will be aborted by timeout progress.
+    // *** Please do not call before individually. It must be combined use with after ***
+    @Override
+    public void beforeCommitted(TransactionState txnState) throws TransactionException {
+        LOG.debug(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel())
+                          .add("txn_state", txnState)
+                          .add("msg", "task before committed")
+                          .build());
+        executeBeforeCheck(txnState, TransactionStatus.COMMITTED);
+    }
 
-            // step3: remove task in agentTaskQueue
-            AgentTaskQueue.removeTask(rlTaskTxnCommitAttachment.getBackendId(), TTaskType.STREAM_LOAD,
-                                      rlTaskTxnCommitAttachment.getTaskSignature());
+    /*
+     * try lock the write lock.
+     * Make sure lock is released if any exception being thrown
+     */
+    private void executeBeforeCheck(TransactionState txnState, TransactionStatus transactionStatus)
+            throws TransactionException {
+        if (!tryWriteLock(2000, TimeUnit.MILLISECONDS)) {
+            // The lock of job has been locked by another thread more then timeout seconds.
+            // The commit txn by thread2 will be failed after waiting for timeout seconds.
+            // Maybe thread1 hang on somewhere
+            LOG.warn(new LogBuilder(LogKey.ROUINTE_LOAD_TASK, txnState.getLabel()).add("job_id", id).add("txn_status",
+                    transactionStatus).add("error_msg",
+                            "txn could not be transformed while waiting for timeout of routine load job"));
+            throw new TransactionException("txn " + txnState.getTransactionId() + "could not be " + transactionStatus
+                    + "while waiting for timeout of routine load job.");
+        }
 
-            // step4: if rate of error data is more then max_filter_ratio, pause job
-            updateNumOfData(rlTaskTxnCommitAttachment.getNumOfErrorData(), rlTaskTxnCommitAttachment.getNumOfTotalData());
+        // task already pass the checker
+        try {
+            // check if task has been aborted
+            Optional<RoutineLoadTaskInfo> routineLoadTaskInfoOptional =
+                    routineLoadTaskInfoList.stream()
+                            .filter(entity -> entity.getTxnId() == txnState.getTransactionId()).findFirst();
+            if (!routineLoadTaskInfoOptional.isPresent()) {
+                throw new TransactionException("txn " + txnState.getTransactionId()
+                                                       + " could not be " + transactionStatus
+                                                       + " while task " + txnState.getLabel() + " has been aborted.");
+            }
+        } catch (TransactionException e) {
 
 Review comment:
   Any other exception thrown here will leave your `lock` held forever.
   
   You can set a boolean to true once the check passes, and unlock the write lock in a finally block if that boolean is still false.
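   
   A minimal sketch of that pattern, assuming a plain `ReentrantReadWriteLock` and placeholder check/exception names (not the actual Doris code): the success flag is set only after every check passes, and the finally block releases the lock whenever the flag is still false, so no unexpected exception can keep the lock held.
   
   ```java
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.ReentrantReadWriteLock;
   
   public class LockedCheckSketch {
       private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   
       // Acquire the write lock, run the checks, and unlock in finally
       // unless the checks succeeded; on success the lock stays held and is
       // released later by the matching "after" callback.
       public void executeBeforeCheck() throws Exception {
           if (!lock.writeLock().tryLock(2000, TimeUnit.MILLISECONDS)) {
               throw new Exception("could not acquire the job lock in time");
           }
           boolean passCheck = false;
           try {
               runChecks();      // may throw any exception, checked or not
               passCheck = true; // reached only if every check succeeded
           } finally {
               if (!passCheck) {
                   lock.writeLock().unlock(); // released on any failure path
               }
           }
       }
   
       private void runChecks() throws Exception {
           // placeholder for the task-existence / txn-status checks
       }
   }
   ```
   
   Compared with unlocking only inside the `catch (TransactionException e)` branch, this keeps the unlock decision in one place and cannot be bypassed by an unexpected runtime exception.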

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
