This is an automated email from the ASF dual-hosted git repository.
sunilg pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new f4ee38d YARN-9519. TFile log aggregation file format is not working
for yarn.log-aggregation.TFile.remote-app-log-dir config. Contributed by Adam
Antal.
f4ee38d is described below
commit f4ee38df29b7c6daa2fd32af2b0de49497892980
Author: Sunil G <[email protected]>
AuthorDate: Tue May 14 10:48:08 2019 -0700
YARN-9519. TFile log aggregation file format is not working for
yarn.log-aggregation.TFile.remote-app-log-dir config. Contributed by Adam Antal.
(cherry picked from commit 7d831eca645f93d064975ebae35a7cbea3bbad31)
---
.../LogAggregationFileController.java | 43 ++++
.../ifile/LogAggregationIndexedFileController.java | 20 --
.../tfile/LogAggregationTFileController.java | 8 +-
.../TestLogAggregationFileControllerFactory.java | 261 ++++++++++++++-------
4 files changed, 223 insertions(+), 109 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index e37308d..6f1162a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -132,6 +132,10 @@ public abstract class LogAggregationFileController {
this.retentionSize = configuredRetentionSize;
}
this.fileControllerName = controllerName;
+
+ extractRemoteRootLogDir();
+ extractRemoteRootLogDirSuffix();
+
initInternal(conf);
}
@@ -249,6 +253,45 @@ public abstract class LogAggregationFileController {
Path aggregatedLogPath, ApplicationId appId) throws IOException;
/**
+ * Sets the remoteRootLogDirSuffix class variable from the property keyed by
+ * {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT}
+ * (formatted with the FileController's name). If that is unset or empty, uses
+ * {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR_SUFFIX} followed by a
+ * hyphen and the FileController's lower-cased name.
+ */
+ private void extractRemoteRootLogDirSuffix() {
+ String suffix = String.format(
+ YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+ fileControllerName);
+ remoteRootLogDirSuffix = conf.get(suffix);
+ if (remoteRootLogDirSuffix == null
+ || remoteRootLogDirSuffix.isEmpty()) {
+ remoteRootLogDirSuffix = conf.get(
+ YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
+ + "-" + fileControllerName.toLowerCase();
+ }
+ }
+
+ /**
+ * Sets the remoteRootLogDir class variable from the property whose key is
+ * {@link YarnConfiguration#LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT} formatted
+ * with the FileController's name, or from
+ * {@link YarnConfiguration#NM_REMOTE_APP_LOG_DIR} if that is unset or empty.
+ */
+ private void extractRemoteRootLogDir() {
+ String remoteDirStr = String.format(
+ YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+ fileControllerName);
+ String remoteDir = conf.get(remoteDirStr);
+ if (remoteDir == null || remoteDir.isEmpty()) {
+ remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
+ }
+ remoteRootLogDir = new Path(remoteDir);
+ }
+
+ /**
* Verify and create the remote log directory.
*/
public void verifyAndCreateRemoteLogDir() {
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 78b0c13..2b6a610 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -145,26 +145,6 @@ public class LogAggregationIndexedFileController
+ " use LogAggregationIndexedFileController when the FileSystem "
+ "support append operations.");
}
- String remoteDirStr = String.format(
- YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
- this.fileControllerName);
- String remoteDir = conf.get(remoteDirStr);
- if (remoteDir == null || remoteDir.isEmpty()) {
- remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
- }
- this.remoteRootLogDir = new Path(remoteDir);
- String suffix = String.format(
- YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
- this.fileControllerName);
- this.remoteRootLogDirSuffix = conf.get(suffix);
- if (this.remoteRootLogDirSuffix == null
- || this.remoteRootLogDirSuffix.isEmpty()) {
- this.remoteRootLogDirSuffix = conf.get(
- YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX)
- + "-ifile";
- }
String compressName = conf.get(
YarnConfiguration.NM_LOG_AGG_COMPRESSION_TYPE,
YarnConfiguration.DEFAULT_NM_LOG_AGG_COMPRESSION_TYPE);
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
index b3103d2..561aec4 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
@@ -77,12 +76,7 @@ public class LogAggregationTFileController
@Override
public void initInternal(Configuration conf) {
- this.remoteRootLogDir = new Path(
- conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
- this.remoteRootLogDirSuffix =
- conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
- YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+ // Nothing to do: the base class resolves the remote log dir and suffix.
}
@Override
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
index 99aca1b..2d2fb49 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
@@ -18,17 +18,26 @@
package org.apache.hadoop.yarn.logaggregation.filecontroller;
-import static org.junit.Assert.*;
+import static
org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
+import static
org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
+import static
org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Writer;
-import java.util.LinkedList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -38,120 +47,208 @@ import
org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
-import
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
-import
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
-import
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import
org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;
import
org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
+import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test LogAggregationFileControllerFactory.
- *
*/
-public class TestLogAggregationFileControllerFactory {
+public class TestLogAggregationFileControllerFactory extends Configured {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestLogAggregationFileControllerFactory.class);
- @Test(timeout = 10000)
- public void testLogAggregationFileControllerFactory() throws Exception {
- ApplicationId appId = ApplicationId.newInstance(
- System.currentTimeMillis(), 1);
- String appOwner = "test";
- String remoteLogRootDir = "target/app-logs/";
+ private static final String REMOTE_LOG_ROOT = "target/app-logs/";
+ private static final String REMOTE_DEFAULT_DIR = "default/";
+ private static final String APP_OWNER = "test";
+
+ private static final String WRONG_ROOT_LOG_DIR_MSG =
+ "Wrong remote root log directory found.";
+ private static final String WRONG_ROOT_LOG_DIR_SUFFIX_MSG =
+ "Wrong remote root log directory suffix found.";
+
+ private static final List<Class<? extends LogAggregationFileController>>
+ ALL_FILE_CONTROLLERS = Arrays.asList(
+ TestLogAggregationFileController.class,
+ LogAggregationIndexedFileController.class,
+ LogAggregationTFileController.class);
+ private static final List<String> ALL_FILE_CONTROLLER_NAMES =
+ Arrays.asList("TestLogAggregationFileController", "IFile", "TFile");
+
+ private ApplicationId appId = ApplicationId.newInstance(
+ System.currentTimeMillis(), 1);
+
+ @Before
+ public void setup() throws IOException {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
- conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_LOG_ROOT +
+ REMOTE_DEFAULT_DIR);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
- FileSystem fs = FileSystem.get(conf);
+ setConf(conf);
+ }
+
+ private void verifyFileControllerInstance(
+ LogAggregationFileControllerFactory factory,
+ Class<? extends LogAggregationFileController> className)
+ throws IOException {
+ List<LogAggregationFileController> fileControllers =
+ factory.getConfiguredLogAggregationFileControllerList();
+ FileSystem fs = FileSystem.get(getConf());
+ Path logPath = fileControllers.get(0).getRemoteAppLogDir(appId, APP_OWNER);
+ LOG.debug("Checking " + logPath);
- LogAggregationFileControllerFactory factory =
- new LogAggregationFileControllerFactory(conf);
- LinkedList<LogAggregationFileController> list = factory
- .getConfiguredLogAggregationFileControllerList();
- assertTrue(list.size() == 1);
- assertTrue(list.getFirst() instanceof LogAggregationTFileController);
- assertTrue(factory.getFileControllerForWrite()
- instanceof LogAggregationTFileController);
- Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
try {
if (fs.exists(logPath)) {
fs.delete(logPath, true);
}
assertTrue(fs.mkdirs(logPath));
- Writer writer =
- new FileWriter(new File(logPath.toString(), "testLog"));
- writer.write("test");
- writer.close();
- assertTrue(factory.getFileControllerForRead(appId, appOwner)
- instanceof LogAggregationTFileController);
+ try (Writer writer =
+ new FileWriter(new File(logPath.toString(), "testLog"))) {
+ writer.write("test");
+ }
+ assertTrue("The used LogAggregationFileController is not instance of "
+ + className.getSimpleName(), className.isInstance(
+ factory.getFileControllerForRead(appId, APP_OWNER)));
} finally {
fs.delete(logPath, true);
}
+ }
+
+ @Test
+ public void testDefaultLogAggregationFileControllerFactory()
+ throws IOException {
+ LogAggregationFileControllerFactory factory =
+ new LogAggregationFileControllerFactory(getConf());
+ List<LogAggregationFileController> list = factory
+ .getConfiguredLogAggregationFileControllerList();
+
+ assertEquals("Only one LogAggregationFileController is expected!", 1,
+ list.size());
+ assertTrue("TFile format is expected to be the first " +
+ "LogAggregationFileController!", list.get(0) instanceof
+ LogAggregationTFileController);
+ assertTrue("TFile format is expected to be used for writing!",
+ factory.getFileControllerForWrite() instanceof
+ LogAggregationTFileController);
+
+ verifyFileControllerInstance(factory, LogAggregationTFileController.class);
+ }
+ @Test(expected = Exception.class)
+ public void testLogAggregationFileControllerFactoryClassNotSet() {
+ Configuration conf = getConf();
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
"TestLogAggregationFileController");
- // Did not set class for TestLogAggregationFileController,
- // should get the exception.
- try {
- factory =
- new LogAggregationFileControllerFactory(conf);
- fail();
- } catch (Exception ex) {
- // should get exception
- }
+ new LogAggregationFileControllerFactory(conf);
+ fail("TestLogAggregationFileController's class was not set, " +
+ "but the factory creation did not fail.");
+ }
+ private void enableFileControllers(
+ List<Class<? extends LogAggregationFileController>> fileControllers,
+ List<String> fileControllerNames) {
+ Configuration conf = getConf();
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
- "TestLogAggregationFileController,TFile");
- conf.setClass(
- "yarn.log-aggregation.file-controller.TestLogAggregationFileController"
- + ".class", TestLogAggregationFileController.class,
- LogAggregationFileController.class);
-
- conf.set(
- "yarn.log-aggregation.TestLogAggregationFileController"
- + ".remote-app-log-dir", remoteLogRootDir);
- conf.set(
- "yarn.log-aggregation.TestLogAggregationFileController"
- + ".remote-app-log-dir-suffix", "testLog");
-
- factory = new LogAggregationFileControllerFactory(conf);
- list = factory.getConfiguredLogAggregationFileControllerList();
- assertTrue(list.size() == 2);
- assertTrue(list.getFirst() instanceof TestLogAggregationFileController);
- assertTrue(list.getLast() instanceof LogAggregationTFileController);
- assertTrue(factory.getFileControllerForWrite()
- instanceof TestLogAggregationFileController);
-
- logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
- try {
- if (fs.exists(logPath)) {
- fs.delete(logPath, true);
- }
- assertTrue(fs.mkdirs(logPath));
- Writer writer =
- new FileWriter(new File(logPath.toString(), "testLog"));
- writer.write("test");
- writer.close();
- assertTrue(factory.getFileControllerForRead(appId, appOwner)
- instanceof TestLogAggregationFileController);
- } finally {
- fs.delete(logPath, true);
+ StringUtils.join(fileControllerNames, ","));
+ for (int i = 0; i < fileControllers.size(); i++) {
+ Class<? extends LogAggregationFileController> fileController =
+ fileControllers.get(i);
+ String controllerName = fileControllerNames.get(i);
+
+ conf.setClass(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT,
+ controllerName), fileController, LogAggregationFileController.class);
+ conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
+ controllerName), REMOTE_LOG_ROOT + controllerName + "/");
+ conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
+ controllerName), controllerName);
}
}
+ @Test
+ public void testLogAggregationFileControllerFactory() throws Exception {
+ enableFileControllers(ALL_FILE_CONTROLLERS, ALL_FILE_CONTROLLER_NAMES);
+ LogAggregationFileControllerFactory factory =
+ new LogAggregationFileControllerFactory(getConf());
+ List<LogAggregationFileController> list =
+ factory.getConfiguredLogAggregationFileControllerList();
+
+ assertEquals("The expected number of LogAggregationFileController " +
+ "is not 3!", 3, list.size());
+ assertTrue("Test format is expected to be the first " +
+ "LogAggregationFileController!", list.get(0) instanceof
+ TestLogAggregationFileController);
+ assertTrue("IFile format is expected to be the second " +
+ "LogAggregationFileController!", list.get(1) instanceof
+ LogAggregationIndexedFileController);
+ assertTrue("TFile format is expected to be the third " +
+ "LogAggregationFileController!", list.get(2) instanceof
+ LogAggregationTFileController);
+ assertTrue("Test format is expected to be used for writing!",
+ factory.getFileControllerForWrite() instanceof
+ TestLogAggregationFileController);
+
+ verifyFileControllerInstance(factory,
+ TestLogAggregationFileController.class);
+ }
+
+ @Test
+ public void testClassConfUsed() {
+ enableFileControllers(Collections.singletonList(
+ LogAggregationTFileController.class),
+ Collections.singletonList("TFile"));
+ LogAggregationFileControllerFactory factory =
+ new LogAggregationFileControllerFactory(getConf());
+ LogAggregationFileController fc = factory.getFileControllerForWrite();
+
+ assertEquals(WRONG_ROOT_LOG_DIR_MSG, "target/app-logs/TFile",
+ fc.getRemoteRootLogDir().toString());
+ assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "TFile",
+ fc.getRemoteRootLogDirSuffix());
+ }
+
+ @Test
+ public void testNodemanagerConfigurationIsUsed() {
+ Configuration conf = getConf();
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
+ LogAggregationFileControllerFactory factory =
+ new LogAggregationFileControllerFactory(conf);
+ LogAggregationFileController fc = factory.getFileControllerForWrite();
+
+ assertEquals(WRONG_ROOT_LOG_DIR_MSG, "target/app-logs/default",
+ fc.getRemoteRootLogDir().toString());
+ assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "log-tfile",
+ fc.getRemoteRootLogDirSuffix());
+ }
+
+ @Test
+ public void testDefaultConfUsed() {
+ Configuration conf = getConf();
+ conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR);
+ conf.unset(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX);
+ conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
+
+ LogAggregationFileControllerFactory factory =
+ new LogAggregationFileControllerFactory(getConf());
+ LogAggregationFileController fc = factory.getFileControllerForWrite();
+
+ assertEquals(WRONG_ROOT_LOG_DIR_MSG, "/tmp/logs",
+ fc.getRemoteRootLogDir().toString());
+ assertEquals(WRONG_ROOT_LOG_DIR_SUFFIX_MSG, "logs-tfile",
+ fc.getRemoteRootLogDirSuffix());
+ }
+
private static class TestLogAggregationFileController
extends LogAggregationFileController {
@Override
public void initInternal(Configuration conf) {
- String remoteDirStr = String.format(
- YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
- this.fileControllerName);
- this.remoteRootLogDir = new Path(conf.get(remoteDirStr));
- String suffix = String.format(
- YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
- this.fileControllerName);
- this.remoteRootLogDirSuffix = conf.get(suffix);
+ // Nothing to do: the base class resolves the remote log dir and suffix.
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]