[ 
https://issues.apache.org/jira/browse/HADOOP-15327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17631651#comment-17631651
 ] 

ASF GitHub Bot commented on HADOOP-15327:
-----------------------------------------

szilard-nemeth commented on code in PR #3259:
URL: https://github.com/apache/hadoop/pull/3259#discussion_r1019104663


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java:
##########
@@ -668,34 +1357,61 @@ protected ChannelFuture 
sendMapOutput(ChannelHandlerContext ctx,
       conns[i].connect();
     }
 
-    //Ensure first connections are okay
-    conns[0].getInputStream();
-    int rc = conns[0].getResponseCode();
-    Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
-    
-    conns[1].getInputStream();
-    rc = conns[1].getResponseCode();
-    Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
-
-    // This connection should be closed because it to above the limit
-    try {
-      rc = conns[2].getResponseCode();
-      Assert.assertEquals("Expected a too-many-requests response code",
-          ShuffleHandler.TOO_MANY_REQ_STATUS.getCode(), rc);
-      long backoff = Long.valueOf(
-          conns[2].getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
-      Assert.assertTrue("The backoff value cannot be negative.", backoff > 0);
-      conns[2].getInputStream();
-      Assert.fail("Expected an IOException");
-    } catch (IOException ioe) {
-      LOG.info("Expected - connection should not be open");
-    } catch (NumberFormatException ne) {
-      Assert.fail("Expected a numerical value for RETRY_AFTER header field");
-    } catch (Exception e) {
-      Assert.fail("Expected a IOException");
+    Map<Integer, List<HttpURLConnection>> mapOfConnections = Maps.newHashMap();
+    for (HttpURLConnection conn : conns) {
+      try {
+        conn.getInputStream();
+      } catch (IOException ioe) {
+        LOG.info("Expected - connection should not be open");
+      } catch (NumberFormatException ne) {
+        fail("Expected a numerical value for RETRY_AFTER header field");
+      } catch (Exception e) {
+        fail("Expected a IOException");
+      }
+      int statusCode = conn.getResponseCode();
+      LOG.debug("Connection status code: {}", statusCode);
+      mapOfConnections.putIfAbsent(statusCode, new ArrayList<>());
+      List<HttpURLConnection> connectionList = 
mapOfConnections.get(statusCode);
+      connectionList.add(conn);
     }
+
+    assertEquals(String.format("Expected only %s and %s response",
+            OK_STATUS, ShuffleHandler.TOO_MANY_REQ_STATUS),
+        Sets.newHashSet(
+            HttpURLConnection.HTTP_OK,
+            ShuffleHandler.TOO_MANY_REQ_STATUS.code()),
+        mapOfConnections.keySet());
     
-    shuffleHandler.stop(); 
+    List<HttpURLConnection> successfulConnections =
+        mapOfConnections.get(HttpURLConnection.HTTP_OK);
+    assertEquals(String.format("Expected exactly %d requests " +
+            "with %s response", maxAllowedConnections, OK_STATUS),
+        maxAllowedConnections, successfulConnections.size());
+
+    //Ensure exactly one connection is HTTP 429 (TOO MANY REQUESTS)
+    List<HttpURLConnection> closedConnections =
+        mapOfConnections.get(ShuffleHandler.TOO_MANY_REQ_STATUS.code());
+    assertEquals(String.format("Expected exactly %d %s response",
+            notAcceptedConnections, ShuffleHandler.TOO_MANY_REQ_STATUS),
+        notAcceptedConnections, closedConnections.size());
+
+    // This connection should be closed because it is above the maximum limit
+    HttpURLConnection conn = closedConnections.get(0);
+    assertEquals(String.format("Expected a %s response",
+            ShuffleHandler.TOO_MANY_REQ_STATUS),
+        ShuffleHandler.TOO_MANY_REQ_STATUS.code(), conn.getResponseCode());
+    long backoff = Long.parseLong(
+        conn.getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
+    assertTrue("The backoff value cannot be negative.", backoff > 0);
+
+    shuffleHandler.stop();
+
+    //It's okay to get a ClosedChannelException.
+    //All other kinds of exceptions means something went wrong
+    assertEquals("Should have no caught exceptions",
+        Collections.emptyList(), failures.stream()
+            .filter(f -> !(f instanceof ClosedChannelException))
+            .collect(toList()));

Review Comment:
   Thanks for this comment. It's indeed a wrong thing that the InputStream is 
not closed.
   I'd suggest to create a separate jira to refactor this class, as it's 
currently in a horrible shape.





> Upgrade MR ShuffleHandler to use Netty4
> ---------------------------------------
>
>                 Key: HADOOP-15327
>                 URL: https://issues.apache.org/jira/browse/HADOOP-15327
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Xiaoyu Yao
>            Assignee: Szilard Nemeth
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HADOOP-15327.001.patch, HADOOP-15327.002.patch, 
> HADOOP-15327.003.patch, HADOOP-15327.004.patch, HADOOP-15327.005.patch, 
> HADOOP-15327.005.patch, 
> getMapOutputInfo_BlockingOperationException_awaitUninterruptibly.log, 
> hades-results-20221108.zip, testfailure-testMapFileAccess-emptyresponse.zip, 
> testfailure-testReduceFromPartialMem.zip
>
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> This way, we can remove the dependencies on the netty3 (jboss.netty)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to