This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d590745 YARN-9573. DistributedShell cannot specify
LogAggregationContext. Contributed by Adam Antal.
d590745 is described below
commit d5907450463ab2a3436ff4f28918d6253159cd76
Author: Szilard Nemeth <[email protected]>
AuthorDate: Thu Jul 11 19:54:31 2019 +0200
YARN-9573. DistributedShell cannot specify LogAggregationContext.
Contributed by Adam Antal.
---
.../yarn/applications/distributedshell/Client.java | 47 ++++++++++++++++------
.../distributedshell/TestDistributedShell.java | 29 ++++++++++++-
2 files changed, 62 insertions(+), 14 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 9c1d8fc..1d225d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -196,7 +198,9 @@ public class Client {
private String placementSpec = "";
// log4j.properties file
// if available, add to local resources and set into classpath
- private String log4jPropFile = "";
+ private String log4jPropFile = "";
+ // rolling
+ private String rollingFilesPattern = "";
// Start time for client
private final long clientStartTime = System.currentTimeMillis();
@@ -271,7 +275,7 @@ public class Client {
}
if (result) {
LOG.info("Application completed successfully");
- System.exit(0);
+ System.exit(0);
}
LOG.error("Application failed to complete successfully");
System.exit(2);
@@ -335,6 +339,8 @@ public class Client {
opts.addOption("enforce_execution_type", false,
"Flag to indicate whether to enforce execution type of containers");
opts.addOption("log_properties", true, "log4j.properties file");
+ opts.addOption("rolling_log_pattern", true,
+ "pattern for files that should be aggregated in a rolling fashion");
opts.addOption("keep_containers_across_application_attempts", false,
"Flag to indicate whether to keep containers across application "
+ "attempts."
@@ -432,6 +438,10 @@ public class Client {
}
}
+ if (cliParser.hasOption("rolling_log_pattern")) {
+ rollingFilesPattern = cliParser.getOptionValue("rolling_log_pattern");
+ }
+
if (cliParser.hasOption("help")) {
printUsage();
return false;
@@ -476,7 +486,7 @@ public class Client {
if (!cliParser.hasOption("jar")) {
throw new IllegalArgumentException("No jar file specified for application master");
- }
+ }
appMasterJar = cliParser.getOptionValue("jar");
@@ -666,16 +676,16 @@ public class Client {
+ ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
+ ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+ ", queueApplicationCount=" + queueInfo.getApplications().size()
- + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
+ + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
for (QueueUserACLInfo aclInfo : listAclInfo) {
for (QueueACL userAcl : aclInfo.getUserAcls()) {
LOG.info("User ACL Info for Queue"
- + ", queueName=" + aclInfo.getQueueName()
+ + ", queueName=" + aclInfo.getQueueName()
+ ", userAcl=" + userAcl.name());
}
- }
+ }
if (domainId != null && domainId.length() > 0 && toCreateDomain) {
prepareTimelineDomain();
@@ -772,7 +782,7 @@ public class Client {
// set local resources for the application master
// local files or archives as needed
- // In this scenario, the jar file for the application master is part of the local resources
+ // In this scenario, the jar file for the application master is part of the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
LOG.info("Copy App Master jar from local filesystem and add to local environment");
@@ -793,7 +803,7 @@ public class Client {
// To do this, we need to first copy into the filesystem that is visible
// to the yarn framework.
// We do not need to set this as a local resource for the application
- // master as the application master does not need it.
+ // master as the application master does not need it.
String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
@@ -837,7 +847,7 @@ public class Client {
env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
}
- // Add AppMaster.jar location to classpath
+ // Add AppMaster.jar location to classpath
// At some point we should not be required to add
// the hadoop specific classpaths to the env.
// It should be provided out of the box.
@@ -935,7 +945,7 @@ public class Client {
LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
+ commands.add(command.toString());
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
@@ -996,6 +1006,8 @@ public class Client {
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);
+ specifyLogAggregationContext(appContext);
+
// Submit the application to the applications manager
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
// Ignore the response as either a valid response object is returned on success
@@ -1013,6 +1025,15 @@ public class Client {
}
+ @VisibleForTesting
+ void specifyLogAggregationContext(ApplicationSubmissionContext appContext) {
+ if (!rollingFilesPattern.isEmpty()) {
+ LogAggregationContext logAggregationContext = LogAggregationContext
+ .newInstance(null, null, rollingFilesPattern, "");
+ appContext.setLogAggregationContext(logAggregationContext);
+ }
+ }
+
/**
* Monitor the submitted application for completion.
* Kill application if time expires.
@@ -1061,9 +1082,9 @@ public class Client {
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+ ". Breaking monitoring loop");
return false;
- }
+ }
}
- else if (YarnApplicationState.KILLED == state
+ else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
LOG.info("Application did not finish."
+ " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
@@ -1097,7 +1118,7 @@ public class Client {
// Response can be ignored as it is non-null on success or
// throws an exception in case of failures
- yarnClient.killApplication(appId);
+ yarnClient.killApplication(appId);
}
private void addToLocalResources(FileSystem fs, String fileSrcPath,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 187d13b..b1358b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.yarn.applications.distributedshell;
-
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -60,12 +61,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -95,6 +98,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineW
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.After;
import org.junit.Assert;
@@ -956,6 +960,29 @@ public class TestDistributedShell {
Assert.assertTrue(LOG_AM.isDebugEnabled());
}
+ @Test
+ public void testSpecifyingLogAggregationContext() throws Exception {
+ String regex = ".*(foo|bar)\\d";
+ String[] args = {
+ "--jar",
+ APPMASTER_JAR,
+ "--shell_command",
+ "echo",
+ "--rolling_log_pattern",
+ regex
+ };
+ final Client client =
+ new Client(new Configuration(yarnCluster.getConfig()));
+ Assert.assertTrue(client.init(args));
+
+ ApplicationSubmissionContext context =
+ Records.newRecord(ApplicationSubmissionContext.class);
+ client.specifyLogAggregationContext(context);
+ LogAggregationContext logContext = context.getLogAggregationContext();
+ assertEquals(logContext.getRolledLogsIncludePattern(), regex);
+ assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
+ }
+
public void testDSShellWithCommands() throws Exception {
String[] args = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]