This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f6be3678d1 MAPREDUCE-7434. Fix ShuffleHandler tests. Contributed by 
Tamas Domok
8f6be3678d1 is described below

commit 8f6be3678d1113e3e7f5477c357fc81f62d460b8
Author: Szilard Nemeth <snem...@apache.org>
AuthorDate: Wed Mar 1 16:10:05 2023 +0100

    MAPREDUCE-7434. Fix ShuffleHandler tests. Contributed by Tamas Domok
---
 .../hadoop/mapred/TestShuffleChannelHandler.java   |  2 +-
 .../apache/hadoop/mapred/TestShuffleHandler.java   | 44 +++++++++++++++-------
 .../hadoop/mapred/TestShuffleHandlerBase.java      | 29 +++++++-------
 3 files changed, 47 insertions(+), 28 deletions(-)

diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
index 7fedc7bb2dc..66fa3de94f8 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
@@ -225,7 +225,7 @@ public class TestShuffleChannelHandler extends 
TestShuffleHandlerBase {
     final ShuffleTest t = createShuffleTest();
     final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
 
-    String dataFile = getDataFile(tempDir.toAbsolutePath().toString(), 
TEST_ATTEMPT_2);
+    String dataFile = getDataFile(TEST_USER, 
tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2);
     assertTrue("should delete", new File(dataFile).delete());
 
     FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 37a9210286c..cc46b49b113 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -29,6 +29,7 @@ import static 
org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -41,6 +42,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
@@ -159,7 +161,7 @@ public class TestShuffleHandler extends 
TestShuffleHandlerBase {
     shuffleHandler.init(conf);
     shuffleHandler.start();
     final String port = 
shuffleHandler.getConfig().get(SHUFFLE_PORT_CONFIG_KEY);
-    final SecretKey secretKey = shuffleHandler.addTestApp();
+    final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER);
 
     // setup connections
     HttpURLConnection[] conns = new HttpURLConnection[connAttempts];
@@ -237,7 +239,7 @@ public class TestShuffleHandler extends 
TestShuffleHandlerBase {
     shuffleHandler.init(conf);
     shuffleHandler.start();
     final String port = 
shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
-    final SecretKey secretKey = shuffleHandler.addTestApp();
+    final SecretKey secretKey = shuffleHandler.addTestApp(TEST_USER);
 
     HttpURLConnection conn1 = createRequest(
         geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), 
true),
@@ -278,18 +280,34 @@ public class TestShuffleHandler extends 
TestShuffleHandlerBase {
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, 
"kerberos");
     UserGroupInformation.setConfiguration(conf);
 
+    final String randomUser = "randomUser";
+    final String attempt = "attempt_1111111111111_0004_m_000004_0";
+    generateMapOutput(randomUser, tempDir.toAbsolutePath().toString(), attempt,
+            Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));
+
     ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock();
     shuffleHandler.init(conf);
     try {
       shuffleHandler.start();
       final String port = 
shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
-      final SecretKey secretKey = shuffleHandler.addTestApp();
+      final SecretKey secretKey = shuffleHandler.addTestApp(randomUser);
 
       HttpURLConnection conn = createRequest(
-          geURL(port, TEST_JOB_ID, 0, 
Collections.singletonList(TEST_ATTEMPT_1), false),
+          geURL(port, TEST_JOB_ID, 0, Collections.singletonList(attempt), 
false),
           secretKey);
       conn.connect();
-      BufferedReader in = new BufferedReader(new 
InputStreamReader(conn.getInputStream()));
+
+      InputStream is = null;
+      try {
+        is = conn.getInputStream();
+      } catch (IOException ioe) {
+        if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+          is = conn.getErrorStream();
+        }
+      }
+
+      assertNotNull(is);
+      BufferedReader in = new BufferedReader(new InputStreamReader(is));
       StringBuilder builder = new StringBuilder();
       String inputLine;
       while ((inputLine = in.readLine()) != null) {
@@ -299,7 +317,7 @@ public class TestShuffleHandler extends 
TestShuffleHandlerBase {
       String receivedString = builder.toString();
 
       //Retrieve file owner name
-      String indexFilePath = getIndexFile(tempDir.toAbsolutePath().toString(), 
TEST_ATTEMPT_1);
+      String indexFilePath = getIndexFile(randomUser, 
tempDir.toAbsolutePath().toString(), attempt);
       String owner;
       try (FileInputStream fis = new FileInputStream(indexFilePath)) {
         owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner();
@@ -307,11 +325,11 @@ public class TestShuffleHandler extends 
TestShuffleHandlerBase {
 
       String message =
           "Owner '" + owner + "' for path " + indexFilePath
-              + " did not match expected owner '" + TEST_USER + "'";
+              + " did not match expected owner '" + randomUser + "'";
       assertTrue(String.format("Received string '%s' should contain " +
               "message '%s'", receivedString, message),
           receivedString.contains(message));
-      assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
+      assertEquals(HttpURLConnection.HTTP_INTERNAL_ERROR, 
conn.getResponseCode());
       LOG.info("received: " + receivedString);
       assertNotEquals("", receivedString);
     } finally {
@@ -334,7 +352,7 @@ public class TestShuffleHandler extends 
TestShuffleHandlerBase {
       shuffle.init(conf);
       shuffle.start();
       final String port = 
shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
-      final SecretKey secretKey = shuffle.addTestApp();
+      final SecretKey secretKey = shuffle.addTestApp(TEST_USER);
 
       // verify we are authorized to shuffle
       int rc = getShuffleResponseCode(port, secretKey);
@@ -387,7 +405,7 @@ public class TestShuffleHandler extends 
TestShuffleHandlerBase {
       shuffle.init(conf);
       shuffle.start();
       final String port = 
shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
-      final SecretKey secretKey = shuffle.addTestApp();
+      final SecretKey secretKey = shuffle.addTestApp(TEST_USER);
 
       // verify we are authorized to shuffle
       int rc = getShuffleResponseCode(port, secretKey);
@@ -489,14 +507,14 @@ public class TestShuffleHandler extends 
TestShuffleHandlerBase {
 
   class ShuffleHandlerMock extends ShuffleHandler {
 
-    public SecretKey addTestApp() throws IOException {
+    public SecretKey addTestApp(String user) throws IOException {
       DataOutputBuffer outputBuffer = new DataOutputBuffer();
       outputBuffer.reset();
       Token<JobTokenIdentifier> jt = new Token<>(
-          "identifier".getBytes(), "password".getBytes(), new Text(TEST_USER),
+          "identifier".getBytes(), "password".getBytes(), new Text(user),
           new Text("shuffleService"));
       jt.write(outputBuffer);
-      initializeApplication(new ApplicationInitializationContext(TEST_USER, 
TEST_APP_ID,
+      initializeApplication(new ApplicationInitializationContext(user, 
TEST_APP_ID,
           ByteBuffer.wrap(outputBuffer.getData(), 0,
               outputBuffer.getLength())));
 
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java
index 1bce443381d..406f2866230 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java
@@ -55,7 +55,7 @@ public class TestShuffleHandlerBase {
   public static final String TEST_ATTEMPT_2 = 
"attempt_1111111111111_0002_m_000002_0";
   public static final String TEST_ATTEMPT_3 = 
"attempt_1111111111111_0003_m_000003_0";
   public static final String TEST_JOB_ID = "job_1111111111111_0001";
-  public static final String TEST_USER = "testUser";
+  public static final String TEST_USER = System.getProperty("user.name");
   public static final String TEST_DATA_A = "aaaaa";
   public static final String TEST_DATA_B = "bbbbb";
   public static final String TEST_DATA_C = "ccccc";
@@ -70,11 +70,11 @@ public class TestShuffleHandlerBase {
     tempDir = Files.createTempDirectory("test-shuffle-channel-handler");
     tempDir.toFile().deleteOnExit();
 
-    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1,
+    generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), 
TEST_ATTEMPT_1,
         Arrays.asList(TEST_DATA_A, TEST_DATA_B, TEST_DATA_C));
-    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2,
+    generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), 
TEST_ATTEMPT_2,
         Arrays.asList(TEST_DATA_B, TEST_DATA_A, TEST_DATA_C));
-    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3,
+    generateMapOutput(TEST_USER, tempDir.toAbsolutePath().toString(), 
TEST_ATTEMPT_3,
         Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));
 
     outputStreamCaptor.reset();
@@ -101,12 +101,13 @@ public class TestShuffleHandlerBase {
     return allMatches;
   }
 
-  public static void generateMapOutput(String tempDir, String attempt, 
List<String> maps)
+  public static void generateMapOutput(String user, String tempDir,
+                                       String attempt, List<String> maps)
       throws IOException {
     SpillRecord record = new SpillRecord(maps.size());
 
-    assertTrue(new File(getBasePath(tempDir, attempt)).mkdirs());
-    try (PrintWriter writer = new PrintWriter(getDataFile(tempDir, attempt), 
"UTF-8")) {
+    assertTrue(new File(getBasePath(user, tempDir, attempt)).mkdirs());
+    try (PrintWriter writer = new PrintWriter(getDataFile(user, tempDir, 
attempt), "UTF-8")) {
       long startOffset = 0;
       int partition = 0;
       for (String map : maps) {
@@ -119,21 +120,21 @@ public class TestShuffleHandlerBase {
         partition++;
         writer.write(map);
       }
-      record.writeToFile(new Path(getIndexFile(tempDir, attempt)),
+      record.writeToFile(new Path(getIndexFile(user, tempDir, attempt)),
           new JobConf(new Configuration()));
     }
   }
 
-  public static String getIndexFile(String tempDir, String attempt) {
-    return String.format("%s/%s", getBasePath(tempDir, attempt), 
INDEX_FILE_NAME);
+  public static String getIndexFile(String user, String tempDir, String 
attempt) {
+    return String.format("%s/%s", getBasePath(user, tempDir, attempt), 
INDEX_FILE_NAME);
   }
 
-  public static String getDataFile(String tempDir, String attempt) {
-    return String.format("%s/%s", getBasePath(tempDir, attempt), 
DATA_FILE_NAME);
+  public static String getDataFile(String user, String tempDir, String 
attempt) {
+    return String.format("%s/%s", getBasePath(user, tempDir, attempt), 
DATA_FILE_NAME);
   }
 
-  private static String getBasePath(String tempDir, String attempt) {
-    return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, TEST_USER, 
attempt);
+  private static String getBasePath(String user, String tempDir, String 
attempt) {
+    return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, user, attempt);
   }
 
   public static String getUri(String jobId, int reduce, List<String> maps, 
boolean keepAlive) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to