This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new ea6b1de [Improvement][worker] Optimize KillYarnJob (4939) (#4943)
ea6b1de is described below
commit ea6b1de120cd438b7611b966d396164dec421cbb
Author: wenjun <[email protected]>
AuthorDate: Thu Mar 11 22:13:44 2021 +0800
[Improvement][worker] Optimize KillYarnJob (4939) (#4943)
---
.../dolphinscheduler/common/utils/LoggerUtils.java | 27 ++++
.../common/utils/LoggerUtilsTest.java | 54 +++++++-
.../server/log/LoggerRequestProcessor.java | 28 +---
.../server/log/LoggerRequestProcessorTest.java | 56 ++++++++
.../service/log/LogClientService.java | 18 ++-
.../service/log/LogClientServiceTest.java | 143 +++++++++++++++++++++
pom.xml | 3 +-
7 files changed, 290 insertions(+), 39 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
index 211f0a0..c9e4ebf 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
@@ -19,6 +19,10 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -26,6 +30,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* logger utils
@@ -36,6 +41,8 @@ public class LoggerUtils {
throw new UnsupportedOperationException("Construct LoggerUtils");
}
+ private static final Logger logger = LoggerFactory.getLogger(LoggerUtils.class);
+
/**
* rules for extracting application ID
*/
@@ -101,6 +108,26 @@ public class LoggerUtils {
return appIds;
}
+ /**
+ * read whole file content
+ *
+ * @param filePath file path
+ * @return whole file content
+ */
+ public static String readWholeFileContent(String filePath) {
+ String line;
+ StringBuilder sb = new StringBuilder();
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)))) {
+ while ((line = br.readLine()) != null) {
+ sb.append(line + "\r\n");
+ }
+ return sb.toString();
+ } catch (IOException e) {
+ logger.error("read file error", e);
+ }
+ return "";
+ }
+
public static void logError(Optional<Logger> optionalLogger
, String error) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error));
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java
index 5a80e38..811dff5 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/LoggerUtilsTest.java
@@ -14,30 +14,72 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.utils;
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Optional;
+
import org.junit.Assert;
import org.junit.Test;
+import org.junit.Test.None;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerUtils.class})
public class LoggerUtilsTest {
private Logger logger = LoggerFactory.getLogger(LoggerUtilsTest.class);
@Test
public void buildTaskId() {
- String taskId =
LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,79,4084,15210);
+ String taskId =
LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX, 79, 4084, 15210);
- Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId);
+ Assert.assertEquals(" - [taskAppId=TASK-79-4084-15210]", taskId);
}
@Test
public void getAppIds() {
- List<String> appIdList = LoggerUtils.getAppIds("Running job: application_1_1",logger);
- Assert.assertEquals("application_1_1", appIdList.get(0));
+ List<String> appIdList = LoggerUtils.getAppIds("Running job: application_1_1", logger);
+ Assert.assertEquals("application_1_1", appIdList.get(0));
+
+ }
+
+ @Test
+ public void testReadWholeFileContent() throws Exception {
+ BufferedReader bufferedReader =
PowerMockito.mock(BufferedReader.class);
+
PowerMockito.whenNew(BufferedReader.class).withAnyArguments().thenReturn(bufferedReader);
+
PowerMockito.when(bufferedReader.readLine()).thenReturn("").thenReturn(null);
+ FileInputStream fileInputStream =
PowerMockito.mock(FileInputStream.class);
+
PowerMockito.whenNew(FileInputStream.class).withAnyArguments().thenReturn(fileInputStream);
+
+ InputStreamReader inputStreamReader =
PowerMockito.mock(InputStreamReader.class);
+
PowerMockito.whenNew(InputStreamReader.class).withAnyArguments().thenReturn(inputStreamReader);
+
+ String log = LoggerUtils.readWholeFileContent("/tmp/log");
+ Assert.assertNotNull(log);
+
+ PowerMockito.when(bufferedReader.readLine()).thenThrow(new
IOException());
+ log = LoggerUtils.readWholeFileContent("/tmp/log");
+ Assert.assertNotNull(log);
+ }
+
+ @Test(expected = None.class)
+ public void testLogError() {
+ Optional<Logger> loggerOptional = Optional.of(this.logger);
+ LoggerUtils.logError(loggerOptional, "error message");
+ LoggerUtils.logError(loggerOptional, new RuntimeException("error message"));
+ LoggerUtils.logError(loggerOptional, "error message", new RuntimeException("runtime exception"));
+ LoggerUtils.logInfo(loggerOptional, "info message");
}
}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
index e9a85f4..c60a447 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessor.java
@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.log;
import org.apache.dolphinscheduler.common.utils.IOUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import
org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
@@ -31,13 +32,11 @@ import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
-import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
@@ -86,7 +85,7 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
case VIEW_WHOLE_LOG_REQUEST:
ViewLogRequestCommand viewLogRequest = JSONUtils.parseObject(
command.getBody(), ViewLogRequestCommand.class);
- String msg = readWholeFileContent(viewLogRequest.getPath());
+ String msg = LoggerUtils.readWholeFileContent(viewLogRequest.getPath());
ViewLogResponseCommand viewLogResponse = new
ViewLogResponseCommand(msg);
channel.writeAndFlush(viewLogResponse.convert2Command(command.getOpaque()));
break;
@@ -182,27 +181,4 @@ public class LoggerRequestProcessor implements NettyRequestProcessor {
return Collections.emptyList();
}
- /**
- * read whole file content
- *
- * @param filePath file path
- * @return whole file content
- */
- private String readWholeFileContent(String filePath) {
- BufferedReader br = null;
- String line;
- StringBuilder sb = new StringBuilder();
- try {
- br = new BufferedReader(new InputStreamReader(new
FileInputStream(filePath)));
- while ((line = br.readLine()) != null) {
- sb.append(line + "\r\n");
- }
- return sb.toString();
- } catch (IOException e) {
- logger.error("read file error",e);
- } finally {
- IOUtils.closeQuietly(br);
- }
- return "";
- }
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java
new file mode 100644
index 0000000..e245395
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/log/LoggerRequestProcessorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.log;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.remote.command.Command;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
+
+import org.junit.Test;
+import org.junit.Test.None;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import io.netty.channel.Channel;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerUtils.class})
+public class LoggerRequestProcessorTest {
+
+ @Test(expected = None.class)
+ public void testProcessViewWholeLogRequest() {
+ Channel channel = PowerMockito.mock(Channel.class);
+
PowerMockito.when(channel.writeAndFlush(Mockito.any(Command.class))).thenReturn(null);
+ PowerMockito.mockStatic(LoggerUtils.class);
+
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("");
+
+ ViewLogRequestCommand logRequestCommand = new
ViewLogRequestCommand("/log/path");
+
+ Command command = new Command();
+ command.setType(CommandType.VIEW_WHOLE_LOG_REQUEST);
+ command.setBody(JSONUtils.toJsonByteArray(logRequestCommand));
+
+ LoggerRequestProcessor loggerRequestProcessor = new
LoggerRequestProcessor();
+ loggerRequestProcessor.process(channel, command);
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
index 75753c7..5f0f545 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/log/LogClientService.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.service.log;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import
org.apache.dolphinscheduler.remote.command.log.GetLogBytesRequestCommand;
@@ -30,6 +31,7 @@ import
org.apache.dolphinscheduler.remote.command.log.ViewLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.IPUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,12 +118,16 @@ public class LogClientService {
String result = "";
final Host address = new Host(host, port);
try {
- Command command = request.convert2Command();
- Command response = this.client.sendSync(address, command,
LOG_REQUEST_TIMEOUT);
- if (response != null) {
- ViewLogResponseCommand viewLog = JSONUtils.parseObject(
- response.getBody(), ViewLogResponseCommand.class);
- return viewLog.getMsg();
+ if (IPUtils.getLocalHost().equals(host)) {
+ result = LoggerUtils.readWholeFileContent(request.getPath());
+ } else {
+ Command command = request.convert2Command();
+ Command response = this.client.sendSync(address, command,
LOG_REQUEST_TIMEOUT);
+ if (response != null) {
+ ViewLogResponseCommand viewLog = JSONUtils.parseObject(
+ response.getBody(), ViewLogResponseCommand.class);
+ result = viewLog.getMsg();
+ }
}
} catch (Exception e) {
logger.error("view log error", e);
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java
new file mode 100644
index 0000000..58d7c1e
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/log/LogClientServiceTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.log;
+
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.remote.NettyRemotingClient;
+import org.apache.dolphinscheduler.remote.command.Command;
+import
org.apache.dolphinscheduler.remote.command.log.GetLogBytesResponseCommand;
+import
org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
+import
org.apache.dolphinscheduler.remote.command.log.RollViewLogResponseCommand;
+import org.apache.dolphinscheduler.remote.command.log.ViewLogResponseCommand;
+import org.apache.dolphinscheduler.remote.utils.Host;
+import org.apache.dolphinscheduler.remote.utils.IPUtils;
+
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.Test.None;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LogClientService.class, IPUtils.class, LoggerUtils.class, NettyRemotingClient.class})
+public class LogClientServiceTest {
+
+ @Test
+ public void testViewLogFromLocal() {
+ String localMachine = "LOCAL_MACHINE";
+ int port = 1234;
+ String path = "/tmp/log";
+
+ PowerMockito.mockStatic(IPUtils.class);
+ PowerMockito.when(IPUtils.getLocalHost()).thenReturn(localMachine);
+ PowerMockito.mockStatic(LoggerUtils.class);
+
PowerMockito.when(LoggerUtils.readWholeFileContent(Mockito.anyString())).thenReturn("application_xx_11");
+
+ LogClientService logClientService = new LogClientService();
+ String log = logClientService.viewLog(localMachine, port, path);
+ Assert.assertNotNull(log);
+ }
+
+ @Test
+ public void testViewLogFromRemote() throws Exception {
+ String localMachine = "LOCAL_MACHINE";
+ int port = 1234;
+ String path = "/tmp/log";
+
+ PowerMockito.mockStatic(IPUtils.class);
+ PowerMockito.when(IPUtils.getLocalHost()).thenReturn(localMachine + "1");
+
+ NettyRemotingClient remotingClient =
PowerMockito.mock(NettyRemotingClient.class);
+
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+
+ Command command = new Command();
+ command.setBody(JSONUtils.toJsonString(new
ViewLogResponseCommand("")).getBytes(StandardCharsets.UTF_8));
+ PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class),
Mockito.any(Command.class), Mockito.anyLong()))
+ .thenReturn(command);
+ LogClientService logClientService = new LogClientService();
+ String log = logClientService.viewLog(localMachine, port, path);
+ Assert.assertNotNull(log);
+ }
+
+ @Test(expected = None.class)
+ public void testClose() throws Exception {
+ NettyRemotingClient remotingClient =
PowerMockito.mock(NettyRemotingClient.class);
+
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+ PowerMockito.doNothing().when(remotingClient).close();
+
+ LogClientService logClientService = new LogClientService();
+ logClientService.close();
+ }
+
+ @Test
+ public void testRollViewLog() throws Exception {
+ NettyRemotingClient remotingClient =
PowerMockito.mock(NettyRemotingClient.class);
+
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+
+ Command command = new Command();
+ command.setBody(JSONUtils.toJsonByteArray(new
RollViewLogResponseCommand("success")));
+ PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class),
Mockito.any(Command.class), Mockito.anyLong()))
+ .thenReturn(command);
+
+ LogClientService logClientService = new LogClientService();
+ String msg = logClientService.rollViewLog("localhost", 1234,
"/tmp/log", 0, 10);
+ Assert.assertNotNull(msg);
+ }
+
+ @Test
+ public void testGetLogBytes() throws Exception {
+ NettyRemotingClient remotingClient =
PowerMockito.mock(NettyRemotingClient.class);
+
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+
+ Command command = new Command();
+ command.setBody(JSONUtils.toJsonByteArray(new
GetLogBytesResponseCommand("log".getBytes(StandardCharsets.UTF_8))));
+ PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class),
Mockito.any(Command.class), Mockito.anyLong()))
+ .thenReturn(command);
+
+ LogClientService logClientService = new LogClientService();
+ byte[] logBytes = logClientService.getLogBytes("localhost", 1234,
"/tmp/log");
+ Assert.assertNotNull(logBytes);
+ }
+
+ @Test
+ public void testRemoveTaskLog() throws Exception {
+ NettyRemotingClient remotingClient =
PowerMockito.mock(NettyRemotingClient.class);
+
PowerMockito.whenNew(NettyRemotingClient.class).withAnyArguments().thenReturn(remotingClient);
+
+ Command command = new Command();
+ command.setBody(JSONUtils.toJsonByteArray(new
RemoveTaskLogResponseCommand(true)));
+ PowerMockito.when(remotingClient.sendSync(Mockito.any(Host.class),
Mockito.any(Command.class), Mockito.anyLong()))
+ .thenReturn(command);
+
+ LogClientService logClientService = new LogClientService();
+ Boolean status = logClientService.removeTaskLog("localhost", 1234,
"/log/path");
+ Assert.assertTrue(status);
+ }
+
+ @Test
+ public void testIsRunning() {
+ LogClientService logClientService = new LogClientService();
+ Assert.assertTrue(logClientService.isRunning());
+ }
+}
diff --git a/pom.xml b/pom.xml
index 2ab57bc..525b486 100644
--- a/pom.xml
+++ b/pom.xml
@@ -908,6 +908,7 @@
<include>**/server/entity/SQLTaskExecutionContextTest.java</include>
<include>**/server/log/MasterLogFilterTest.java</include>
<include>**/server/log/SensitiveDataConverterTest.java</include>
+ <include>**/server/log/LoggerRequestProcessorTest.java</include>
<!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>-->
<include>**/server/log/TaskLogFilterTest.java</include>
<include>**/server/log/WorkerLogFilterTest.java</include>
@@ -966,7 +967,7 @@
<include>**/service/zk/RegisterOperatorTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
-
+ <include>**/service/log/LogClientServiceTest.java</include>
<include>**/service/alert/AlertClientServiceTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->