shameersss1 commented on a change in pull request #72:
URL: https://github.com/apache/tez/pull/72#discussion_r792268571



##########
File path: 
tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
##########
@@ -1312,6 +1312,87 @@ protected void sendError(ChannelHandlerContext ctx, 
String message,
     }
   }
 
+  @Test(timeout = 5000)
+  public void testFailedTaskAttemptDelete() throws Exception {
+    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "simple");
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir = new File("target", TestShuffleHandler.class.
+        getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    String user = "randomUser";
+    List<File> fileMap = new ArrayList<File>();
+    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected void sendError(ChannelHandlerContext ctx, String message,
+                                   HttpResponseStatus status) {
+            if (failures.size() == 0) {
+              failures.add(new Error(message));
+              ctx.channel().close();
+            }
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt =
+          new Token<JobTokenIdentifier>("identifier".getBytes(),
+              "password".getBytes(), new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler
+          .initializeApplication(new ApplicationInitializationContext(user,
+              appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+              outputBuffer.getLength())));
+      URL url =
+          new URL(
+              "http://127.0.0.1:"
+                  + shuffleHandler.getConfig().get(
+                  ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+                  + "/mapOutput?taskAttemptAction=delete&job=job_12345_0001&dag=1&map=" + appAttemptId);
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      String taskAttemptDirStr =
+          StringUtils.join(Path.SEPARATOR,
+              new String[] {absLogDir.getAbsolutePath(),
+                  ShuffleHandler.USERCACHE, user,
+                  ShuffleHandler.APPCACHE, appId.toString(), "dag_1/output/", appAttemptId});
+      File taskAttemptDir = new File(taskAttemptDirStr);
+      Assert.assertTrue("Task Attempt Directory does not exist!", taskAttemptDir.exists());

Review comment:
       ack!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to