YARN-5523. Yarn running container log fetching causes OutOfMemoryError (Xuan Gong via Varun Saxena)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3037c56 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3037c56 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3037c56 Branch: refs/heads/HADOOP-13345 Commit: e3037c564117fe53742c130665b047dd17eff6d0 Parents: f80a729 Author: Varun Saxena <[email protected]> Authored: Thu Aug 18 01:45:33 2016 +0530 Committer: Varun Saxena <[email protected]> Committed: Thu Aug 18 01:45:33 2016 +0530 ---------------------------------------------------------------------- .../apache/hadoop/yarn/client/cli/LogsCLI.java | 74 +++++++++++---- .../hadoop/yarn/client/cli/TestLogsCLI.java | 95 ++++++++++++++++++++ 2 files changed, 150 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3037c56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java index 908d379..25e3a46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/LogsCLI.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.client.cli; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.util.ArrayList; import java.util.Arrays; @@ -41,6 +42,7 @@ import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -462,15 +464,7 @@ public class LogsCLI extends Configured implements Tool { PrintStream out = logCliHelper.createPrintStream(localDir, nodeId, containerIdStr); try { - // fetch all the log files for the container - // filter the log files based on the given -log_files pattern - List<PerLogFileInfo> allLogFileInfos= - getContainerLogFiles(getConf(), containerIdStr, nodeHttpAddress); - List<String> fileNames = new ArrayList<String>(); - for (PerLogFileInfo fileInfo : allLogFileInfos) { - fileNames.add(fileInfo.getFileName()); - } - Set<String> matchedFiles = getMatchedLogFiles(request, fileNames, + Set<String> matchedFiles = getMatchedContainerLogFiles(request, useRegex); if (matchedFiles.isEmpty()) { System.err.println("Can not find any log file matching the pattern: " @@ -487,22 +481,33 @@ public class LogsCLI extends Configured implements Tool { out.println(containerString); out.println(StringUtils.repeat("=", containerString.length())); boolean foundAnyLogs = false; + byte[] buffer = new byte[65536]; for (String logFile : newOptions.getLogTypes()) { out.println("LogType:" + logFile); out.println("Log Upload Time:" + Times.format(System.currentTimeMillis())); out.println("Log Contents:"); + InputStream is = null; try { - WebResource webResource = - webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf) - + nodeHttpAddress); - ClientResponse response = - webResource.path("ws").path("v1").path("node") - .path("containers").path(containerIdStr).path("logs") - .path(logFile) - .queryParam("size", Long.toString(request.getBytes())) - .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); - out.println(response.getEntity(String.class)); + ClientResponse response = getResponeFromNMWebService(conf, + webServiceClient, request, logFile); + if (response != null && response.getStatusInfo().getStatusCode() == + ClientResponse.Status.OK.getStatusCode()) { + is = response.getEntityInputStream(); + int len = 0; + while((len = is.read(buffer)) != -1) { + out.write(buffer, 0, len); + } + out.println(); + } else { + out.println("Can not get any logs for the log file: " + logFile); + String msg = "Response from the NodeManager:" + nodeId + + " WebService is " + ((response == null) ? "null": + "not successful," + " HTTP error code: " + + response.getStatus() + ", Server response:\n" + + response.getEntity(String.class)); + out.println(msg); + } StringBuilder sb = new StringBuilder(); sb.append("End of LogType:" + logFile + "."); if (request.getContainerState() == ContainerState.RUNNING) { @@ -517,6 +522,8 @@ public class LogsCLI extends Configured implements Tool { System.err.println("Can not find the log file:" + logFile + " for the container:" + containerIdStr + " in NodeManager:" + nodeId); + } finally { + IOUtils.closeQuietly(is); } } // for the case, we have already uploaded partial logs in HDFS @@ -1189,4 +1196,33 @@ public class LogsCLI extends Configured implements Tool { this.fileLength = fileLength; } } + + @VisibleForTesting + public Set<String> getMatchedContainerLogFiles(ContainerLogsRequest request, + boolean useRegex) throws IOException { + // fetch all the log files for the container + // filter the log files based on the given -log_files pattern + List<PerLogFileInfo> allLogFileInfos= + getContainerLogFiles(getConf(), request.getContainerId(), + request.getNodeHttpAddress()); + List<String> fileNames = new ArrayList<String>(); + for (PerLogFileInfo fileInfo : allLogFileInfos) { + fileNames.add(fileInfo.getFileName()); + } + return getMatchedLogFiles(request, fileNames, + useRegex); + } + + @VisibleForTesting + public ClientResponse getResponeFromNMWebService(Configuration conf, + Client webServiceClient, ContainerLogsRequest request, String logFile) { + WebResource webResource = + webServiceClient.resource(WebAppUtils.getHttpSchemePrefix(conf) + + request.getNodeHttpAddress()); + return webResource.path("ws").path("v1").path("node") + .path("containers").path(request.getContainerId()).path("logs") + .path(logFile) + .queryParam("size", Long.toString(request.getBytes())) + .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3037c56/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 19ddb2f..a564f6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -30,10 +31,14 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; @@ -63,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -582,6 +588,95 @@ public class TestLogsCLI { } @Test (timeout = 5000) + public void testGetRunningContainerLogs() throws Exception { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + + NodeId nodeId = NodeId.newInstance("localhost", 1234); + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId appAttemptId = ApplicationAttemptId + .newInstance(appId, 1); + + // Create a mock ApplicationAttempt Report + ApplicationAttemptReport mockAttemptReport = mock( + ApplicationAttemptReport.class); + doReturn(appAttemptId).when(mockAttemptReport).getApplicationAttemptId(); + List<ApplicationAttemptReport> attemptReports = Arrays.asList( + mockAttemptReport); + + // Create one mock containerReport + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1); + ContainerReport mockContainerReport1 = mock(ContainerReport.class); + doReturn(containerId1).when(mockContainerReport1).getContainerId(); + doReturn(nodeId).when(mockContainerReport1).getAssignedNode(); + doReturn("http://localhost:2345").when(mockContainerReport1) + .getNodeHttpAddress(); + doReturn(ContainerState.RUNNING).when(mockContainerReport1) + .getContainerState(); + List<ContainerReport> containerReports = Arrays.asList( + mockContainerReport1); + + // Mock the YarnClient, and it would report the previous created + // mockAttemptReport and previous two created mockContainerReports + YarnClient mockYarnClient = createMockYarnClient( + YarnApplicationState.RUNNING, ugi.getShortUserName(), true, + attemptReports, containerReports); + doReturn(mockContainerReport1).when(mockYarnClient).getContainerReport( + any(ContainerId.class)); + + // create local logs + Configuration configuration = new Configuration(); + FileSystem fs = FileSystem.get(configuration); + String rootLogDir = "target/LocalLogs"; + Path rootLogDirPath = new Path(rootLogDir); + if (fs.exists(rootLogDirPath)) { + fs.delete(rootLogDirPath, true); + } + assertTrue(fs.mkdirs(rootLogDirPath)); + + Path appLogsDir = new Path(rootLogDirPath, appId.toString()); + if (fs.exists(appLogsDir)) { + fs.delete(appLogsDir, true); + } + assertTrue(fs.mkdirs(appLogsDir)); + + String fileName = "syslog"; + List<String> logTypes = new ArrayList<String>(); + logTypes.add(fileName); + // create container logs in localLogDir + createContainerLogInLocalDir(appLogsDir, containerId1, fs, logTypes); + + Path containerDirPath = new Path(appLogsDir, containerId1.toString()); + Path logPath = new Path(containerDirPath, fileName); + File logFile = new File(logPath.toString()); + final FileInputStream fis = new FileInputStream(logFile); + + try { + LogsCLI cli = spy(new LogsCLIForTest(mockYarnClient)); + Set<String> logsSet = new HashSet<String>(); + logsSet.add(fileName); + doReturn(logsSet).when(cli).getMatchedContainerLogFiles( + any(ContainerLogsRequest.class), anyBoolean()); + ClientResponse mockReponse = mock(ClientResponse.class); + doReturn(Status.OK).when(mockReponse).getStatusInfo(); + doReturn(fis).when(mockReponse).getEntityInputStream(); + doReturn(mockReponse).when(cli).getResponeFromNMWebService( + any(Configuration.class), + any(Client.class), + any(ContainerLogsRequest.class), anyString()); + cli.setConf(new YarnConfiguration()); + int exitCode = cli.run(new String[] {"-containerId", + containerId1.toString()}); + assertTrue(exitCode == 0); + assertTrue(sysOutStream.toString().contains( + logMessage(containerId1, "syslog"))); + sysOutStream.reset(); + } finally { + IOUtils.closeQuietly(fis); + fs.delete(new Path(rootLogDir), true); + } + } + + @Test (timeout = 5000) public void testFetchRunningApplicationLogs() throws Exception { UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
