ic4y commented on code in PR #2366:
URL:
https://github.com/apache/incubator-seatunnel/pull/2366#discussion_r942104611
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -18,134 +18,149 @@
package org.apache.seatunnel.engine.server;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static com.hazelcast.jet.impl.util.Util.uncheckRun;
+import static java.util.Collections.emptyList;
import static java.util.concurrent.Executors.newCachedThreadPool;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toList;
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.ProgressState;
import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskCallTimer;
import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskTracker;
-import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.logging.ILogger;
-import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.properties.HazelcastProperties;
import lombok.NonNull;
+import lombok.SneakyThrows;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
/**
* This class is responsible for the execution of the Task
*/
public class TaskExecutionService {
private final String hzInstanceName;
- private final NodeEngine nodeEngine;
+ private final NodeEngineImpl nodeEngine;
private final ILogger logger;
private volatile boolean isShutdown;
- private final ExecutorService blockingTaskletExecutor =
newCachedThreadPool(new BlockingTaskThreadFactory());
+ private final LinkedBlockingDeque<TaskTracker> threadShareTaskQueue = new
LinkedBlockingDeque<>();
+ private final ExecutorService executorService = newCachedThreadPool(new
BlockingTaskThreadFactory());
+ private final RunBusWorkSupplier runBusWorkSupplier = new
RunBusWorkSupplier(executorService, threadShareTaskQueue);
// key: TaskID
- private final ConcurrentMap<Long, TaskExecutionContext> executionContexts
= new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, ConcurrentMap<Long,
TaskExecutionContext>> executionContexts = new ConcurrentHashMap<>();
public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties
properties) {
this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
this.nodeEngine = nodeEngine;
this.logger =
nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
}
+ public void start() {
+ runBusWorkSupplier.runNewBusWork(false);
+ }
+
public void shutdown() {
isShutdown = true;
- blockingTaskletExecutor.shutdownNow();
+ executorService.shutdownNow();
}
- public TaskExecutionContext getExecutionContext(long taskId) {
- return executionContexts.get(taskId);
+ public ConcurrentMap<Long, TaskExecutionContext> getExecutionContext(long
taskGroupId) {
+ return executionContexts.get(taskGroupId);
}
- /**
- * Submit a TaskGroup and run the Task in it
- */
- public Map<Long, TaskExecutionContext> submitTask(
- TaskGroup taskGroup
- ) {
- Map<Long, TaskExecutionContext> contextMap = new
HashMap<>(taskGroup.getTasks().size());
- taskGroup.getTasks().forEach(task -> {
- contextMap.put(task.getTaskID(), submitTask(task));
- });
- return contextMap;
+ private void submitThreadShareTask(TaskGroupExecutionTracker
taskGroupExecutionTracker, List<Task> tasks) {
+ tasks.stream()
+ .map(t -> new TaskTracker(t, taskGroupExecutionTracker))
+ .forEach(threadShareTaskQueue::add);
}
- public TaskExecutionContext submitTask(Task task) {
- CompletableFuture<Void> cancellationFuture = new
CompletableFuture<Void>();
- TaskletTracker taskletTracker = new TaskletTracker(task,
cancellationFuture);
- taskletTracker.taskletFutures =
- blockingTaskletExecutor.submit(new BlockingWorker(taskletTracker));
+ private void submitBlockingTask(TaskGroupExecutionTracker
taskGroupExecutionTracker, List<Task> tasks) {
- TaskExecutionContext taskExecutionContext = new TaskExecutionContext(
- taskletTracker.future,
- cancellationFuture,
- this
- );
-
- executionContexts.put(task.getTaskID(), taskExecutionContext);
- return taskExecutionContext;
+ CountDownLatch startedLatch = new CountDownLatch(tasks.size());
+ taskGroupExecutionTracker.blockingFutures = tasks
+ .stream()
+ .map(t -> new BlockingWorker(new TaskTracker(t,
taskGroupExecutionTracker), startedLatch))
+ .map(executorService::submit)
+ .collect(toList());
+ // Do not return from this method until all workers have started.
Otherwise
+ // on cancellation there is a race where the executor might not have
started
+ // the worker yet. This would result in taskletDone() never being
called for
+ // a worker.
+ uncheckRun(startedLatch::await);
}
- private final class TaskletTracker {
- final NonCompletableFuture future = new NonCompletableFuture();
- final Task task;
- volatile Future<?> taskletFutures;
-
- TaskletTracker(Task task, CompletableFuture<Void> cancellationFuture) {
- this.task = task;
-
- cancellationFuture.whenComplete(withTryCatch(logger, (r, e) -> {
- if (e == null) {
- e = new IllegalStateException("cancellationFuture should
be completed exceptionally");
- }
- future.internalCompleteExceptionally(e);
- taskletFutures.cancel(true);
- }));
- }
-
- @Override
- public String toString() {
- return "Tracking " + task;
+ public CompletableFuture<TaskExecutionState> submitTaskGroup(
+ TaskGroup taskGroup,
+ CompletableFuture<Void> cancellationFuture
+ ) {
+ Collection<Task> tasks = taskGroup.getTasks();
+ final TaskGroupExecutionTracker executionTracker = new
TaskGroupExecutionTracker(cancellationFuture, taskGroup);
+ try {
+ ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap
= new ConcurrentHashMap<>();
+ final Map<Boolean, List<Task>> byCooperation =
+ tasks.stream()
+ .peek(x -> {
+ TaskExecutionContext taskExecutionContext = new
TaskExecutionContext(x, nodeEngine);
+ x.setTaskExecutionContext(taskExecutionContext);
+ taskExecutionContextMap.put(x.getTaskID(),
taskExecutionContext);
+ })
+ .collect(partitioningBy(Task::isThreadsShare));
+ submitThreadShareTask(executionTracker, byCooperation.get(true));
+ submitBlockingTask(executionTracker, byCooperation.get(false));
+ executionContexts.put(taskGroup.getId(), taskExecutionContextMap);
+ } catch (Throwable t) {
+ executionTracker.future.complete(new
TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
}
+ return new NonCompletableFuture<>(executionTracker.future);
Review Comment:
If the user wants to cancel the task group, they need to complete the
`cancellationFuture` parameter of this method (e.g. by calling `cancellationFuture.complete(null)`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]