This is an automated email from the ASF dual-hosted git repository.

nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 43c2ee178 [AMORO-3794] Wrap task operation with process lock (#3798)
43c2ee178 is described below

commit 43c2ee17871a1d43968c9bcce875daf1c3b5d7ce
Author: ZhouJinsong <[email protected]>
AuthorDate: Sat Sep 27 21:59:39 2025 +0800

    [AMORO-3794] Wrap task operation with process lock (#3798)
    
    * Wrap task operation with process lock
    
    * Fix some checkstyle errors
    
    * Add a lock timeout while polling tasks
---
 .../amoro/server/DefaultOptimizingService.java     |  30 ++---
 .../amoro/server/optimizing/OptimizingQueue.java   | 128 ++++++++++++++-------
 .../amoro/server/optimizing/TaskRuntime.java       |   6 +-
 .../server/dashboard/utils/TestOptimizingUtil.java |   7 +-
 .../server/optimizing/TestOptimizingQueue.java     | 106 ++++++++---------
 5 files changed, 148 insertions(+), 129 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 23b7c0464..6cc05bc56 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -32,7 +32,6 @@ import org.apache.amoro.exception.ForbiddenException;
 import org.apache.amoro.exception.IllegalTaskStateException;
 import org.apache.amoro.exception.ObjectNotExistsException;
 import org.apache.amoro.exception.PluginRetryAuthException;
-import org.apache.amoro.exception.TaskNotFoundException;
 import org.apache.amoro.resource.ResourceGroup;
 import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.optimizing.OptimizingProcess;
@@ -54,7 +53,6 @@ import org.apache.amoro.server.table.TableService;
 import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
 import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
 import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
 import org.apache.commons.lang3.StringUtils;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
@@ -214,33 +212,21 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   @Override
   public OptimizingTask pollTask(String authToken, int threadId) {
     LOG.debug("Optimizer {} (threadId {}) try polling task", authToken, 
threadId);
+    OptimizerThread optimizerThread = 
getAuthenticatedOptimizer(authToken).getThread(threadId);
     OptimizingQueue queue = getQueueByToken(authToken);
-    return Optional.ofNullable(queue.pollTask(pollingTimeout, breakQuotaLimit))
-        .map(task -> extractOptimizingTask(task, authToken, threadId, queue))
-        .orElse(null);
-  }
-
-  private OptimizingTask extractOptimizingTask(
-      TaskRuntime<?> task, String authToken, int threadId, OptimizingQueue 
queue) {
-    try {
-      OptimizerThread optimizerThread = 
getAuthenticatedOptimizer(authToken).getThread(threadId);
-      task.schedule(optimizerThread);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, pollingTimeout, 
breakQuotaLimit);
+    if (task != null) {
       LOG.info("OptimizerThread {} polled task {}", optimizerThread, 
task.getTaskId());
       return task.extractProtocolTask();
-    } catch (Throwable throwable) {
-      LOG.error("Schedule task {} failed, put it to retry queue", 
task.getTaskId(), throwable);
-      queue.retryTask(task);
-      return null;
     }
+    return null;
   }
 
   @Override
   public void ackTask(String authToken, int threadId, OptimizingTaskId taskId) 
{
     LOG.info("Ack task {} by optimizer {} (threadId {})", taskId, authToken, 
threadId);
     OptimizingQueue queue = getQueueByToken(authToken);
-    Optional.ofNullable(queue.getTask(taskId))
-        .orElseThrow(() -> new TaskNotFoundException(taskId))
-        .ack(getAuthenticatedOptimizer(authToken).getThread(threadId));
+    queue.ackTask(taskId, 
getAuthenticatedOptimizer(authToken).getThread(threadId));
   }
 
   @Override
@@ -254,9 +240,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     OptimizingQueue queue = getQueueByToken(authToken);
     OptimizerThread thread =
         
getAuthenticatedOptimizer(authToken).getThread(taskResult.getThreadId());
-    Optional.ofNullable(queue.getTask(taskResult.getTaskId()))
-        .orElseThrow(() -> new TaskNotFoundException(taskResult.getTaskId()))
-        .complete(thread, taskResult);
+    queue.completeTask(thread, taskResult);
   }
 
   @Override
@@ -284,7 +268,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   }
 
   @Override
