This is an automated email from the ASF dual-hosted git repository.

wuzhiguo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bigtop-manager.git


The following commit(s) were added to refs/heads/main by this push:
     new d053a9d  BIGTOP-4152: Add task log gRPC service to read agent logs 
(#14)
d053a9d is described below

commit d053a9d98d2002764685629940b1631a5053fda7
Author: Zhiguo Wu <[email protected]>
AuthorDate: Mon Jul 8 09:48:43 2024 +0800

    BIGTOP-4152: Add task log gRPC service to read agent logs (#14)
---
 .../apache/bigtop/manager/agent/cache/Caches.java  |  18 +---
 .../agent/executor/AbstractCommandExecutor.java    |   1 +
 .../agent/executor/HostCheckCommandExecutor.java   |   1 +
 .../agent/service/CommandServiceGrpcImpl.java      |   3 +
 .../agent/service/TaskLogServiceGrpcImpl.java      | 103 +++++++++++++++++++++
 .../src/main/resources/proto/command.proto         |   1 -
 .../src/main/resources/proto/task_log.proto        |  25 ++---
 .../command/stage/runner/AbstractStageRunner.java  |   8 --
 .../manager/server/controller/SseController.java   |  14 ++-
 .../bigtop/manager/server/grpc/GrpcClient.java     |   2 +-
 .../server/scheduler/HostInfoScheduler.java        |   4 -
 ...{CommandLogService.java => TaskLogService.java} |  10 +-
 .../server/service/impl/CommandLogServiceImpl.java |  91 ------------------
 .../server/service/impl/TaskLogServiceImpl.java    |  81 ++++++++++++++++
 .../src/main/resources/ddl/DaMeng-DDL-CREATE.sql   |  21 -----
 .../stack/nop/v1_0_0/kafka/KafkaBrokerScript.java  |  12 +--
 .../v1_0_0/zookeeper/ZookeeperClientScript.java    |   4 +-
 .../v1_0_0/zookeeper/ZookeeperServerScript.java    |  10 +-
 bigtop-manager-ui/src/components/job-info/job.vue  |   1 +
 19 files changed, 229 insertions(+), 181 deletions(-)

diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java
similarity index 67%
copy from 
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
copy to 
bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java
index e0bcdfd..568a057 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
+++ 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java
@@ -16,19 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bigtop.manager.server.service;
+package org.apache.bigtop.manager.agent.cache;
 
-import reactor.core.publisher.FluxSink;
+import java.util.ArrayList;
+import java.util.List;
 
-public interface CommandLogService {
-
-    void registerSink(Long taskId, FluxSink<String> sink);
-
-    void unregisterSink(Long taskId);
-
-    void onLogStarted(Long taskId, String hostname);
-
-    void onLogReceived(Long taskId, String hostname, String log);
-
-    void onLogEnded(Long taskId, String hostname);
+public class Caches {
+    public static final List<Long> RUNNING_TASKS = new ArrayList<>();
 }
diff --git 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java
 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java
index 8a44bb4..fda8b75 100644
--- 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java
+++ 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java
@@ -60,6 +60,7 @@ public abstract class AbstractCommandExecutor implements 
CommandExecutor {
     }
 
     protected void doExecuteOnDevMode() {
+        log.info("Running command on dev mode");
         commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
         commandReplyBuilder.setResult(ShellResult.success().getResult());
     }
diff --git 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java
 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java
index 82eb1e9..90b2f43 100644
--- 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java
+++ 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java
@@ -44,6 +44,7 @@ public class HostCheckCommandExecutor extends 
AbstractCommandExecutor {
 
     @Override
     public void doExecute() {
+        log.info("[agent executeTask] taskEvent is: {}", commandRequest);
         ShellResult shellResult = runChecks(List.of(this::checkTimeSync));
         commandReplyBuilder.setCode(shellResult.getExitCode());
         commandReplyBuilder.setResult(shellResult.getResult());
diff --git 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java
 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java
index 42591ba..ed1f829 100644
--- 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java
+++ 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bigtop.manager.agent.service;
 
+import org.apache.bigtop.manager.agent.cache.Caches;
 import org.apache.bigtop.manager.agent.executor.CommandExecutor;
 import org.apache.bigtop.manager.agent.executor.CommandExecutors;
 import org.apache.bigtop.manager.grpc.generated.CommandReply;
@@ -39,6 +40,7 @@ public class CommandServiceGrpcImpl extends 
CommandServiceGrpc.CommandServiceImp
     public void exec(CommandRequest request, StreamObserver<CommandReply> 
responseObserver) {
         try {
             MDC.put("taskId", String.valueOf(request.getTaskId()));
+            Caches.RUNNING_TASKS.add(request.getTaskId());
             CommandExecutor commandExecutor = 
CommandExecutors.getCommandExecutor(request.getType());
             CommandReply reply = commandExecutor.execute(request);
             responseObserver.onNext(reply);
@@ -48,6 +50,7 @@ public class CommandServiceGrpcImpl extends 
CommandServiceGrpc.CommandServiceImp
             Status status = Status.UNKNOWN.withDescription(e.getMessage());
             responseObserver.onError(status.asRuntimeException());
         } finally {
+            Caches.RUNNING_TASKS.remove(request.getTaskId());
             MDC.clear();
         }
     }
diff --git 
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java
 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java
new file mode 100644
index 0000000..4585527
--- /dev/null
+++ 
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.agent.service;
+
+import org.apache.bigtop.manager.agent.cache.Caches;
+import org.apache.bigtop.manager.grpc.generated.TaskLogReply;
+import org.apache.bigtop.manager.grpc.generated.TaskLogRequest;
+import org.apache.bigtop.manager.grpc.generated.TaskLogServiceGrpc;
+
+import org.apache.commons.lang3.SystemUtils;
+
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import net.devh.boot.grpc.server.service.GrpcService;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+@Slf4j
+@GrpcService
+public class TaskLogServiceGrpcImpl extends 
TaskLogServiceGrpc.TaskLogServiceImplBase {
+
+    @Override
+    public void getLog(TaskLogRequest request, StreamObserver<TaskLogReply> 
responseObserver) {
+        String path = getLogFilePath(request.getTaskId());
+        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
+            // Read from beginning
+            long fileLength = file.length();
+            while (file.getFilePointer() < fileLength) {
+                String line = file.readLine();
+                if (line != null) {
+                    responseObserver.onNext(
+                            TaskLogReply.newBuilder().setText(line).build());
+                }
+            }
+
+            // Waiting for new logs
+            boolean isTaskRunning = true;
+            while (isTaskRunning) {
+                isTaskRunning = 
Caches.RUNNING_TASKS.contains(request.getTaskId());
+                readNewLogs(file, responseObserver);
+                Thread.sleep(1000);
+            }
+        } catch (Exception e) {
+            String errMsg = "Error when reading task log: " + e.getMessage() + 
", please fix it";
+            
responseObserver.onNext(TaskLogReply.newBuilder().setText(errMsg).build());
+
+            log.error("Error reading task log", e);
+            Status status = Status.UNKNOWN.withDescription(e.getMessage());
+            responseObserver.onError(status.asRuntimeException());
+        }
+    }
+
+    private void readNewLogs(RandomAccessFile file, 
StreamObserver<TaskLogReply> responseObserver) throws Exception {
+        long position = file.getFilePointer();
+        if (position < file.length()) {
+            // Read new logs
+            file.seek(position);
+            if (file.readByte() != '\n') {
+                file.seek(position);
+            }
+
+            String line = file.readLine();
+            while (line != null) {
+                
responseObserver.onNext(TaskLogReply.newBuilder().setText(line).build());
+                line = file.readLine();
+            }
+        }
+    }
+
+    private String getLogFilePath(Long taskId) {
+        String baseDir;
+        if (SystemUtils.IS_OS_WINDOWS) {
+            baseDir = SystemUtils.getUserDir().getPath();
+        } else {
+            File file = new File(this.getClass()
+                    .getProtectionDomain()
+                    .getCodeSource()
+                    .getLocation()
+                    .getPath());
+            baseDir = file.getParentFile().getParentFile().getPath();
+        }
+
+        return baseDir + File.separator + "tasklogs" + File.separator + 
"task-" + taskId + ".log";
+    }
+}
diff --git a/bigtop-manager-grpc/src/main/resources/proto/command.proto 
b/bigtop-manager-grpc/src/main/resources/proto/command.proto
index c86c571..e2c09a4 100644
--- a/bigtop-manager-grpc/src/main/resources/proto/command.proto
+++ b/bigtop-manager-grpc/src/main/resources/proto/command.proto
@@ -30,7 +30,6 @@ enum CommandType {
   COMPONENT = 0;
   HOST_CHECK = 1;
   CACHE_DISTRIBUTE = 2;
-  TASK_LOG = 3;
 }
 
 message CommandRequest {
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
 b/bigtop-manager-grpc/src/main/resources/proto/task_log.proto
similarity index 67%
copy from 
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
copy to bigtop-manager-grpc/src/main/resources/proto/task_log.proto
index e0bcdfd..af7bff4 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
+++ b/bigtop-manager-grpc/src/main/resources/proto/task_log.proto
@@ -16,19 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.bigtop.manager.server.service;
+syntax = "proto3";
 
-import reactor.core.publisher.FluxSink;
+option java_multiple_files = true;
+option java_package = "org.apache.bigtop.manager.grpc.generated";
+option java_outer_classname = "TaskLogProto";
 
-public interface CommandLogService {
-
-    void registerSink(Long taskId, FluxSink<String> sink);
-
-    void unregisterSink(Long taskId);
-
-    void onLogStarted(Long taskId, String hostname);
-
-    void onLogReceived(Long taskId, String hostname, String log);
+service TaskLogService {
+  rpc GetLog (TaskLogRequest) returns (stream TaskLogReply) {}
+}
 
-    void onLogEnded(Long taskId, String hostname);
+message TaskLogRequest {
+  int64 task_id = 1;
 }
+
+message TaskLogReply {
+  string text = 1;
+}
\ No newline at end of file
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java
index f508fdd..30145da 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java
@@ -30,7 +30,6 @@ import 
org.apache.bigtop.manager.grpc.generated.CommandServiceGrpc;
 import org.apache.bigtop.manager.grpc.utils.ProtobufUtil;
 import org.apache.bigtop.manager.server.command.stage.factory.StageContext;
 import org.apache.bigtop.manager.server.grpc.GrpcClient;
-import org.apache.bigtop.manager.server.service.CommandLogService;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -42,9 +41,6 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public abstract class AbstractStageRunner implements StageRunner {
 
-    @Resource
-    private CommandLogService commandLogService;
-
     @Resource
     protected StageRepository stageRepository;
 
@@ -81,7 +77,6 @@ public abstract class AbstractStageRunner implements 
StageRunner {
             CommandRequest request = builder.build();
 
             futures.add(CompletableFuture.supplyAsync(() -> {
-                commandLogService.onLogStarted(task.getId(), 
task.getHostname());
                 CommandServiceGrpc.CommandServiceBlockingStub stub = 
GrpcClient.getBlockingStub(
                         task.getHostname(), 
CommandServiceGrpc.CommandServiceBlockingStub.class);
                 CommandReply reply = stub.exec(request);
@@ -90,14 +85,11 @@ public abstract class AbstractStageRunner implements 
StageRunner {
                 boolean taskSuccess = reply != null && reply.getCode() == 
MessageConstants.SUCCESS_CODE;
 
                 if (taskSuccess) {
-                    commandLogService.onLogReceived(task.getId(), 
task.getHostname(), "Success!");
                     onTaskSuccess(task);
                 } else {
-                    commandLogService.onLogReceived(task.getId(), 
task.getHostname(), "Failed!");
                     onTaskFailure(task);
                 }
 
-                commandLogService.onLogEnded(task.getId(), task.getHostname());
                 return taskSuccess;
             }));
         }
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java
index 9ffccef..7ed8642 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java
@@ -18,7 +18,7 @@
  */
 package org.apache.bigtop.manager.server.controller;
 
-import org.apache.bigtop.manager.server.service.CommandLogService;
+import org.apache.bigtop.manager.server.service.TaskLogService;
 
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -32,7 +32,6 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.FluxSink;
 
 import jakarta.annotation.Resource;
-import java.io.IOException;
 
 @Tag(name = "Sse Controller")
 @RestController
@@ -40,21 +39,21 @@ import java.io.IOException;
 public class SseController {
 
     @Resource
-    private CommandLogService commandLogService;
+    private TaskLogService taskLogService;
 
     @Operation(summary = "get task log", description = "Get a task log")
     @GetMapping("/tasks/{id}/log")
     public SseEmitter log(@PathVariable Long id, @PathVariable Long clusterId) 
{
-        SseEmitter emitter = new SseEmitter();
+        // Default timeout to 5 minutes
+        SseEmitter emitter = new SseEmitter(5 * 60 * 1000L);
 
         Flux<String> flux =
-                Flux.create(sink -> commandLogService.registerSink(id, sink), 
FluxSink.OverflowStrategy.BUFFER);
+                Flux.create(sink -> taskLogService.registerSink(id, sink), 
FluxSink.OverflowStrategy.BUFFER);
         flux.subscribe(
                 s -> {
                     try {
                         emitter.send(s);
-                    } catch (IOException e) {
-                        commandLogService.unregisterSink(id);
+                    } catch (Exception e) {
                         emitter.completeWithError(e);
                     }
                 },
@@ -62,7 +61,6 @@ public class SseController {
                 emitter::complete);
 
         emitter.onTimeout(emitter::complete);
-        emitter.onCompletion(() -> commandLogService.unregisterSink(id));
         return emitter;
     }
 }
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java
index 6142245..f759e16 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java
@@ -122,7 +122,7 @@ public class GrpcClient {
         if (isChannelAlive(host)) {
             return CHANNELS.get(host);
         } else {
-            throw new ApiException(ApiExceptionEnum.HOST_NOT_CONNECTED, host);
+            return createChannel(host);
         }
     }
 
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java
index 81705ab..9944246 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java
@@ -54,10 +54,6 @@ public class HostInfoScheduler {
     private void getHostInfo(Host host) {
         String hostname = host.getHostname();
         try {
-            if (!GrpcClient.isChannelAlive(hostname)) {
-                GrpcClient.createChannel(hostname);
-            }
-
             HostInfoServiceGrpc.HostInfoServiceBlockingStub stub =
                     GrpcClient.getBlockingStub(hostname, 
HostInfoServiceGrpc.HostInfoServiceBlockingStub.class);
             HostInfoReply reply = 
stub.getHostInfo(HostInfoRequest.newBuilder().build());
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java
similarity index 79%
rename from 
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
rename to 
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java
index e0bcdfd..cc1bee7 100644
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java
@@ -20,15 +20,7 @@ package org.apache.bigtop.manager.server.service;
 
 import reactor.core.publisher.FluxSink;
 
-public interface CommandLogService {
+public interface TaskLogService {
 
     void registerSink(Long taskId, FluxSink<String> sink);
-
-    void unregisterSink(Long taskId);
-
-    void onLogStarted(Long taskId, String hostname);
-
-    void onLogReceived(Long taskId, String hostname, String log);
-
-    void onLogEnded(Long taskId, String hostname);
 }
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java
deleted file mode 100644
index 945eb47..0000000
--- 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bigtop.manager.server.service.impl;
-
-import org.apache.bigtop.manager.server.service.CommandLogService;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import org.springframework.stereotype.Service;
-
-import reactor.core.publisher.FluxSink;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@Service
-public class CommandLogServiceImpl implements CommandLogService {
-
-    private final Map<Long, FluxSink<String>> taskSinks = new HashMap<>();
-
-    private final Map<Long, List<String>> logs = new HashMap<>();
-
-    public void registerSink(Long taskId, FluxSink<String> sink) {
-        List<String> list = logs.get(taskId);
-        if (CollectionUtils.isNotEmpty(list)) {
-            synchronized (list) {
-                taskSinks.put(taskId, sink);
-                for (String log : list) {
-                    sink.next(log);
-                }
-            }
-        } else {
-            // Task already completed, get logs from database
-            sink.next("Task finished, please check the log details on agent 
machine.");
-            sink.complete();
-        }
-    }
-
-    public void unregisterSink(Long taskId) {
-        taskSinks.remove(taskId);
-    }
-
-    @Override
-    public void onLogStarted(Long taskId, String hostname) {
-        logs.put(taskId, new ArrayList<>());
-    }
-
-    @Override
-    public void onLogReceived(Long taskId, String hostname, String log) {
-        List<String> list = logs.get(taskId);
-
-        synchronized (list) {
-            list.add(log);
-            FluxSink<String> sink = taskSinks.get(taskId);
-            if (sink != null) {
-                sink.next(log);
-            }
-        }
-    }
-
-    @Override
-    public void onLogEnded(Long taskId, String hostname) {
-        List<String> list = logs.get(taskId);
-        synchronized (list) {
-            FluxSink<String> sink = taskSinks.remove(taskId);
-            if (sink != null) {
-                sink.complete();
-            }
-        }
-
-        logs.remove(taskId);
-    }
-}
diff --git 
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java
 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java
new file mode 100644
index 0000000..7d91360
--- /dev/null
+++ 
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.server.service.impl;
+
+import org.apache.bigtop.manager.common.enums.JobState;
+import org.apache.bigtop.manager.dao.entity.Task;
+import org.apache.bigtop.manager.dao.repository.TaskRepository;
+import org.apache.bigtop.manager.grpc.generated.TaskLogReply;
+import org.apache.bigtop.manager.grpc.generated.TaskLogRequest;
+import org.apache.bigtop.manager.grpc.generated.TaskLogServiceGrpc;
+import org.apache.bigtop.manager.server.grpc.GrpcClient;
+import org.apache.bigtop.manager.server.service.TaskLogService;
+
+import org.springframework.stereotype.Service;
+
+import io.grpc.stub.StreamObserver;
+import reactor.core.publisher.FluxSink;
+
+import jakarta.annotation.Resource;
+
+@Service
+public class TaskLogServiceImpl implements TaskLogService {
+
+    @Resource
+    private TaskRepository taskRepository;
+
+    public void registerSink(Long taskId, FluxSink<String> sink) {
+        Task task = taskRepository.getReferenceById(taskId);
+        String hostname = task.getHostname();
+
+        if (task.getState() == JobState.PENDING || task.getState() == 
JobState.CANCELED) {
+            new Thread(() -> {
+                        sink.next("There is no log when task is in status: "
+                                + task.getState().name().toLowerCase()
+                                + ", please reopen the window when status 
changed");
+                        sink.complete();
+                    })
+                    .start();
+        } else {
+            TaskLogServiceGrpc.TaskLogServiceStub asyncStub =
+                    GrpcClient.getAsyncStub(hostname, 
TaskLogServiceGrpc.TaskLogServiceStub.class);
+            TaskLogRequest request =
+                    TaskLogRequest.newBuilder().setTaskId(taskId).build();
+            asyncStub.getLog(request, new LogReader(sink));
+        }
+    }
+
+    private record LogReader(FluxSink<String> sink) implements 
StreamObserver<TaskLogReply> {
+
+        @Override
+        public void onNext(TaskLogReply reply) {
+            sink.next(reply.getText());
+        }
+
+        @Override
+        public void onError(Throwable t) {
+            sink.error(t);
+        }
+
+        @Override
+        public void onCompleted() {
+            sink.complete();
+        }
+    }
+}
diff --git a/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql 
b/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql
index d86816e..f2eabdd 100644
--- a/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql
+++ b/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql
@@ -62,27 +62,6 @@ CREATE UNIQUE INDEX INDEX33555701 ON 
"bigtop_manager"."cluster" ("cluster_name")
 CREATE INDEX "idx_cluster_stack_id" ON "bigtop_manager"."cluster" ("stack_id");
 
 
--- "bigtop_manager"."command_log" definition
-
-CREATE TABLE "bigtop_manager"."command_log" (
-       "id" BIGINT NOT NULL,
-       "create_by" BIGINT NULL,
-       "create_time" DATETIME NULL,
-       "update_by" BIGINT NULL,
-       "update_time" DATETIME NULL,
-       "hostname" VARCHAR(255) NULL,
-       "result" CLOB NULL,
-       "job_id" BIGINT NULL,
-       "stage_id" BIGINT NULL,
-       "task_id" BIGINT NULL,
-       CONSTRAINT CONS134218865 PRIMARY KEY ("id")
-);
-CREATE UNIQUE INDEX INDEX33555669 ON "bigtop_manager"."command_log" ("id");
-CREATE INDEX "idx_cl_job_id" ON "bigtop_manager"."command_log" ("job_id");
-CREATE INDEX "idx_cl_stage_id" ON "bigtop_manager"."command_log" ("stage_id");
-CREATE INDEX "idx_cl_task_id" ON "bigtop_manager"."command_log" ("task_id");
-
-
 -- "bigtop_manager"."component" definition
 
 CREATE TABLE "bigtop_manager"."component" (
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java
 
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java
index fc173c3..dbb18eb 100644
--- 
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java
@@ -31,36 +31,36 @@ public class KafkaBrokerScript implements Script {
 
     @Override
     public ShellResult install(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult configure(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult start(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult stop(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult status(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     public ShellResult test(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 }
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java
 
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java
index 08e3bf0..fced816 100644
--- 
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java
@@ -32,13 +32,13 @@ public class ZookeeperClientScript implements ClientScript {
 
     @Override
     public ShellResult install(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult configure(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 }
diff --git 
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java
 
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java
index 48d75da..2487150 100644
--- 
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java
+++ 
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java
@@ -31,31 +31,31 @@ public class ZookeeperServerScript implements Script {
 
     @Override
     public ShellResult install(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult configure(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult start(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult stop(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 
     @Override
     public ShellResult status(Params params) {
-        log.info("Default to success in dev mode");
+        log.info("Default to success on dev mode");
         return ShellResult.success();
     }
 }
diff --git a/bigtop-manager-ui/src/components/job-info/job.vue 
b/bigtop-manager-ui/src/components/job-info/job.vue
index 7abb2b7..9ca8568 100644
--- a/bigtop-manager-ui/src/components/job-info/job.vue
+++ b/bigtop-manager-ui/src/components/job-info/job.vue
@@ -223,6 +223,7 @@
   const clickTask = (record: TaskVO) => {
     breadcrumbs.value.push(record)
     currTaskInfo.value = record
+    logTextOrigin.value = ''
     getLogsInfo(record.id)
   }
 

Reply via email to