This is an automated email from the ASF dual-hosted git repository.
sunilg pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 30c6dd9 YARN-9452. Fix TestDistributedShell and
TestTimelineAuthFilterForV2 failures. Contributed by Prabhu Joseph.
30c6dd9 is described below
commit 30c6dd92e1d4075d143adc891dc8ec536dddc0d9
Author: Sunil G <[email protected]>
AuthorDate: Thu May 30 22:32:36 2019 +0530
YARN-9452. Fix TestDistributedShell and TestTimelineAuthFilterForV2
failures. Contributed by Prabhu Joseph.
---
.../distributedshell/ApplicationMaster.java | 17 +-
.../yarn/applications/distributedshell/Client.java | 2 +
.../timelineservice/NMTimelinePublisher.java | 12 --
.../security/TestTimelineAuthFilterForV2.java | 177 +++++++++++----------
4 files changed, 107 insertions(+), 101 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index c30dc4d..bb300db 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -226,6 +226,8 @@ public class ApplicationMaster {
@VisibleForTesting
UserGroupInformation appSubmitterUgi;
+ private Path homeDirectory;
+
// Handle to communicate with the Node Manager
private NMClientAsync nmClientAsync;
// Listen to process the response from the Node Manager
@@ -513,6 +515,7 @@ public class ApplicationMaster {
+ "retrieved by"
+ " the new application attempt ");
opts.addOption("localized_files", true, "List of localized files");
+ opts.addOption("homedir", true, "Home Directory of Job Owner");
opts.addOption("help", false, "Print usage");
CommandLine cliParser = new GnuParser().parse(opts, args);
@@ -544,6 +547,11 @@ public class ApplicationMaster {
dumpOutDebugInfo();
}
+ homeDirectory = cliParser.hasOption("homedir") ?
+ new Path(cliParser.getOptionValue("homedir")) :
+ new Path("/user/" + System.getenv(ApplicationConstants.
+ Environment.USER.name()));
+
if (cliParser.hasOption("placement_spec")) {
String placementSpec = cliParser.getOptionValue("placement_spec");
String decodedSpec = getDecodedPlacementSpec(placementSpec);
@@ -779,7 +787,7 @@ public class ApplicationMaster {
@Override
public Void run() throws IOException {
FileSystem fs = FileSystem.get(conf);
- Path dst = new Path(getAppSubmitterHomeDir(),
+ Path dst = new Path(homeDirectory,
getRelativePath(appName, appId.toString(), ""));
fs.delete(dst, true);
return null;
@@ -790,11 +798,6 @@ public class ApplicationMaster {
}
}
- private Path getAppSubmitterHomeDir() {
- return new Path("/user/" +
- System.getenv(ApplicationConstants.Environment.USER.name()));
- }
-
/**
* Main run function for the application master
*
@@ -1495,7 +1498,7 @@ public class ApplicationMaster {
String relativePath =
getRelativePath(appName, appId.toString(), fileName);
Path dst =
- new Path(getAppSubmitterHomeDir(), relativePath);
+ new Path(homeDirectory, relativePath);
FileStatus fileStatus = fs.getFileStatus(dst);
LocalResource localRes = LocalResource.newInstance(
URL.fromURI(dst.toUri()),
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 08c6b83..4bd57dd 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -986,6 +986,8 @@ public class Client {
}
vargs.add("--appname " + appName);
+ vargs.add("--homedir " + fs.getHomeDirectory());
+
vargs.addAll(containerRetryOptions);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
"/AppMaster.stdout");
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index ba57495..5a4de1f 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -273,11 +273,7 @@ public class NMTimelinePublisher extends CompositeService {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.RESUMED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
-
- long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
- entity
- .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
@@ -302,11 +298,7 @@ public class NMTimelinePublisher extends CompositeService {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.PAUSED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
-
- long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
- entity
- .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
@@ -333,11 +325,7 @@ public class NMTimelinePublisher extends CompositeService {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setId(ContainerMetricsConstants.KILLED_EVENT_TYPE);
tEvent.setTimestamp(event.getTimestamp());
-
- long containerStartTime = container.getContainerStartTime();
entity.addEvent(tEvent);
- entity
- .setIdPrefix(TimelineServiceHelper.invertLong(containerStartTime));
dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
containerId.getApplicationAttemptId().getApplicationId()));
}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
index 95a008a..0c70a5a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java
@@ -34,14 +34,15 @@ import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.File;
+import java.io.FileFilter;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.Callable;
-import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@@ -100,6 +101,8 @@ public class TestTimelineAuthFilterForV2 {
getKeytabFile());
private static String httpSpnegoPrincipal = KerberosTestUtils.
getServerPrincipal();
+ private static final String ENTITY_TYPE = "dummy_type";
+ private static final AtomicInteger ENTITY_TYPE_SUFFIX = new AtomicInteger(0);
// First param indicates whether HTTPS access or HTTP access and second param
// indicates whether it is kerberos access or token based access.
@@ -274,11 +277,20 @@ public class TestTimelineAuthFilterForV2 {
}
private static void verifyEntity(File entityTypeDir, String id, String type)
- throws IOException {
+ throws InterruptedException, IOException {
File entityFile = new File(entityTypeDir, id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+ TimelineEntity entity = null;
+ for (int i = 0; i < 50; i++) {
+ if (entityFile.exists()) {
+ entity = readEntityFile(entityFile);
+ if (entity != null) {
+ break;
+ }
+ }
+ Thread.sleep(50);
+ }
assertTrue(entityFile.exists());
- TimelineEntity entity = readEntityFile(entityFile);
assertNotNull(entity);
assertEquals(id, entity.getId());
assertEquals(type, entity.getType());
@@ -333,7 +345,8 @@ public class TestTimelineAuthFilterForV2 {
@Test
public void testPutTimelineEntities() throws Exception {
- final String entityType = "dummy_type";
+ final String entityType = ENTITY_TYPE +
+ ENTITY_TYPE_SUFFIX.getAndIncrement();
ApplicationId appId = ApplicationId.newInstance(0, 1);
File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() +
File.separator + "entities" + File.separator +
@@ -342,92 +355,92 @@ public class TestTimelineAuthFilterForV2 {
File.separator + "test_flow_name" + File.separator +
"test_flow_version" + File.separator + "1" + File.separator +
appId.toString() + File.separator + entityType);
- try {
- if (withKerberosLogin) {
- KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
- return null;
- }
- });
- } else {
- assertTrue("Entities should have been published successfully.",
- publishWithRetries(appId, entityTypeDir, entityType, 1));
-
- AppLevelTimelineCollector collector =
- (AppLevelTimelineCollector) collectorManager.get(appId);
- Token<TimelineDelegationTokenIdentifier> token =
- collector.getDelegationTokenForApp();
- assertNotNull(token);
+ if (withKerberosLogin) {
+ KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ publishAndVerifyEntity(appId, entityTypeDir, entityType, 1);
+ return null;
+ }
+ });
+ } else {
+ assertTrue("Entities should have been published successfully.",
+ publishWithRetries(appId, entityTypeDir, entityType, 1));
- // Verify if token is renewed automatically and entities can still be
- // published.
- Thread.sleep(1000);
- // Entities should publish successfully after renewal.
- assertTrue("Entities should have been published successfully.",
- publishWithRetries(appId, entityTypeDir, entityType, 2));
- assertNotNull(collector);
- verify(collectorManager.getTokenManagerService(), atLeastOnce()).
- renewToken(eq(collector.getDelegationTokenForApp()),
- any(String.class));
+ AppLevelTimelineCollector collector =
+ (AppLevelTimelineCollector) collectorManager.get(appId);
+ Token<TimelineDelegationTokenIdentifier> token =
+ collector.getDelegationTokenForApp();
+ assertNotNull(token);
- // Wait to ensure lifetime of token expires and ensure its regenerated
- // automatically.
- Thread.sleep(3000);
- for (int i = 0; i < 40; i++) {
- if (!token.equals(collector.getDelegationTokenForApp())) {
- break;
- }
- Thread.sleep(50);
- }
- assertNotEquals("Token should have been regenerated.", token,
- collector.getDelegationTokenForApp());
- Thread.sleep(1000);
- // Try publishing with the old token in UGI. Publishing should fail due
- // to invalid token.
- try {
- publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
- fail("Exception should have been thrown due to Invalid Token.");
- } catch (YarnException e) {
- assertTrue("Exception thrown should have been due to Invalid Token.",
- e.getCause().getMessage().contains("InvalidToken"));
- }
+ // Verify if token is renewed automatically and entities can still be
+ // published.
+ Thread.sleep(1000);
+ // Entities should publish successfully after renewal.
+ assertTrue("Entities should have been published successfully.",
+ publishWithRetries(appId, entityTypeDir, entityType, 2));
+ assertNotNull(collector);
+ verify(collectorManager.getTokenManagerService(), atLeastOnce()).
+ renewToken(eq(collector.getDelegationTokenForApp()),
+ any(String.class));
- // Update the regenerated token in UGI and retry publishing entities.
- Token<TimelineDelegationTokenIdentifier> regeneratedToken =
- collector.getDelegationTokenForApp();
- regeneratedToken.setService(new Text("localhost" +
- regeneratedToken.getService().toString().substring(
- regeneratedToken.getService().toString().indexOf(":"))));
- UserGroupInformation.getCurrentUser().addToken(regeneratedToken);
- assertTrue("Entities should have been published successfully.",
- publishWithRetries(appId, entityTypeDir, entityType, 2));
- // Token was generated twice, once when app collector was created and
- // later after token lifetime expiry.
- verify(collectorManager.getTokenManagerService(), times(2)).
- generateToken(any(UserGroupInformation.class), any(String.class));
- assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager).
- getTokenExpiredCnt());
- }
- // Wait for async entity to be published.
- for (int i = 0; i < 50; i++) {
- if (entityTypeDir.listFiles().length == 2) {
+ // Wait to ensure lifetime of token expires and ensure its regenerated
+ // automatically.
+ Thread.sleep(3000);
+ for (int i = 0; i < 40; i++) {
+ if (!token.equals(collector.getDelegationTokenForApp())) {
break;
}
Thread.sleep(50);
}
- assertEquals(2, entityTypeDir.listFiles().length);
- verifyEntity(entityTypeDir, "entity2", entityType);
- AppLevelTimelineCollector collector =
- (AppLevelTimelineCollector)collectorManager.get(appId);
- assertNotNull(collector);
- auxService.removeApplication(appId);
- verify(collectorManager.getTokenManagerService()).cancelToken(
- eq(collector.getDelegationTokenForApp()), any(String.class));
- } finally {
- FileUtils.deleteQuietly(entityTypeDir);
+ assertNotEquals("Token should have been regenerated.", token,
+ collector.getDelegationTokenForApp());
+ Thread.sleep(1000);
+ // Try publishing with the old token in UGI. Publishing should fail due
+ // to invalid token.
+ try {
+ publishAndVerifyEntity(appId, entityTypeDir, entityType, 2);
+ fail("Exception should have been thrown due to Invalid Token.");
+ } catch (YarnException e) {
+ assertTrue("Exception thrown should have been due to Invalid Token.",
+ e.getCause().getMessage().contains("InvalidToken"));
+ }
+
+ // Update the regenerated token in UGI and retry publishing entities.
+ Token<TimelineDelegationTokenIdentifier> regeneratedToken =
+ collector.getDelegationTokenForApp();
+ regeneratedToken.setService(new Text("localhost" +
+ regeneratedToken.getService().toString().substring(
+ regeneratedToken.getService().toString().indexOf(":"))));
+ UserGroupInformation.getCurrentUser().addToken(regeneratedToken);
+ assertTrue("Entities should have been published successfully.",
+ publishWithRetries(appId, entityTypeDir, entityType, 2));
+ // Token was generated twice, once when app collector was created and
+ // later after token lifetime expiry.
+ verify(collectorManager.getTokenManagerService(), times(2)).
+ generateToken(any(UserGroupInformation.class), any(String.class));
+ assertEquals(1, ((DummyNodeTimelineCollectorManager) collectorManager).
+ getTokenExpiredCnt());
+ }
+ // Wait for async entity to be published.
+ FileFilter tmpFilter = (pathname -> !pathname.getName().endsWith(".tmp"));
+ File[] entities = null;
+ for (int i = 0; i < 50; i++) {
+ entities = entityTypeDir.listFiles(tmpFilter);
+ if (entities != null && entities.length == 2) {
+ break;
+ }
+ Thread.sleep(50);
}
+ assertNotNull("Error reading entityTypeDir", entities);
+ assertEquals(2, entities.length);
+ verifyEntity(entityTypeDir, "entity2", entityType);
+ AppLevelTimelineCollector collector =
+ (AppLevelTimelineCollector)collectorManager.get(appId);
+ assertNotNull(collector);
+ auxService.removeApplication(appId);
+ verify(collectorManager.getTokenManagerService()).cancelToken(
+ eq(collector.getDelegationTokenForApp()), any(String.class));
}
private static class DummyNodeTimelineCollectorManager extends
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]