-  public boolean cancelProcess(long processId) throws TException {
+  public boolean cancelProcess(long processId) {
     TableProcessMeta processMeta =
         getAs(TableProcessMapper.class, m -> m.getProcessMeta(processId));
     if (processMeta == null) {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 41216e701..909815bce 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -23,8 +23,10 @@ import org.apache.amoro.OptimizerProperties;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.api.BlockableOperation;
 import org.apache.amoro.api.OptimizingTaskId;
+import org.apache.amoro.api.OptimizingTaskResult;
 import org.apache.amoro.exception.OptimizingClosedException;
 import org.apache.amoro.exception.PersistenceException;
+import org.apache.amoro.exception.TaskNotFoundException;
 import org.apache.amoro.optimizing.MetricsSummary;
 import org.apache.amoro.optimizing.OptimizingType;
 import org.apache.amoro.optimizing.RewriteFilesInput;
@@ -44,6 +46,7 @@ import 
org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
 import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
 import org.apache.amoro.server.process.TableProcessMeta;
 import org.apache.amoro.server.resource.OptimizerInstance;
+import org.apache.amoro.server.resource.OptimizerThread;
 import org.apache.amoro.server.resource.QuotaProvider;
 import org.apache.amoro.server.table.DefaultTableRuntime;
 import org.apache.amoro.server.table.blocker.TableBlocker;
@@ -90,7 +93,6 @@ public class OptimizingQueue extends PersistentBase {
 
   private final QuotaProvider quotaProvider;
   private final Queue<TableOptimizingProcess> tableQueue = new 
LinkedTransferQueue<>();
-  private final Queue<TaskRuntime<?>> retryTaskQueue = new 
LinkedTransferQueue<>();
   private final SchedulingPolicy scheduler;
   private final CatalogManager catalogManager;
   private final Executor planExecutor;
@@ -200,24 +202,23 @@ public class OptimizingQueue extends PersistentBase {
 
   private void clearProcess(OptimizingProcess optimizingProcess) {
     tableQueue.removeIf(process -> process.getProcessId() == 
optimizingProcess.getProcessId());
-    retryTaskQueue.removeIf(
-        taskRuntime -> taskRuntime.getTaskId().getProcessId() == 
optimizingProcess.getProcessId());
   }
 
-  public TaskRuntime<?> pollTask(long maxWaitTime, boolean breakQuotaLimit) {
+  public TaskRuntime<?> pollTask(
+      OptimizerThread thread, long maxWaitTime, boolean breakQuotaLimit) {
     long deadline = calculateDeadline(maxWaitTime);
-    TaskRuntime<?> task = fetchTask();
+    TaskRuntime<?> task = fetchScheduledTask(thread, true);
     while (task == null && waitTask(deadline)) {
-      task = fetchTask();
+      task = fetchScheduledTask(thread, true);
     }
     if (task == null && breakQuotaLimit && planningTables.isEmpty()) {
-      task = fetchScheduledTask(false);
+      task = fetchScheduledTask(thread, false);
     }
     return task;
   }
 
-  public TaskRuntime<?> pollTask(long maxWaitTime) {
-    return pollTask(maxWaitTime, false);
+  public TaskRuntime<?> pollTask(OptimizerThread thread, long maxWaitTime) {
+    return pollTask(thread, maxWaitTime, true);
   }
 
   private long calculateDeadline(long maxWaitTime) {
@@ -240,14 +241,9 @@ public class OptimizingQueue extends PersistentBase {
     }
   }
 
-  private TaskRuntime<?> fetchTask() {
-    TaskRuntime<?> task = retryTaskQueue.poll();
-    return task != null ? task : fetchScheduledTask(true);
-  }
-
-  private TaskRuntime<?> fetchScheduledTask(boolean needQuotaChecking) {
+  private TaskRuntime<?> fetchScheduledTask(OptimizerThread thread, boolean 
needQuotaChecking) {
     return tableQueue.stream()
-        .map(process -> process.poll(needQuotaChecking))
+        .map(process -> process.poll(thread, needQuotaChecking))
         .filter(Objects::nonNull)
         .findFirst()
         .orElse(null);
@@ -352,12 +348,12 @@ public class OptimizingQueue extends PersistentBase {
     }
   }
 
-  public TaskRuntime<?> getTask(OptimizingTaskId taskId) {
-    return tableQueue.stream()
-        .filter(p -> p.getProcessId() == taskId.getProcessId())
-        .findFirst()
-        .map(p -> p.getTaskMap().get(taskId))
-        .orElse(null);
+  public void ackTask(OptimizingTaskId taskId, OptimizerThread thread) {
+    findProcess(taskId).ackTask(taskId, thread);
+  }
+
+  public void completeTask(OptimizerThread thread, OptimizingTaskResult 
result) {
+    findProcess(result.getTaskId()).completeTask(thread, result);
   }
 
   public List<TaskRuntime<?>> collectTasks() {
@@ -374,8 +370,7 @@ public class OptimizingQueue extends PersistentBase {
   }
 
   public void retryTask(TaskRuntime<?> taskRuntime) {
-    taskRuntime.reset();
-    retryTaskQueue.offer(taskRuntime);
+    
findProcess(taskRuntime.getTaskId()).resetTask((TaskRuntime<RewriteStageTask>) 
taskRuntime);
   }
 
   public ResourceGroup getOptimizerGroup() {
@@ -402,6 +397,13 @@ public class OptimizingQueue extends PersistentBase {
     this.metrics.unregister();
   }
 
+  private TableOptimizingProcess findProcess(OptimizingTaskId taskId) {
+    return tableQueue.stream()
+        .filter(p -> p.getProcessId() == taskId.getProcessId())
+        .findFirst()
+        .orElseThrow(() -> new TaskNotFoundException(taskId));
+  }
+
   private double getAvailableCore() {
     // the available core should be at least 1
     return Math.max(quotaProvider.getTotalQuota(optimizerGroup.getName()), 1);
@@ -437,26 +439,32 @@ public class OptimizingQueue extends PersistentBase {
     private Map<String, Long> toSequence = Maps.newHashMap();
     private boolean hasCommitted = false;
 
-    public TaskRuntime<?> poll(boolean needQuotaChecking) {
-      if (lock.tryLock()) {
-        try {
-          TaskRuntime<?> task = null;
-          if (status != ProcessStatus.KILLED && status != 
ProcessStatus.FAILED) {
-            int actualQuota = getActualQuota();
-            int quotaLimit = getQuotaLimit();
-            if (!needQuotaChecking || actualQuota < quotaLimit) {
-              task = taskQueue.poll();
+    public TaskRuntime<?> poll(OptimizerThread thread, boolean 
needQuotaChecking) {
+      try {
+        // Wait 10ms here for some light operation like poll/ack
+        if (lock.tryLock(10, TimeUnit.MILLISECONDS)) {
+          try {
+            TaskRuntime<?> task = null;
+            if (status != ProcessStatus.KILLED && status != 
ProcessStatus.FAILED) {
+              int actualQuota = getActualQuota();
+              int quotaLimit = getQuotaLimit();
+              if (!needQuotaChecking || actualQuota < quotaLimit) {
+                task = taskQueue.poll();
+              }
             }
+            if (task != null) {
+              optimizingTasksMap
+                  .computeIfAbsent(tableRuntime.getTableIdentifier(), k -> new 
AtomicInteger(0))
+                  .incrementAndGet();
+              task.schedule(thread);
+            }
+            return task;
+          } finally {
+            lock.unlock();
           }
-          if (task != null) {
-            optimizingTasksMap
-                .computeIfAbsent(tableRuntime.getTableIdentifier(), k -> new 
AtomicInteger(0))
-                .incrementAndGet();
-          }
-          return task;
-        } finally {
-          lock.unlock();
         }
+      } catch (InterruptedException e) {
+        // ignore it.
       }
       return null;
     }
@@ -543,6 +551,34 @@ public class OptimizingQueue extends PersistentBase {
       }
     }
 
+    private void ackTask(OptimizingTaskId taskId, OptimizerThread thread) {
+      TaskRuntime<?> taskRuntime = getTaskRuntime(taskId);
+      lock.lock();
+      try {
+        taskRuntime.ack(thread);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    private void completeTask(OptimizerThread thread, OptimizingTaskResult 
result) {
+      TaskRuntime<?> taskRuntime = getTaskRuntime(result.getTaskId());
+      lock.lock();
+      try {
+        taskRuntime.complete(thread, result);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    private TaskRuntime<?> getTaskRuntime(OptimizingTaskId taskId) {
+      TaskRuntime<?> taskRuntime = getTaskMap().get(taskId);
+      if (taskRuntime == null) {
+        throw new TaskNotFoundException(taskId);
+      }
+      return taskRuntime;
+    }
+
     private void acceptResult(TaskRuntime<?> taskRuntime) {
       lock.lock();
       try {
@@ -612,6 +648,16 @@ public class OptimizingQueue extends PersistentBase {
       }
     }
 
+    private void resetTask(TaskRuntime<RewriteStageTask> taskRuntime) {
+      lock.lock();
+      try {
+        taskRuntime.reset();
+        taskQueue.add(taskRuntime);
+      } finally {
+        lock.unlock();
+      }
+    }
+
     @Override
     public boolean isClosed() {
       return status == ProcessStatus.KILLED;
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
index f6db6e430..d0c3f9f7c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
@@ -81,7 +81,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?, ?, 
?>> extends Stated
     return future;
   }
 
-  public void complete(OptimizerThread thread, OptimizingTaskResult result) {
+  void complete(OptimizerThread thread, OptimizingTaskResult result) {
     invokeConsistency(
         () -> {
           validThread(thread);
@@ -121,7 +121,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?, 
?, ?>> extends Stated
         });
   }
 
-  public void schedule(OptimizerThread thread) {
+  void schedule(OptimizerThread thread) {
     invokeConsistency(
         () -> {
           statusMachine.accept(Status.SCHEDULED);
@@ -132,7 +132,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?, 
?, ?>> extends Stated
         });
   }
 
-  public void ack(OptimizerThread thread) {
+  void ack(OptimizerThread thread) {
     invokeConsistency(
         () -> {
           validThread(thread);
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
index 94737cbd7..b7b7bed03 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
@@ -93,13 +93,12 @@ public class TestOptimizingUtil extends AMSTableTestBase {
     DefaultTableRuntime tableRuntime = initTableWithFiles();
     OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
     Assert.assertEquals(0, queue.collectTasks().size());
-    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
-    task.schedule(optimizerThread);
-    task.ack(optimizerThread);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
+    queue.ackTask(task.getTaskId(), optimizerThread);
     Assert.assertEquals(
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
     Assert.assertNotNull(task);
-    task.complete(
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index f5509afa6..342b533aa 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -131,7 +131,7 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     DefaultTableRuntime tableRuntimeMeta =
         buildTableRuntimeMeta(OptimizingStatus.PENDING, 
defaultResourceGroup());
     OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta);
-    Assert.assertNull(queue.pollTask(0));
+    Assert.assertNull(queue.pollTask(optimizerThread, 0));
     queue.dispose();
   }
 
@@ -160,11 +160,11 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
     OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
 
     // 1.poll task
-    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
 
     Assert.assertNotNull(task);
-    Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
-    Assert.assertNull(queue.pollTask(0));
+    Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus());
+    Assert.assertNull(queue.pollTask(optimizerThread, 0));
     queue.dispose();
   }
 
@@ -180,24 +180,23 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
             Collections.singletonList(tableRuntime),
             1);
 
-    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME, 
false);
     Assert.assertNotNull(task);
-    Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
-    task.schedule(optimizerThread);
-    task.ack(optimizerThread);
+    Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus());
+    queue.ackTask(task.getTaskId(), optimizerThread);
     Assert.assertEquals(
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
     Assert.assertNotNull(task);
 
-    TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME, 
false);
     Assert.assertNull(task2);
 
-    task.complete(
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
 
-    TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> retryTask = queue.pollTask(optimizerThread, 
MAX_POLLING_TIME);
     Assert.assertNotNull(retryTask);
 
     queue.dispose();
@@ -215,25 +214,24 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
             Collections.singletonList(tableRuntime),
             1);
 
-    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task);
-    Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
-    task.schedule(optimizerThread);
-    task.ack(optimizerThread);
+    Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus());
+    queue.ackTask(task.getTaskId(), optimizerThread);
     Assert.assertEquals(
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
     Assert.assertNotNull(task);
 
-    TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME, true);
+    TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME, 
true);
     Assert.assertNotNull(task2);
 
-    task.complete(
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
-    TaskRuntime task4 = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task4 = queue.pollTask(optimizerThread, MAX_POLLING_TIME, 
false);
     Assert.assertNull(task4);
-    TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME, true);
+    TaskRuntime<?> retryTask = queue.pollTask(optimizerThread, 
MAX_POLLING_TIME, true);
     Assert.assertNotNull(retryTask);
     queue.dispose();
   }
@@ -250,14 +248,13 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
             planExecutor,
             Collections.singletonList(tableRuntime),
             1);
-    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
-    task.schedule(optimizerThread);
-    task.ack(optimizerThread);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
+    queue.ackTask(task.getTaskId(), optimizerThread);
     Assert.assertEquals(
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
     Assert.assertNotNull(task);
     Assert.assertTrue(tableRuntime.getTableIdentifier().getId() == 
task.getTableId());
-    task.complete(
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
@@ -280,10 +277,10 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
     queue.refreshTable(tableRuntime2);
     queue.refreshTable(tableRuntime);
 
-    TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task2);
     Assert.assertTrue(tableRuntime2.getTableIdentifier().getId() == 
task2.getTableId());
-    TaskRuntime task3 = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task3 = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task3);
     Assert.assertTrue(tableRuntime.getTableIdentifier().getId() == 
task3.getTableId());
     queue.dispose();
@@ -296,27 +293,25 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
     OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta);
 
     // 1.poll task
-    TaskRuntime<?> task = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task);
 
     for (int i = 0; i < 
TableProperties.SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER_DEFAULT; i++) {
       queue.retryTask(task);
-      TaskRuntime<?> retryTask = queue.pollTask(MAX_POLLING_TIME);
+      TaskRuntime<?> retryTask = queue.pollTask(optimizerThread, 
MAX_POLLING_TIME);
       Assert.assertEquals(retryTask.getTaskId(), task.getTaskId());
-      retryTask.schedule(optimizerThread);
-      retryTask.ack(optimizerThread);
-      retryTask.complete(
+      queue.ackTask(task.getTaskId(), optimizerThread);
+      queue.completeTask(
           optimizerThread,
-          buildOptimizingTaskFailed(retryTask.getTaskId(), 
optimizerThread.getThreadId()));
+          buildOptimizingTaskFailed(task.getTaskId(), 
optimizerThread.getThreadId()));
       Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
     }
 
     queue.retryTask(task);
-    TaskRuntime<?> retryTask = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> retryTask = queue.pollTask(optimizerThread, 
MAX_POLLING_TIME);
     Assert.assertEquals(retryTask.getTaskId(), task.getTaskId());
-    retryTask.schedule(optimizerThread);
-    retryTask.ack(optimizerThread);
-    retryTask.complete(
+    queue.ackTask(task.getTaskId(), optimizerThread);
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskFailed(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.FAILED, task.getStatus());
@@ -329,13 +324,12 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
     OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
     Assert.assertEquals(0, queue.collectTasks().size());
 
-    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
-    task.schedule(optimizerThread);
-    task.ack(optimizerThread);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
+    queue.ackTask(task.getTaskId(), optimizerThread);
     Assert.assertEquals(
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
     Assert.assertNotNull(task);
-    task.complete(
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
@@ -361,40 +355,38 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
     OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
     Assert.assertEquals(0, queue.collectTasks().size());
 
-    TaskRuntime firstTask = queue.pollTask(MAX_POLLING_TIME);
-    firstTask.schedule(optimizerThread);
-    firstTask.ack(optimizerThread);
+    TaskRuntime<?> firstTask = queue.pollTask(optimizerThread, 
MAX_POLLING_TIME);
+    queue.ackTask(firstTask.getTaskId(), optimizerThread);
     Assert.assertEquals(
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.ACKED).size());
     Assert.assertNotNull(firstTask);
-    firstTask.complete(
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskResult(firstTask.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.SUCCESS, firstTask.getStatus());
 
-    TaskRuntime<?> task = queue.pollTask(MAX_POLLING_TIME);
+    queue.pollTask(optimizerThread, MAX_POLLING_TIME);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task);
 
     for (int i = 0; i < 
TableProperties.SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER_DEFAULT; i++) {
       queue.retryTask(task);
-      TaskRuntime<?> retryTask = queue.pollTask(MAX_POLLING_TIME);
+      TaskRuntime<?> retryTask = queue.pollTask(optimizerThread, 
MAX_POLLING_TIME);
       Assert.assertEquals(retryTask.getTaskId(), task.getTaskId());
-      retryTask.schedule(optimizerThread);
-      retryTask.ack(optimizerThread);
-      retryTask.complete(
+      queue.ackTask(task.getTaskId(), optimizerThread);
+      queue.completeTask(
           optimizerThread,
           buildOptimizingTaskFailed(task.getTaskId(), 
optimizerThread.getThreadId()));
       Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
     }
 
     queue.retryTask(task);
-    TaskRuntime<?> retryTask = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> retryTask = queue.pollTask(optimizerThread, 
MAX_POLLING_TIME);
     Assert.assertEquals(retryTask.getTaskId(), task.getTaskId());
-    retryTask.schedule(optimizerThread);
-    retryTask.ack(optimizerThread);
+    queue.ackTask(retryTask.getTaskId(), optimizerThread);
 
     OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess();
-    retryTask.complete(
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskFailed(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(TaskRuntime.Status.FAILED, task.getStatus());
@@ -411,9 +403,8 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
     Assert.assertEquals(0, queue.collectTasks().size());
 
-    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task);
-    task.schedule(optimizerThread);
     Assert.assertEquals(1, queue.collectTasks().size());
     Assert.assertEquals(
         1, queue.collectTasks(t -> t.getStatus() == 
TaskRuntime.Status.SCHEDULED).size());
@@ -457,9 +448,8 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     Assert.assertEquals(0, idleTablesGauge.getValue().longValue());
     Assert.assertEquals(0, committingTablesGauge.getValue().longValue());
 
-    TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+    TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task);
-    task.schedule(optimizerThread);
     Assert.assertEquals(1, queueTasksGauge.getValue().longValue());
     Assert.assertEquals(0, executingTasksGauge.getValue().longValue());
     Assert.assertEquals(0, planingTablesGauge.getValue().longValue());
@@ -468,7 +458,7 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     Assert.assertEquals(0, idleTablesGauge.getValue().longValue());
     Assert.assertEquals(0, committingTablesGauge.getValue().longValue());
 
-    task.ack(optimizerThread);
+    queue.ackTask(task.getTaskId(), optimizerThread);
     Assert.assertEquals(0, queueTasksGauge.getValue().longValue());
     Assert.assertEquals(1, executingTasksGauge.getValue().longValue());
     Assert.assertEquals(0, planingTablesGauge.getValue().longValue());
@@ -477,7 +467,7 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     Assert.assertEquals(0, idleTablesGauge.getValue().longValue());
     Assert.assertEquals(0, committingTablesGauge.getValue().longValue());
 
-    task.complete(
+    queue.completeTask(
         optimizerThread,
         buildOptimizingTaskResult(task.getTaskId(), 
optimizerThread.getThreadId()));
     Assert.assertEquals(0, queueTasksGauge.getValue().longValue());

Reply via email to