ic4y commented on code in PR #2366:
URL: 
https://github.com/apache/incubator-seatunnel/pull/2366#discussion_r942104611


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -18,134 +18,149 @@
 package org.apache.seatunnel.engine.server;
 
 import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static com.hazelcast.jet.impl.util.Util.uncheckRun;
+import static java.util.Collections.emptyList;
 import static java.util.concurrent.Executors.newCachedThreadPool;
+import static java.util.stream.Collectors.partitioningBy;
+import static java.util.stream.Collectors.toList;
 
+import org.apache.seatunnel.engine.common.utils.NonCompletableFuture;
+import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.ProgressState;
 import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskCallTimer;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionContext;
+import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroup;
+import org.apache.seatunnel.engine.server.execution.TaskTracker;
 
-import com.hazelcast.jet.impl.util.NonCompletableFuture;
 import com.hazelcast.logging.ILogger;
-import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.properties.HazelcastProperties;
 import lombok.NonNull;
+import lombok.SneakyThrows;
 
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * This class is responsible for the execution of the Task
  */
 public class TaskExecutionService {
 
     private final String hzInstanceName;
-    private final NodeEngine nodeEngine;
+    private final NodeEngineImpl nodeEngine;
     private final ILogger logger;
     private volatile boolean isShutdown;
-    private final ExecutorService blockingTaskletExecutor = 
newCachedThreadPool(new BlockingTaskThreadFactory());
+    private final LinkedBlockingDeque<TaskTracker> threadShareTaskQueue = new 
LinkedBlockingDeque<>();
+    private final ExecutorService executorService = newCachedThreadPool(new 
BlockingTaskThreadFactory());
+    private final RunBusWorkSupplier runBusWorkSupplier = new 
RunBusWorkSupplier(executorService, threadShareTaskQueue);
     // key: TaskID
-    private final ConcurrentMap<Long, TaskExecutionContext> executionContexts 
= new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, ConcurrentMap<Long, 
TaskExecutionContext>> executionContexts = new ConcurrentHashMap<>();
 
     public TaskExecutionService(NodeEngineImpl nodeEngine, HazelcastProperties 
properties) {
         this.hzInstanceName = nodeEngine.getHazelcastInstance().getName();
         this.nodeEngine = nodeEngine;
         this.logger = 
nodeEngine.getLoggingService().getLogger(TaskExecutionService.class);
     }
 
+    public void start() {
+        runBusWorkSupplier.runNewBusWork(false);
+    }
+
     public void shutdown() {
         isShutdown = true;
-        blockingTaskletExecutor.shutdownNow();
+        executorService.shutdownNow();
     }
 
-    public TaskExecutionContext getExecutionContext(long taskId) {
-        return executionContexts.get(taskId);
+    public ConcurrentMap<Long, TaskExecutionContext> getExecutionContext(long 
taskGroupId) {
+        return executionContexts.get(taskGroupId);
     }
 
-    /**
-     * Submit a TaskGroup and run the Task in it
-     */
-    public Map<Long, TaskExecutionContext> submitTask(
-        TaskGroup taskGroup
-    ) {
-        Map<Long, TaskExecutionContext> contextMap = new 
HashMap<>(taskGroup.getTasks().size());
-        taskGroup.getTasks().forEach(task -> {
-            contextMap.put(task.getTaskID(), submitTask(task));
-        });
-        return contextMap;
+    private void submitThreadShareTask(TaskGroupExecutionTracker 
taskGroupExecutionTracker, List<Task> tasks) {
+        tasks.stream()
+            .map(t -> new TaskTracker(t, taskGroupExecutionTracker))
+            .forEach(threadShareTaskQueue::add);
     }
 
-    public TaskExecutionContext submitTask(Task task) {
-        CompletableFuture<Void> cancellationFuture = new 
CompletableFuture<Void>();
-        TaskletTracker taskletTracker = new TaskletTracker(task, 
cancellationFuture);
-        taskletTracker.taskletFutures =
-            blockingTaskletExecutor.submit(new BlockingWorker(taskletTracker));
+    private void submitBlockingTask(TaskGroupExecutionTracker 
taskGroupExecutionTracker, List<Task> tasks) {
 
-        TaskExecutionContext taskExecutionContext = new TaskExecutionContext(
-            taskletTracker.future,
-            cancellationFuture,
-            this
-        );
-
-        executionContexts.put(task.getTaskID(), taskExecutionContext);
-        return taskExecutionContext;
+        CountDownLatch startedLatch = new CountDownLatch(tasks.size());
+        taskGroupExecutionTracker.blockingFutures = tasks
+            .stream()
+            .map(t -> new BlockingWorker(new TaskTracker(t, 
taskGroupExecutionTracker), startedLatch))
+            .map(executorService::submit)
+            .collect(toList());
 
+        // Do not return from this method until all workers have started. 
Otherwise
+        // on cancellation there is a race where the executor might not have 
started
+        // the worker yet. This would result in taskletDone() never being 
called for
+        // a worker.
+        uncheckRun(startedLatch::await);
     }
 
-    private final class TaskletTracker {
-        final NonCompletableFuture future = new NonCompletableFuture();
-        final Task task;
-        volatile Future<?> taskletFutures;
-
-        TaskletTracker(Task task, CompletableFuture<Void> cancellationFuture) {
-            this.task = task;
-
-            cancellationFuture.whenComplete(withTryCatch(logger, (r, e) -> {
-                if (e == null) {
-                    e = new IllegalStateException("cancellationFuture should 
be completed exceptionally");
-                }
-                future.internalCompleteExceptionally(e);
-                taskletFutures.cancel(true);
-            }));
-        }
-
-        @Override
-        public String toString() {
-            return "Tracking " + task;
+    public CompletableFuture<TaskExecutionState> submitTaskGroup(
+        TaskGroup taskGroup,
+        CompletableFuture<Void> cancellationFuture
+    ) {
+        Collection<Task> tasks = taskGroup.getTasks();
+        final TaskGroupExecutionTracker executionTracker = new 
TaskGroupExecutionTracker(cancellationFuture, taskGroup);
+        try {
+            ConcurrentMap<Long, TaskExecutionContext> taskExecutionContextMap 
= new ConcurrentHashMap<>();
+            final Map<Boolean, List<Task>> byCooperation =
+                tasks.stream()
+                    .peek(x -> {
+                        TaskExecutionContext taskExecutionContext = new 
TaskExecutionContext(x, nodeEngine);
+                        x.setTaskExecutionContext(taskExecutionContext);
+                        taskExecutionContextMap.put(x.getTaskID(), 
taskExecutionContext);
+                    })
+                    .collect(partitioningBy(Task::isThreadsShare));
+            submitThreadShareTask(executionTracker, byCooperation.get(true));
+            submitBlockingTask(executionTracker, byCooperation.get(false));
+            executionContexts.put(taskGroup.getId(), taskExecutionContextMap);
+        } catch (Throwable t) {
+            executionTracker.future.complete(new 
TaskExecutionState(taskGroup.getId(), ExecutionState.FAILED, t));
         }
+        return new NonCompletableFuture<>(executionTracker.future);

Review Comment:
   If the user wants to cancel, he needs to use the 
cancellationFuture.complete() parameter of this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to