This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bigtop-manager.git
The following commit(s) were added to refs/heads/main by this push:
new d053a9d BIGTOP-4152: Add task log gRPC service to read agent logs
(#14)
d053a9d is described below
commit d053a9d98d2002764685629940b1631a5053fda7
Author: Zhiguo Wu <[email protected]>
AuthorDate: Mon Jul 8 09:48:43 2024 +0800
BIGTOP-4152: Add task log gRPC service to read agent logs (#14)
---
.../apache/bigtop/manager/agent/cache/Caches.java | 18 +---
.../agent/executor/AbstractCommandExecutor.java | 1 +
.../agent/executor/HostCheckCommandExecutor.java | 1 +
.../agent/service/CommandServiceGrpcImpl.java | 3 +
.../agent/service/TaskLogServiceGrpcImpl.java | 103 +++++++++++++++++++++
.../src/main/resources/proto/command.proto | 1 -
.../src/main/resources/proto/task_log.proto | 25 ++---
.../command/stage/runner/AbstractStageRunner.java | 8 --
.../manager/server/controller/SseController.java | 14 ++-
.../bigtop/manager/server/grpc/GrpcClient.java | 2 +-
.../server/scheduler/HostInfoScheduler.java | 4 -
...{CommandLogService.java => TaskLogService.java} | 10 +-
.../server/service/impl/CommandLogServiceImpl.java | 91 ------------------
.../server/service/impl/TaskLogServiceImpl.java | 81 ++++++++++++++++
.../src/main/resources/ddl/DaMeng-DDL-CREATE.sql | 21 -----
.../stack/nop/v1_0_0/kafka/KafkaBrokerScript.java | 12 +--
.../v1_0_0/zookeeper/ZookeeperClientScript.java | 4 +-
.../v1_0_0/zookeeper/ZookeeperServerScript.java | 10 +-
bigtop-manager-ui/src/components/job-info/job.vue | 1 +
19 files changed, 229 insertions(+), 181 deletions(-)
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java
similarity index 67%
copy from
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
copy to
bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java
index e0bcdfd..568a057 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
+++
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/cache/Caches.java
@@ -16,19 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bigtop.manager.server.service;
+package org.apache.bigtop.manager.agent.cache;
-import reactor.core.publisher.FluxSink;
+import java.util.ArrayList;
+import java.util.List;
-public interface CommandLogService {
-
- void registerSink(Long taskId, FluxSink<String> sink);
-
- void unregisterSink(Long taskId);
-
- void onLogStarted(Long taskId, String hostname);
-
- void onLogReceived(Long taskId, String hostname, String log);
-
- void onLogEnded(Long taskId, String hostname);
+/**
+ * Process-wide, in-memory state shared between the agent's gRPC services.
+ */
+public class Caches {
+    /**
+     * IDs of the tasks this agent is currently executing.
+     *
+     * <p>The command service adds a task ID when execution starts and removes it when execution
+     * ends, while the task log service polls {@code contains} from a different request thread to
+     * decide whether more log output may still arrive. The list is therefore wrapped in a
+     * synchronized view so that individual {@code add}/{@code remove}/{@code contains} calls are
+     * atomic and visible across threads. Iterating over the list still requires the caller to
+     * synchronize on it.
+     */
+    public static final List<Long> RUNNING_TASKS = java.util.Collections.synchronizedList(new ArrayList<>());
+}
diff --git
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java
index 8a44bb4..fda8b75 100644
---
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java
+++
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/AbstractCommandExecutor.java
@@ -60,6 +60,7 @@ public abstract class AbstractCommandExecutor implements
CommandExecutor {
}
protected void doExecuteOnDevMode() {
+ log.info("Running command on dev mode");
commandReplyBuilder.setCode(MessageConstants.SUCCESS_CODE);
commandReplyBuilder.setResult(ShellResult.success().getResult());
}
diff --git
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java
index 82eb1e9..90b2f43 100644
---
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java
+++
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/executor/HostCheckCommandExecutor.java
@@ -44,6 +44,7 @@ public class HostCheckCommandExecutor extends
AbstractCommandExecutor {
@Override
public void doExecute() {
+ log.info("[agent executeTask] taskEvent is: {}", commandRequest);
ShellResult shellResult = runChecks(List.of(this::checkTimeSync));
commandReplyBuilder.setCode(shellResult.getExitCode());
commandReplyBuilder.setResult(shellResult.getResult());
diff --git
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java
index 42591ba..ed1f829 100644
---
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java
+++
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/CommandServiceGrpcImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.bigtop.manager.agent.service;
+import org.apache.bigtop.manager.agent.cache.Caches;
import org.apache.bigtop.manager.agent.executor.CommandExecutor;
import org.apache.bigtop.manager.agent.executor.CommandExecutors;
import org.apache.bigtop.manager.grpc.generated.CommandReply;
@@ -39,6 +40,7 @@ public class CommandServiceGrpcImpl extends
CommandServiceGrpc.CommandServiceImp
public void exec(CommandRequest request, StreamObserver<CommandReply>
responseObserver) {
try {
MDC.put("taskId", String.valueOf(request.getTaskId()));
+ Caches.RUNNING_TASKS.add(request.getTaskId());
CommandExecutor commandExecutor =
CommandExecutors.getCommandExecutor(request.getType());
CommandReply reply = commandExecutor.execute(request);
responseObserver.onNext(reply);
@@ -48,6 +50,7 @@ public class CommandServiceGrpcImpl extends
CommandServiceGrpc.CommandServiceImp
Status status = Status.UNKNOWN.withDescription(e.getMessage());
responseObserver.onError(status.asRuntimeException());
} finally {
+ Caches.RUNNING_TASKS.remove(request.getTaskId());
MDC.clear();
}
}
diff --git
a/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java
new file mode 100644
index 0000000..4585527
--- /dev/null
+++
b/bigtop-manager-agent/src/main/java/org/apache/bigtop/manager/agent/service/TaskLogServiceGrpcImpl.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.agent.service;
+
+import org.apache.bigtop.manager.agent.cache.Caches;
+import org.apache.bigtop.manager.grpc.generated.TaskLogReply;
+import org.apache.bigtop.manager.grpc.generated.TaskLogRequest;
+import org.apache.bigtop.manager.grpc.generated.TaskLogServiceGrpc;
+
+import org.apache.commons.lang3.SystemUtils;
+
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import net.devh.boot.grpc.server.service.GrpcService;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+/**
+ * gRPC service that streams the log file of a task from this agent to the caller.
+ *
+ * <p>The content already present in the task's log file is sent first. While the task is still
+ * listed in {@link Caches#RUNNING_TASKS}, the file is polled for appended lines. Once the task is
+ * no longer running and the remaining lines have been sent, the stream is completed.
+ */
+@Slf4j
+@GrpcService
+public class TaskLogServiceGrpcImpl extends TaskLogServiceGrpc.TaskLogServiceImplBase {
+
+    /** Pause between two polls of the log file while the task is still running. */
+    private static final long POLL_INTERVAL_MILLIS = 1000L;
+
+    /**
+     * Streams the log of the requested task line by line.
+     *
+     * <p>On success the stream is terminated with {@code onCompleted()}. On failure a
+     * human-readable error line is sent, followed by {@code onError} with status {@code UNKNOWN}.
+     * If the caller has cancelled the call, nothing more is sent.
+     *
+     * @param request          carries the ID of the task whose log should be read
+     * @param responseObserver receives one {@link TaskLogReply} per log line
+     */
+    @Override
+    public void getLog(TaskLogRequest request, StreamObserver<TaskLogReply> responseObserver) {
+        String path = getLogFilePath(request.getTaskId());
+        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
+            // Read from beginning
+            // NOTE(review): RandomAccessFile.readLine maps each byte to one char, so non-ASCII
+            // log content may arrive garbled - confirm the log encoding.
+            long fileLength = file.length();
+            while (file.getFilePointer() < fileLength) {
+                String line = file.readLine();
+                if (line != null) {
+                    responseObserver.onNext(TaskLogReply.newBuilder().setText(line).build());
+                }
+            }
+
+            // Waiting for new logs. The running flag is sampled BEFORE reading so that one final
+            // read always happens after the task has finished and its last lines are not lost.
+            boolean isTaskRunning;
+            do {
+                isTaskRunning = Caches.RUNNING_TASKS.contains(request.getTaskId());
+                readNewLogs(file, responseObserver);
+                if (isTaskRunning) {
+                    Thread.sleep(POLL_INTERVAL_MILLIS);
+                }
+            } while (isTaskRunning && !isCancelled(responseObserver));
+
+            // Without this the server-streaming call never terminates on the caller's side.
+            if (!isCancelled(responseObserver)) {
+                responseObserver.onCompleted();
+            }
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Keep the interrupt visible to whoever owns this thread.
+                Thread.currentThread().interrupt();
+            }
+
+            log.error("Error reading task log", e);
+            if (isCancelled(responseObserver)) {
+                // The caller is gone; writing to a cancelled call would only fail again.
+                return;
+            }
+
+            String errMsg = "Error when reading task log: " + e.getMessage() + ", please fix it";
+            responseObserver.onNext(TaskLogReply.newBuilder().setText(errMsg).build());
+
+            Status status = Status.UNKNOWN.withDescription(e.getMessage());
+            responseObserver.onError(status.asRuntimeException());
+        }
+    }
+
+    /**
+     * Tells whether the caller has cancelled the call.
+     *
+     * @param responseObserver the observer handed to the service method
+     * @return {@code true} only if the observer exposes cancellation state and reports cancelled
+     */
+    private static boolean isCancelled(StreamObserver<TaskLogReply> responseObserver) {
+        return responseObserver instanceof io.grpc.stub.ServerCallStreamObserver<?> serverObserver
+                && serverObserver.isCancelled();
+    }
+
+    /**
+     * Sends every line appended to the file since the current file pointer.
+     *
+     * @param file             log file, positioned where the previous read stopped
+     * @param responseObserver receives one {@link TaskLogReply} per new line
+     * @throws Exception if reading the file or sending a reply fails
+     */
+    private void readNewLogs(RandomAccessFile file, StreamObserver<TaskLogReply> responseObserver) throws Exception {
+        long position = file.getFilePointer();
+        if (position >= file.length()) {
+            return;
+        }
+
+        // If the new data starts with a line feed, skip it; otherwise step back to re-read the byte.
+        // NOTE(review): presumably this avoids an empty reply when the previously read line was
+        // not yet terminated; it also drops a genuinely empty first line - confirm intent.
+        if (file.readByte() != '\n') {
+            file.seek(position);
+        }
+
+        String line = file.readLine();
+        while (line != null) {
+            responseObserver.onNext(TaskLogReply.newBuilder().setText(line).build());
+            line = file.readLine();
+        }
+    }
+
+    /**
+     * Builds the path of a task's log file: {@code <baseDir>/tasklogs/task-<taskId>.log}.
+     *
+     * <p>On Windows the base directory is the user's working directory; otherwise it is the
+     * grandparent of the location this class was loaded from.
+     *
+     * @param taskId ID of the task
+     * @return path of the task's log file
+     */
+    private String getLogFilePath(Long taskId) {
+        String baseDir;
+        if (SystemUtils.IS_OS_WINDOWS) {
+            baseDir = SystemUtils.getUserDir().getPath();
+        } else {
+            File file = new File(this.getClass()
+                    .getProtectionDomain()
+                    .getCodeSource()
+                    .getLocation()
+                    .getPath());
+            baseDir = file.getParentFile().getParentFile().getPath();
+        }
+
+        return baseDir + File.separator + "tasklogs" + File.separator + "task-" + taskId + ".log";
+    }
+}
diff --git a/bigtop-manager-grpc/src/main/resources/proto/command.proto
b/bigtop-manager-grpc/src/main/resources/proto/command.proto
index c86c571..e2c09a4 100644
--- a/bigtop-manager-grpc/src/main/resources/proto/command.proto
+++ b/bigtop-manager-grpc/src/main/resources/proto/command.proto
@@ -30,7 +30,6 @@ enum CommandType {
COMPONENT = 0;
HOST_CHECK = 1;
CACHE_DISTRIBUTE = 2;
- TASK_LOG = 3;
}
message CommandRequest {
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
b/bigtop-manager-grpc/src/main/resources/proto/task_log.proto
similarity index 67%
copy from
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
copy to bigtop-manager-grpc/src/main/resources/proto/task_log.proto
index e0bcdfd..af7bff4 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
+++ b/bigtop-manager-grpc/src/main/resources/proto/task_log.proto
@@ -16,19 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.bigtop.manager.server.service;
+syntax = "proto3";
-import reactor.core.publisher.FluxSink;
+option java_multiple_files = true;
+option java_package = "org.apache.bigtop.manager.grpc.generated";
+option java_outer_classname = "TaskLogProto";
-public interface CommandLogService {
-
- void registerSink(Long taskId, FluxSink<String> sink);
-
- void unregisterSink(Long taskId);
-
- void onLogStarted(Long taskId, String hostname);
-
- void onLogReceived(Long taskId, String hostname, String log);
+service TaskLogService {
+ rpc GetLog (TaskLogRequest) returns (stream TaskLogReply) {}
+}
- void onLogEnded(Long taskId, String hostname);
+message TaskLogRequest {
+ int64 task_id = 1;
}
+
+message TaskLogReply {
+ string text = 1;
+}
\ No newline at end of file
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java
index f508fdd..30145da 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/stage/runner/AbstractStageRunner.java
@@ -30,7 +30,6 @@ import
org.apache.bigtop.manager.grpc.generated.CommandServiceGrpc;
import org.apache.bigtop.manager.grpc.utils.ProtobufUtil;
import org.apache.bigtop.manager.server.command.stage.factory.StageContext;
import org.apache.bigtop.manager.server.grpc.GrpcClient;
-import org.apache.bigtop.manager.server.service.CommandLogService;
import lombok.extern.slf4j.Slf4j;
@@ -42,9 +41,6 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
public abstract class AbstractStageRunner implements StageRunner {
- @Resource
- private CommandLogService commandLogService;
-
@Resource
protected StageRepository stageRepository;
@@ -81,7 +77,6 @@ public abstract class AbstractStageRunner implements
StageRunner {
CommandRequest request = builder.build();
futures.add(CompletableFuture.supplyAsync(() -> {
- commandLogService.onLogStarted(task.getId(),
task.getHostname());
CommandServiceGrpc.CommandServiceBlockingStub stub =
GrpcClient.getBlockingStub(
task.getHostname(),
CommandServiceGrpc.CommandServiceBlockingStub.class);
CommandReply reply = stub.exec(request);
@@ -90,14 +85,11 @@ public abstract class AbstractStageRunner implements
StageRunner {
boolean taskSuccess = reply != null && reply.getCode() ==
MessageConstants.SUCCESS_CODE;
if (taskSuccess) {
- commandLogService.onLogReceived(task.getId(),
task.getHostname(), "Success!");
onTaskSuccess(task);
} else {
- commandLogService.onLogReceived(task.getId(),
task.getHostname(), "Failed!");
onTaskFailure(task);
}
- commandLogService.onLogEnded(task.getId(), task.getHostname());
return taskSuccess;
}));
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java
index 9ffccef..7ed8642 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/controller/SseController.java
@@ -18,7 +18,7 @@
*/
package org.apache.bigtop.manager.server.controller;
-import org.apache.bigtop.manager.server.service.CommandLogService;
+import org.apache.bigtop.manager.server.service.TaskLogService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
@@ -32,7 +32,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import jakarta.annotation.Resource;
-import java.io.IOException;
@Tag(name = "Sse Controller")
@RestController
@@ -40,21 +39,21 @@ import java.io.IOException;
public class SseController {
@Resource
- private CommandLogService commandLogService;
+ private TaskLogService taskLogService;
@Operation(summary = "get task log", description = "Get a task log")
@GetMapping("/tasks/{id}/log")
public SseEmitter log(@PathVariable Long id, @PathVariable Long clusterId)
{
- SseEmitter emitter = new SseEmitter();
+ // Default timeout to 5 minutes
+ SseEmitter emitter = new SseEmitter(5 * 60 * 1000L);
Flux<String> flux =
- Flux.create(sink -> commandLogService.registerSink(id, sink),
FluxSink.OverflowStrategy.BUFFER);
+ Flux.create(sink -> taskLogService.registerSink(id, sink),
FluxSink.OverflowStrategy.BUFFER);
flux.subscribe(
s -> {
try {
emitter.send(s);
- } catch (IOException e) {
- commandLogService.unregisterSink(id);
+ } catch (Exception e) {
emitter.completeWithError(e);
}
},
@@ -62,7 +61,6 @@ public class SseController {
emitter::complete);
emitter.onTimeout(emitter::complete);
- emitter.onCompletion(() -> commandLogService.unregisterSink(id));
return emitter;
}
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java
index 6142245..f759e16 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/grpc/GrpcClient.java
@@ -122,7 +122,7 @@ public class GrpcClient {
if (isChannelAlive(host)) {
return CHANNELS.get(host);
} else {
- throw new ApiException(ApiExceptionEnum.HOST_NOT_CONNECTED, host);
+ return createChannel(host);
}
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java
index 81705ab..9944246 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/scheduler/HostInfoScheduler.java
@@ -54,10 +54,6 @@ public class HostInfoScheduler {
private void getHostInfo(Host host) {
String hostname = host.getHostname();
try {
- if (!GrpcClient.isChannelAlive(hostname)) {
- GrpcClient.createChannel(hostname);
- }
-
HostInfoServiceGrpc.HostInfoServiceBlockingStub stub =
GrpcClient.getBlockingStub(hostname,
HostInfoServiceGrpc.HostInfoServiceBlockingStub.class);
HostInfoReply reply =
stub.getHostInfo(HostInfoRequest.newBuilder().build());
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java
similarity index 79%
rename from
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
rename to
bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java
index e0bcdfd..cc1bee7 100644
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/CommandLogService.java
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/TaskLogService.java
@@ -20,15 +20,7 @@ package org.apache.bigtop.manager.server.service;
import reactor.core.publisher.FluxSink;
-public interface CommandLogService {
+public interface TaskLogService {
void registerSink(Long taskId, FluxSink<String> sink);
-
- void unregisterSink(Long taskId);
-
- void onLogStarted(Long taskId, String hostname);
-
- void onLogReceived(Long taskId, String hostname, String log);
-
- void onLogEnded(Long taskId, String hostname);
}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java
deleted file mode 100644
index 945eb47..0000000
---
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/CommandLogServiceImpl.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bigtop.manager.server.service.impl;
-
-import org.apache.bigtop.manager.server.service.CommandLogService;
-
-import org.apache.commons.collections4.CollectionUtils;
-
-import org.springframework.stereotype.Service;
-
-import reactor.core.publisher.FluxSink;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@Service
-public class CommandLogServiceImpl implements CommandLogService {
-
- private final Map<Long, FluxSink<String>> taskSinks = new HashMap<>();
-
- private final Map<Long, List<String>> logs = new HashMap<>();
-
- public void registerSink(Long taskId, FluxSink<String> sink) {
- List<String> list = logs.get(taskId);
- if (CollectionUtils.isNotEmpty(list)) {
- synchronized (list) {
- taskSinks.put(taskId, sink);
- for (String log : list) {
- sink.next(log);
- }
- }
- } else {
- // Task already completed, get logs from database
- sink.next("Task finished, please check the log details on agent
machine.");
- sink.complete();
- }
- }
-
- public void unregisterSink(Long taskId) {
- taskSinks.remove(taskId);
- }
-
- @Override
- public void onLogStarted(Long taskId, String hostname) {
- logs.put(taskId, new ArrayList<>());
- }
-
- @Override
- public void onLogReceived(Long taskId, String hostname, String log) {
- List<String> list = logs.get(taskId);
-
- synchronized (list) {
- list.add(log);
- FluxSink<String> sink = taskSinks.get(taskId);
- if (sink != null) {
- sink.next(log);
- }
- }
- }
-
- @Override
- public void onLogEnded(Long taskId, String hostname) {
- List<String> list = logs.get(taskId);
- synchronized (list) {
- FluxSink<String> sink = taskSinks.remove(taskId);
- if (sink != null) {
- sink.complete();
- }
- }
-
- logs.remove(taskId);
- }
-}
diff --git
a/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java
new file mode 100644
index 0000000..7d91360
--- /dev/null
+++
b/bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/service/impl/TaskLogServiceImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bigtop.manager.server.service.impl;
+
+import org.apache.bigtop.manager.common.enums.JobState;
+import org.apache.bigtop.manager.dao.entity.Task;
+import org.apache.bigtop.manager.dao.repository.TaskRepository;
+import org.apache.bigtop.manager.grpc.generated.TaskLogReply;
+import org.apache.bigtop.manager.grpc.generated.TaskLogRequest;
+import org.apache.bigtop.manager.grpc.generated.TaskLogServiceGrpc;
+import org.apache.bigtop.manager.server.grpc.GrpcClient;
+import org.apache.bigtop.manager.server.service.TaskLogService;
+
+import org.springframework.stereotype.Service;
+
+import io.grpc.stub.StreamObserver;
+import reactor.core.publisher.FluxSink;
+
+import jakarta.annotation.Resource;
+
+@Service
+public class TaskLogServiceImpl implements TaskLogService { // Feeds a task's log into a reactive sink, fetching it from the task's host over gRPC.
+
+ @Resource
+ private TaskRepository taskRepository; // Used to look up the task (hostname and state) by ID.
+
+ public void registerSink(Long taskId, FluxSink<String> sink) { // Either emits a one-off notice or starts streaming the task's log into sink.
+ Task task = taskRepository.getReferenceById(taskId);
+ String hostname = task.getHostname();
+
+ if (task.getState() == JobState.PENDING || task.getState() ==
JobState.CANCELED) {
+ new Thread(() -> { // Emit the notice and complete the sink from a separate, short-lived thread.
+ sink.next("There is no log when task is in status: "
+ + task.getState().name().toLowerCase()
+ + ", please reopen the window when status
changed");
+ sink.complete();
+ })
+ .start();
+ } else { // Any other state: request the log from the task's host.
+ TaskLogServiceGrpc.TaskLogServiceStub asyncStub =
+ GrpcClient.getAsyncStub(hostname,
TaskLogServiceGrpc.TaskLogServiceStub.class);
+ TaskLogRequest request =
+ TaskLogRequest.newBuilder().setTaskId(taskId).build();
+ asyncStub.getLog(request, new LogReader(sink)); // Replies are delivered to LogReader, which forwards them to sink.
+ }
+ }
+
+ private record LogReader(FluxSink<String> sink) implements
StreamObserver<TaskLogReply> { // Adapter: relays gRPC stream events to the wrapped sink.
+
+ @Override
+ public void onNext(TaskLogReply reply) { // Forward the text of each reply.
+ sink.next(reply.getText());
+ }
+
+ @Override
+ public void onError(Throwable t) { // Propagate a stream failure to the sink.
+ sink.error(t);
+ }
+
+ @Override
+ public void onCompleted() { // End of the gRPC stream completes the sink.
+ sink.complete();
+ }
+ }
+}
diff --git a/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql
b/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql
index d86816e..f2eabdd 100644
--- a/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql
+++ b/bigtop-manager-server/src/main/resources/ddl/DaMeng-DDL-CREATE.sql
@@ -62,27 +62,6 @@ CREATE UNIQUE INDEX INDEX33555701 ON
"bigtop_manager"."cluster" ("cluster_name")
CREATE INDEX "idx_cluster_stack_id" ON "bigtop_manager"."cluster" ("stack_id");
--- "bigtop_manager"."command_log" definition
-
-CREATE TABLE "bigtop_manager"."command_log" (
- "id" BIGINT NOT NULL,
- "create_by" BIGINT NULL,
- "create_time" DATETIME NULL,
- "update_by" BIGINT NULL,
- "update_time" DATETIME NULL,
- "hostname" VARCHAR(255) NULL,
- "result" CLOB NULL,
- "job_id" BIGINT NULL,
- "stage_id" BIGINT NULL,
- "task_id" BIGINT NULL,
- CONSTRAINT CONS134218865 PRIMARY KEY ("id")
-);
-CREATE UNIQUE INDEX INDEX33555669 ON "bigtop_manager"."command_log" ("id");
-CREATE INDEX "idx_cl_job_id" ON "bigtop_manager"."command_log" ("job_id");
-CREATE INDEX "idx_cl_stage_id" ON "bigtop_manager"."command_log" ("stage_id");
-CREATE INDEX "idx_cl_task_id" ON "bigtop_manager"."command_log" ("task_id");
-
-
-- "bigtop_manager"."component" definition
CREATE TABLE "bigtop_manager"."component" (
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java
index fc173c3..dbb18eb 100644
---
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java
+++
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/kafka/KafkaBrokerScript.java
@@ -31,36 +31,36 @@ public class KafkaBrokerScript implements Script {
@Override
public ShellResult install(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult configure(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult start(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult stop(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult status(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
public ShellResult test(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
}
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java
index 08e3bf0..fced816 100644
---
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java
+++
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperClientScript.java
@@ -32,13 +32,13 @@ public class ZookeeperClientScript implements ClientScript {
@Override
public ShellResult install(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult configure(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
}
diff --git
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java
index 48d75da..2487150 100644
---
a/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java
+++
b/bigtop-manager-stack/bigtop-manager-stack-nop/src/main/java/org/apache/bigtop/manager/stack/nop/v1_0_0/zookeeper/ZookeeperServerScript.java
@@ -31,31 +31,31 @@ public class ZookeeperServerScript implements Script {
@Override
public ShellResult install(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult configure(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult start(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult stop(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
@Override
public ShellResult status(Params params) {
- log.info("Default to success in dev mode");
+ log.info("Default to success on dev mode");
return ShellResult.success();
}
}
diff --git a/bigtop-manager-ui/src/components/job-info/job.vue
b/bigtop-manager-ui/src/components/job-info/job.vue
index 7abb2b7..9ca8568 100644
--- a/bigtop-manager-ui/src/components/job-info/job.vue
+++ b/bigtop-manager-ui/src/components/job-info/job.vue
@@ -223,6 +223,7 @@
const clickTask = (record: TaskVO) => {
breadcrumbs.value.push(record)
currTaskInfo.value = record
+ logTextOrigin.value = ''
getLogsInfo(record.id)
}