This is an automated email from the ASF dual-hosted git repository.
nathanma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 43c2ee178 [AMORO-3794] Wrap task operation with process lock (#3798)
43c2ee178 is described below
commit 43c2ee17871a1d43968c9bcce875daf1c3b5d7ce
Author: ZhouJinsong <[email protected]>
AuthorDate: Sat Sep 27 21:59:39 2025 +0800
[AMORO-3794] Wrap task operation with process lock (#3798)
* Wrap task operation with process lock
* Fix some checkstyle errors
* Add a lock timeout while polling tasks
---
.../amoro/server/DefaultOptimizingService.java | 30 ++---
.../amoro/server/optimizing/OptimizingQueue.java | 128 ++++++++++++++-------
.../amoro/server/optimizing/TaskRuntime.java | 6 +-
.../server/dashboard/utils/TestOptimizingUtil.java | 7 +-
.../server/optimizing/TestOptimizingQueue.java | 106 ++++++++---------
5 files changed, 148 insertions(+), 129 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 23b7c0464..6cc05bc56 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -32,7 +32,6 @@ import org.apache.amoro.exception.ForbiddenException;
import org.apache.amoro.exception.IllegalTaskStateException;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.exception.PluginRetryAuthException;
-import org.apache.amoro.exception.TaskNotFoundException;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.optimizing.OptimizingProcess;
@@ -54,7 +53,6 @@ import org.apache.amoro.server.table.TableService;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.amoro.shade.thrift.org.apache.thrift.TException;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@@ -214,33 +212,21 @@ public class DefaultOptimizingService extends
StatedPersistentBase
@Override
public OptimizingTask pollTask(String authToken, int threadId) {
LOG.debug("Optimizer {} (threadId {}) try polling task", authToken,
threadId);
+ OptimizerThread optimizerThread =
getAuthenticatedOptimizer(authToken).getThread(threadId);
OptimizingQueue queue = getQueueByToken(authToken);
- return Optional.ofNullable(queue.pollTask(pollingTimeout, breakQuotaLimit))
- .map(task -> extractOptimizingTask(task, authToken, threadId, queue))
- .orElse(null);
- }
-
- private OptimizingTask extractOptimizingTask(
- TaskRuntime<?> task, String authToken, int threadId, OptimizingQueue
queue) {
- try {
- OptimizerThread optimizerThread =
getAuthenticatedOptimizer(authToken).getThread(threadId);
- task.schedule(optimizerThread);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, pollingTimeout,
breakQuotaLimit);
+ if (task != null) {
LOG.info("OptimizerThread {} polled task {}", optimizerThread,
task.getTaskId());
return task.extractProtocolTask();
- } catch (Throwable throwable) {
- LOG.error("Schedule task {} failed, put it to retry queue",
task.getTaskId(), throwable);
- queue.retryTask(task);
- return null;
}
+ return null;
}
@Override
public void ackTask(String authToken, int threadId, OptimizingTaskId taskId)
{
LOG.info("Ack task {} by optimizer {} (threadId {})", taskId, authToken,
threadId);
OptimizingQueue queue = getQueueByToken(authToken);
- Optional.ofNullable(queue.getTask(taskId))
- .orElseThrow(() -> new TaskNotFoundException(taskId))
- .ack(getAuthenticatedOptimizer(authToken).getThread(threadId));
+ queue.ackTask(taskId,
getAuthenticatedOptimizer(authToken).getThread(threadId));
}
@Override
@@ -254,9 +240,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
OptimizingQueue queue = getQueueByToken(authToken);
OptimizerThread thread =
getAuthenticatedOptimizer(authToken).getThread(taskResult.getThreadId());
- Optional.ofNullable(queue.getTask(taskResult.getTaskId()))
- .orElseThrow(() -> new TaskNotFoundException(taskResult.getTaskId()))
- .complete(thread, taskResult);
+ queue.completeTask(thread, taskResult);
}
@Override
@@ -284,7 +268,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
}
@Override
- public boolean cancelProcess(long processId) throws TException {
+ public boolean cancelProcess(long processId) {
TableProcessMeta processMeta =
getAs(TableProcessMapper.class, m -> m.getProcessMeta(processId));
if (processMeta == null) {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index 41216e701..909815bce 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -23,8 +23,10 @@ import org.apache.amoro.OptimizerProperties;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.api.BlockableOperation;
import org.apache.amoro.api.OptimizingTaskId;
+import org.apache.amoro.api.OptimizingTaskResult;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.exception.PersistenceException;
+import org.apache.amoro.exception.TaskNotFoundException;
import org.apache.amoro.optimizing.MetricsSummary;
import org.apache.amoro.optimizing.OptimizingType;
import org.apache.amoro.optimizing.RewriteFilesInput;
@@ -44,6 +46,7 @@ import
org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
import org.apache.amoro.server.process.TableProcessMeta;
import org.apache.amoro.server.resource.OptimizerInstance;
+import org.apache.amoro.server.resource.OptimizerThread;
import org.apache.amoro.server.resource.QuotaProvider;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.server.table.blocker.TableBlocker;
@@ -90,7 +93,6 @@ public class OptimizingQueue extends PersistentBase {
private final QuotaProvider quotaProvider;
private final Queue<TableOptimizingProcess> tableQueue = new
LinkedTransferQueue<>();
- private final Queue<TaskRuntime<?>> retryTaskQueue = new
LinkedTransferQueue<>();
private final SchedulingPolicy scheduler;
private final CatalogManager catalogManager;
private final Executor planExecutor;
@@ -200,24 +202,23 @@ public class OptimizingQueue extends PersistentBase {
private void clearProcess(OptimizingProcess optimizingProcess) {
tableQueue.removeIf(process -> process.getProcessId() ==
optimizingProcess.getProcessId());
- retryTaskQueue.removeIf(
- taskRuntime -> taskRuntime.getTaskId().getProcessId() ==
optimizingProcess.getProcessId());
}
- public TaskRuntime<?> pollTask(long maxWaitTime, boolean breakQuotaLimit) {
+ public TaskRuntime<?> pollTask(
+ OptimizerThread thread, long maxWaitTime, boolean breakQuotaLimit) {
long deadline = calculateDeadline(maxWaitTime);
- TaskRuntime<?> task = fetchTask();
+ TaskRuntime<?> task = fetchScheduledTask(thread, true);
while (task == null && waitTask(deadline)) {
- task = fetchTask();
+ task = fetchScheduledTask(thread, true);
}
if (task == null && breakQuotaLimit && planningTables.isEmpty()) {
- task = fetchScheduledTask(false);
+ task = fetchScheduledTask(thread, false);
}
return task;
}
- public TaskRuntime<?> pollTask(long maxWaitTime) {
- return pollTask(maxWaitTime, false);
+ public TaskRuntime<?> pollTask(OptimizerThread thread, long maxWaitTime) {
+ return pollTask(thread, maxWaitTime, true);
}
private long calculateDeadline(long maxWaitTime) {
@@ -240,14 +241,9 @@ public class OptimizingQueue extends PersistentBase {
}
}
- private TaskRuntime<?> fetchTask() {
- TaskRuntime<?> task = retryTaskQueue.poll();
- return task != null ? task : fetchScheduledTask(true);
- }
-
- private TaskRuntime<?> fetchScheduledTask(boolean needQuotaChecking) {
+ private TaskRuntime<?> fetchScheduledTask(OptimizerThread thread, boolean
needQuotaChecking) {
return tableQueue.stream()
- .map(process -> process.poll(needQuotaChecking))
+ .map(process -> process.poll(thread, needQuotaChecking))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
@@ -352,12 +348,12 @@ public class OptimizingQueue extends PersistentBase {
}
}
- public TaskRuntime<?> getTask(OptimizingTaskId taskId) {
- return tableQueue.stream()
- .filter(p -> p.getProcessId() == taskId.getProcessId())
- .findFirst()
- .map(p -> p.getTaskMap().get(taskId))
- .orElse(null);
+ public void ackTask(OptimizingTaskId taskId, OptimizerThread thread) {
+ findProcess(taskId).ackTask(taskId, thread);
+ }
+
+ public void completeTask(OptimizerThread thread, OptimizingTaskResult
result) {
+ findProcess(result.getTaskId()).completeTask(thread, result);
}
public List<TaskRuntime<?>> collectTasks() {
@@ -374,8 +370,7 @@ public class OptimizingQueue extends PersistentBase {
}
public void retryTask(TaskRuntime<?> taskRuntime) {
- taskRuntime.reset();
- retryTaskQueue.offer(taskRuntime);
+
findProcess(taskRuntime.getTaskId()).resetTask((TaskRuntime<RewriteStageTask>)
taskRuntime);
}
public ResourceGroup getOptimizerGroup() {
@@ -402,6 +397,13 @@ public class OptimizingQueue extends PersistentBase {
this.metrics.unregister();
}
+ private TableOptimizingProcess findProcess(OptimizingTaskId taskId) {
+ return tableQueue.stream()
+ .filter(p -> p.getProcessId() == taskId.getProcessId())
+ .findFirst()
+ .orElseThrow(() -> new TaskNotFoundException(taskId));
+ }
+
private double getAvailableCore() {
// the available core should be at least 1
return Math.max(quotaProvider.getTotalQuota(optimizerGroup.getName()), 1);
@@ -437,26 +439,32 @@ public class OptimizingQueue extends PersistentBase {
private Map<String, Long> toSequence = Maps.newHashMap();
private boolean hasCommitted = false;
- public TaskRuntime<?> poll(boolean needQuotaChecking) {
- if (lock.tryLock()) {
- try {
- TaskRuntime<?> task = null;
- if (status != ProcessStatus.KILLED && status !=
ProcessStatus.FAILED) {
- int actualQuota = getActualQuota();
- int quotaLimit = getQuotaLimit();
- if (!needQuotaChecking || actualQuota < quotaLimit) {
- task = taskQueue.poll();
+ public TaskRuntime<?> poll(OptimizerThread thread, boolean
needQuotaChecking) {
+ try {
+ // Wait 10ms here for some light operation like poll/ack
+ if (lock.tryLock(10, TimeUnit.MILLISECONDS)) {
+ try {
+ TaskRuntime<?> task = null;
+ if (status != ProcessStatus.KILLED && status !=
ProcessStatus.FAILED) {
+ int actualQuota = getActualQuota();
+ int quotaLimit = getQuotaLimit();
+ if (!needQuotaChecking || actualQuota < quotaLimit) {
+ task = taskQueue.poll();
+ }
}
+ if (task != null) {
+ optimizingTasksMap
+ .computeIfAbsent(tableRuntime.getTableIdentifier(), k -> new
AtomicInteger(0))
+ .incrementAndGet();
+ task.schedule(thread);
+ }
+ return task;
+ } finally {
+ lock.unlock();
}
- if (task != null) {
- optimizingTasksMap
- .computeIfAbsent(tableRuntime.getTableIdentifier(), k -> new
AtomicInteger(0))
- .incrementAndGet();
- }
- return task;
- } finally {
- lock.unlock();
}
+ } catch (InterruptedException e) {
+ // ignore it.
}
return null;
}
@@ -543,6 +551,34 @@ public class OptimizingQueue extends PersistentBase {
}
}
+ private void ackTask(OptimizingTaskId taskId, OptimizerThread thread) {
+ TaskRuntime<?> taskRuntime = getTaskRuntime(taskId);
+ lock.lock();
+ try {
+ taskRuntime.ack(thread);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void completeTask(OptimizerThread thread, OptimizingTaskResult
result) {
+ TaskRuntime<?> taskRuntime = getTaskRuntime(result.getTaskId());
+ lock.lock();
+ try {
+ taskRuntime.complete(thread, result);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private TaskRuntime<?> getTaskRuntime(OptimizingTaskId taskId) {
+ TaskRuntime<?> taskRuntime = getTaskMap().get(taskId);
+ if (taskRuntime == null) {
+ throw new TaskNotFoundException(taskId);
+ }
+ return taskRuntime;
+ }
+
private void acceptResult(TaskRuntime<?> taskRuntime) {
lock.lock();
try {
@@ -612,6 +648,16 @@ public class OptimizingQueue extends PersistentBase {
}
}
+ private void resetTask(TaskRuntime<RewriteStageTask> taskRuntime) {
+ lock.lock();
+ try {
+ taskRuntime.reset();
+ taskQueue.add(taskRuntime);
+ } finally {
+ lock.unlock();
+ }
+ }
+
@Override
public boolean isClosed() {
return status == ProcessStatus.KILLED;
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
index f6db6e430..d0c3f9f7c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java
@@ -81,7 +81,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?, ?,
?>> extends Stated
return future;
}
- public void complete(OptimizerThread thread, OptimizingTaskResult result) {
+ void complete(OptimizerThread thread, OptimizingTaskResult result) {
invokeConsistency(
() -> {
validThread(thread);
@@ -121,7 +121,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?,
?, ?>> extends Stated
});
}
- public void schedule(OptimizerThread thread) {
+ void schedule(OptimizerThread thread) {
invokeConsistency(
() -> {
statusMachine.accept(Status.SCHEDULED);
@@ -132,7 +132,7 @@ public class TaskRuntime<T extends StagedTaskDescriptor<?,
?, ?>> extends Stated
});
}
- public void ack(OptimizerThread thread) {
+ void ack(OptimizerThread thread) {
invokeConsistency(
() -> {
validThread(thread);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
index 94737cbd7..b7b7bed03 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/utils/TestOptimizingUtil.java
@@ -93,13 +93,12 @@ public class TestOptimizingUtil extends AMSTableTestBase {
DefaultTableRuntime tableRuntime = initTableWithFiles();
OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
Assert.assertEquals(0, queue.collectTasks().size());
- TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
- task.schedule(optimizerThread);
- task.ack(optimizerThread);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
+ queue.ackTask(task.getTaskId(), optimizerThread);
Assert.assertEquals(
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
Assert.assertNotNull(task);
- task.complete(
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index f5509afa6..342b533aa 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -131,7 +131,7 @@ public class TestOptimizingQueue extends AMSTableTestBase {
DefaultTableRuntime tableRuntimeMeta =
buildTableRuntimeMeta(OptimizingStatus.PENDING,
defaultResourceGroup());
OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta);
- Assert.assertNull(queue.pollTask(0));
+ Assert.assertNull(queue.pollTask(optimizerThread, 0));
queue.dispose();
}
@@ -160,11 +160,11 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
// 1.poll task
- TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task);
- Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
- Assert.assertNull(queue.pollTask(0));
+ Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus());
+ Assert.assertNull(queue.pollTask(optimizerThread, 0));
queue.dispose();
}
@@ -180,24 +180,23 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
Collections.singletonList(tableRuntime),
1);
- TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME,
false);
Assert.assertNotNull(task);
- Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
- task.schedule(optimizerThread);
- task.ack(optimizerThread);
+ Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus());
+ queue.ackTask(task.getTaskId(), optimizerThread);
Assert.assertEquals(
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
Assert.assertNotNull(task);
- TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME,
false);
Assert.assertNull(task2);
- task.complete(
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
- TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> retryTask = queue.pollTask(optimizerThread,
MAX_POLLING_TIME);
Assert.assertNotNull(retryTask);
queue.dispose();
@@ -215,25 +214,24 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
Collections.singletonList(tableRuntime),
1);
- TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task);
- Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
- task.schedule(optimizerThread);
- task.ack(optimizerThread);
+ Assert.assertEquals(TaskRuntime.Status.SCHEDULED, task.getStatus());
+ queue.ackTask(task.getTaskId(), optimizerThread);
Assert.assertEquals(
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
Assert.assertNotNull(task);
- TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME, true);
+ TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME,
true);
Assert.assertNotNull(task2);
- task.complete(
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
- TaskRuntime task4 = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task4 = queue.pollTask(optimizerThread, MAX_POLLING_TIME,
false);
Assert.assertNull(task4);
- TaskRuntime retryTask = queue.pollTask(MAX_POLLING_TIME, true);
+ TaskRuntime<?> retryTask = queue.pollTask(optimizerThread,
MAX_POLLING_TIME, true);
Assert.assertNotNull(retryTask);
queue.dispose();
}
@@ -250,14 +248,13 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
planExecutor,
Collections.singletonList(tableRuntime),
1);
- TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
- task.schedule(optimizerThread);
- task.ack(optimizerThread);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
+ queue.ackTask(task.getTaskId(), optimizerThread);
Assert.assertEquals(
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
Assert.assertNotNull(task);
Assert.assertTrue(tableRuntime.getTableIdentifier().getId() ==
task.getTableId());
- task.complete(
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
@@ -280,10 +277,10 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
queue.refreshTable(tableRuntime2);
queue.refreshTable(tableRuntime);
- TaskRuntime task2 = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task2);
Assert.assertTrue(tableRuntime2.getTableIdentifier().getId() ==
task2.getTableId());
- TaskRuntime task3 = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task3 = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task3);
Assert.assertTrue(tableRuntime.getTableIdentifier().getId() ==
task3.getTableId());
queue.dispose();
@@ -296,27 +293,25 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
OptimizingQueue queue = buildOptimizingGroupService(tableRuntimeMeta);
// 1.poll task
- TaskRuntime<?> task = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task);
for (int i = 0; i <
TableProperties.SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER_DEFAULT; i++) {
queue.retryTask(task);
- TaskRuntime<?> retryTask = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> retryTask = queue.pollTask(optimizerThread,
MAX_POLLING_TIME);
Assert.assertEquals(retryTask.getTaskId(), task.getTaskId());
- retryTask.schedule(optimizerThread);
- retryTask.ack(optimizerThread);
- retryTask.complete(
+ queue.ackTask(task.getTaskId(), optimizerThread);
+ queue.completeTask(
optimizerThread,
- buildOptimizingTaskFailed(retryTask.getTaskId(),
optimizerThread.getThreadId()));
+ buildOptimizingTaskFailed(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
}
queue.retryTask(task);
- TaskRuntime<?> retryTask = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> retryTask = queue.pollTask(optimizerThread,
MAX_POLLING_TIME);
Assert.assertEquals(retryTask.getTaskId(), task.getTaskId());
- retryTask.schedule(optimizerThread);
- retryTask.ack(optimizerThread);
- retryTask.complete(
+ queue.ackTask(task.getTaskId(), optimizerThread);
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskFailed(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.FAILED, task.getStatus());
@@ -329,13 +324,12 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
Assert.assertEquals(0, queue.collectTasks().size());
- TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
- task.schedule(optimizerThread);
- task.ack(optimizerThread);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
+ queue.ackTask(task.getTaskId(), optimizerThread);
Assert.assertEquals(
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
Assert.assertNotNull(task);
- task.complete(
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.SUCCESS, task.getStatus());
@@ -361,40 +355,38 @@ public class TestOptimizingQueue extends AMSTableTestBase
{
OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
Assert.assertEquals(0, queue.collectTasks().size());
- TaskRuntime firstTask = queue.pollTask(MAX_POLLING_TIME);
- firstTask.schedule(optimizerThread);
- firstTask.ack(optimizerThread);
+ TaskRuntime<?> firstTask = queue.pollTask(optimizerThread,
MAX_POLLING_TIME);
+ queue.ackTask(firstTask.getTaskId(), optimizerThread);
Assert.assertEquals(
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.ACKED).size());
Assert.assertNotNull(firstTask);
- firstTask.complete(
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(firstTask.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.SUCCESS, firstTask.getStatus());
- TaskRuntime<?> task = queue.pollTask(MAX_POLLING_TIME);
+ queue.pollTask(optimizerThread, MAX_POLLING_TIME);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task);
for (int i = 0; i <
TableProperties.SELF_OPTIMIZING_EXECUTE_RETRY_NUMBER_DEFAULT; i++) {
queue.retryTask(task);
- TaskRuntime<?> retryTask = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> retryTask = queue.pollTask(optimizerThread,
MAX_POLLING_TIME);
Assert.assertEquals(retryTask.getTaskId(), task.getTaskId());
- retryTask.schedule(optimizerThread);
- retryTask.ack(optimizerThread);
- retryTask.complete(
+ queue.ackTask(task.getTaskId(), optimizerThread);
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskFailed(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.PLANNED, task.getStatus());
}
queue.retryTask(task);
- TaskRuntime<?> retryTask = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> retryTask = queue.pollTask(optimizerThread,
MAX_POLLING_TIME);
Assert.assertEquals(retryTask.getTaskId(), task.getTaskId());
- retryTask.schedule(optimizerThread);
- retryTask.ack(optimizerThread);
+ queue.ackTask(retryTask.getTaskId(), optimizerThread);
OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess();
- retryTask.complete(
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskFailed(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(TaskRuntime.Status.FAILED, task.getStatus());
@@ -411,9 +403,8 @@ public class TestOptimizingQueue extends AMSTableTestBase {
OptimizingQueue queue = buildOptimizingGroupService(tableRuntime);
Assert.assertEquals(0, queue.collectTasks().size());
- TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task);
- task.schedule(optimizerThread);
Assert.assertEquals(1, queue.collectTasks().size());
Assert.assertEquals(
1, queue.collectTasks(t -> t.getStatus() ==
TaskRuntime.Status.SCHEDULED).size());
@@ -457,9 +448,8 @@ public class TestOptimizingQueue extends AMSTableTestBase {
Assert.assertEquals(0, idleTablesGauge.getValue().longValue());
Assert.assertEquals(0, committingTablesGauge.getValue().longValue());
- TaskRuntime task = queue.pollTask(MAX_POLLING_TIME);
+ TaskRuntime<?> task = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task);
- task.schedule(optimizerThread);
Assert.assertEquals(1, queueTasksGauge.getValue().longValue());
Assert.assertEquals(0, executingTasksGauge.getValue().longValue());
Assert.assertEquals(0, planingTablesGauge.getValue().longValue());
@@ -468,7 +458,7 @@ public class TestOptimizingQueue extends AMSTableTestBase {
Assert.assertEquals(0, idleTablesGauge.getValue().longValue());
Assert.assertEquals(0, committingTablesGauge.getValue().longValue());
- task.ack(optimizerThread);
+ queue.ackTask(task.getTaskId(), optimizerThread);
Assert.assertEquals(0, queueTasksGauge.getValue().longValue());
Assert.assertEquals(1, executingTasksGauge.getValue().longValue());
Assert.assertEquals(0, planingTablesGauge.getValue().longValue());
@@ -477,7 +467,7 @@ public class TestOptimizingQueue extends AMSTableTestBase {
Assert.assertEquals(0, idleTablesGauge.getValue().longValue());
Assert.assertEquals(0, committingTablesGauge.getValue().longValue());
- task.complete(
+ queue.completeTask(
optimizerThread,
buildOptimizingTaskResult(task.getTaskId(),
optimizerThread.getThreadId()));
Assert.assertEquals(0, queueTasksGauge.getValue().longValue());