This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 3fe502d68 TEZ-4441: TezAppMaster may stuck because of reportError skip send error event (#236) (zhengchenyu reviewed by Laszlo Bodor)
3fe502d68 is described below

commit 3fe502d687d1f3f1e8fd698a7ea129cb49005bce
Author: zhengchenyu <[email protected]>
AuthorDate: Mon Aug 29 19:52:29 2022 +0800

    TEZ-4441: TezAppMaster may stuck because of reportError skip send error event (#236) (zhengchenyu reviewed by Laszlo Bodor)
---
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |  2 +-
 .../tez/dag/app/rm/TaskSchedulerManager.java       |  8 +-
 .../tez/dag/app/rm/TestTaskSchedulerHelpers.java   | 17 +++++
 .../tez/dag/app/rm/TestTaskSchedulerManager.java   | 89 ++++++++++++++++++++++
 4 files changed, 114 insertions(+), 2 deletions(-)

diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index f1486e8ba..ffbf0976c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -2663,7 +2663,7 @@ public class DAGAppMaster extends AbstractService {
   }
 
   @VisibleForTesting
-  static void parseAllPlugins(
+  public static void parseAllPlugins(
       List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String, 
Integer> taskSchedulerPluginMap,
       List<NamedEntityDescriptor> containerLauncherDescriptors, BiMap<String, 
Integer> containerLauncherPluginMap,
       List<NamedEntityDescriptor> taskCommDescriptors, BiMap<String, Integer> 
taskCommPluginMap,
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index cc2e16372..4e919db8b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -908,7 +908,8 @@ public class TaskSchedulerManager extends AbstractService implements
       LOG.info("Error reported by scheduler {} - {}",
           Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, 
appContext) + ": " +
               diagnostics);
-      if 
(taskSchedulerDescriptors[taskSchedulerIndex].getClassName().equals(yarnSchedulerClassName))
 {
+      if (taskSchedulerDescriptors[taskSchedulerIndex].getEntityName()
+          .equals(TezConstants.getTezYarnServicePluginName())) {
         LOG.warn(
             "Reporting a SchedulerServiceError to the DAGAppMaster since the 
error" +
                 " was reported by the YARN task scheduler");
@@ -1076,4 +1077,9 @@ public class TaskSchedulerManager extends AbstractService implements
     return 
taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName();
   }
 
+  @VisibleForTesting
+  public TaskScheduler getTaskScheduler(int taskSchedulerIndex) {
+    return taskSchedulers[taskSchedulerIndex].getTaskScheduler();
+  }
+
 }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index b7acc6876..490067a54 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -161,6 +161,19 @@ class TestTaskSchedulerHelpers {
       this.defaultPayload = defaultPayload;
     }
 
+    TaskSchedulerManagerForTest(AppContext appContext,
+                                       EventHandler eventHandler,
+                                       
TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
+                                       ContainerSignatureMatcher 
containerSignatureMatcher,
+                                       UserPayload defaultPayload,
+                                       List<NamedEntityDescriptor> 
descriptors) {
+      super(appContext, null, eventHandler, containerSignatureMatcher, null, 
descriptors,
+          false, new 
HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
+      this.amrmClientAsync = amrmClientAsync;
+      this.containerSignatureMatcher = containerSignatureMatcher;
+      this.defaultPayload = defaultPayload;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void instantiateSchedulers(String host, int port, String 
trackingUrl,
@@ -224,6 +237,10 @@ class TestTaskSchedulerHelpers {
       fail("Expected Event: " + eventClass.getName() + " not sent");
       return null;
     }
+
+    public int getEventSize() {
+      return this.events.size();
+    }
   }
 
   static class TaskSchedulerWithDrainableContext extends 
YarnTaskSchedulerService {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index dcf9a5dd6..e416c6570 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -45,11 +45,17 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -59,6 +65,8 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tez.common.ContainerSignatureMatcher;
@@ -72,10 +80,16 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.TaskAttempt;
+import 
org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
 import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
@@ -89,6 +103,8 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.AMContainerState;
+import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.app.rm.node.AMNodeTracker;
 import org.apache.tez.dag.app.web.WebUIService;
 import org.apache.tez.dag.helpers.DagInfoImplForTest;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -839,6 +855,60 @@ public class TestTaskSchedulerManager {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testHandleException() throws Exception {
+    Configuration tezConf = new Configuration(new YarnConfiguration());
+    UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(tezConf);
+
+    // Parse plugins
+    List<NamedEntityDescriptor> tsDescriptors = Lists.newLinkedList();
+    BiMap<String, Integer> tsMap = HashBiMap.create();
+    DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, Lists.newLinkedList(), 
HashBiMap.create(), Lists.newLinkedList(),
+        HashBiMap.create(), null, false, defaultPayload);
+
+    // Only TezYarn found.
+    Assert.assertEquals(1, tsDescriptors.size());
+    Assert.assertEquals(TezConstants.getTezYarnServicePluginName(), 
tsDescriptors.get(0).getEntityName());
+
+    // Construct eventHandler
+    TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new 
TestTaskSchedulerHelpers.CapturingEventHandler();
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+    // Construct AMRMClient
+    AMRMClient<YarnTaskSchedulerService.CookieContainerRequest> rmClientCore =
+        new TestTaskSchedulerHelpers.AMRMClientForTest();
+    TezAMRMClientAsync<YarnTaskSchedulerService.CookieContainerRequest> 
rmClient =
+        spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(rmClientCore, 
100));
+
+    // Construct appContext
+    AppContext appContext = mock(AppContext.class);
+    doReturn(new Configuration(false)).when(appContext).getAMConf();
+    AMContainerMap amContainerMap = new 
AMContainerMap(mock(ContainerHeartbeatHandler.class),
+        mock(TaskCommunicatorManagerInterface.class), new 
ContainerContextMatcher(), appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
+    doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+    // Construct TaskSchedulerManager
+    TaskSchedulerManager taskSchedulerManagerReal =
+        new TestTaskSchedulerHelpers.TaskSchedulerManagerForTest(appContext, 
eventHandler, rmClient,
+            new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher(), 
defaultPayload, tsDescriptors);
+    TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
+
+    // Send error to schedule, then expect 
DAGAppMasterEventSchedulingServiceError event.
+    YarnTaskSchedulerService scheduler = ((YarnTaskSchedulerService) 
taskSchedulerManager.getTaskScheduler(0));
+    scheduler.onError(new Exception("Trigger by unit test"));
+    waitFor(() -> {
+      return eventHandler.getEventSize() > 0;
+    }, 1000, 5000);
+    
eventHandler.verifyInvocation(DAGAppMasterEventSchedulingServiceError.class);
+  }
+
   private static class ExceptionAnswer implements Answer {
     @Override
     public Object answer(InvocationOnMock invocation) throws Throwable {
@@ -1107,4 +1177,23 @@ public class TestTaskSchedulerManager {
     public void dagComplete() throws ServicePluginException {
     }
   }
+
+  public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+                             int waitForMillis) throws TimeoutException, 
InterruptedException {
+    Preconditions.checkNotNull(check, "Input supplier interface should be 
initailized");
+    Preconditions.checkArgument(waitForMillis >= checkEveryMillis,
+        "Total wait time should be greater than check interval time");
+
+    long st = Time.monotonicNow();
+    boolean result = check.get();
+
+    while (!result && (Time.monotonicNow() - st < waitForMillis)) {
+      Thread.sleep(checkEveryMillis);
+      result = check.get();
+    }
+
+    if (!result) {
+      throw new TimeoutException("Timed out waiting for condition.");
+    }
+  }
 }

Reply via email to