This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 3fe502d68 TEZ-4441: TezAppMaster may stuck because of reportError skip
send error event (#236) (zhengchenyu reviewed by Laszlo Bodor)
3fe502d68 is described below
commit 3fe502d687d1f3f1e8fd698a7ea129cb49005bce
Author: zhengchenyu <[email protected]>
AuthorDate: Mon Aug 29 19:52:29 2022 +0800
TEZ-4441: TezAppMaster may stuck because of reportError skip send error
event (#236) (zhengchenyu reviewed by Laszlo Bodor)
---
.../java/org/apache/tez/dag/app/DAGAppMaster.java | 2 +-
.../tez/dag/app/rm/TaskSchedulerManager.java | 8 +-
.../tez/dag/app/rm/TestTaskSchedulerHelpers.java | 17 +++++
.../tez/dag/app/rm/TestTaskSchedulerManager.java | 89 ++++++++++++++++++++++
4 files changed, 114 insertions(+), 2 deletions(-)
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index f1486e8ba..ffbf0976c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -2663,7 +2663,7 @@ public class DAGAppMaster extends AbstractService {
}
@VisibleForTesting
- static void parseAllPlugins(
+ public static void parseAllPlugins(
List<NamedEntityDescriptor> taskSchedulerDescriptors, BiMap<String,
Integer> taskSchedulerPluginMap,
List<NamedEntityDescriptor> containerLauncherDescriptors, BiMap<String,
Integer> containerLauncherPluginMap,
List<NamedEntityDescriptor> taskCommDescriptors, BiMap<String, Integer>
taskCommPluginMap,
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index cc2e16372..4e919db8b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -908,7 +908,8 @@ public class TaskSchedulerManager extends AbstractService
implements
LOG.info("Error reported by scheduler {} - {}",
Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex,
appContext) + ": " +
diagnostics);
- if
(taskSchedulerDescriptors[taskSchedulerIndex].getClassName().equals(yarnSchedulerClassName))
{
+ if (taskSchedulerDescriptors[taskSchedulerIndex].getEntityName()
+ .equals(TezConstants.getTezYarnServicePluginName())) {
LOG.warn(
"Reporting a SchedulerServiceError to the DAGAppMaster since the
error" +
" was reported by the YARN task scheduler");
@@ -1076,4 +1077,9 @@ public class TaskSchedulerManager extends AbstractService
implements
return
taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName();
}
+ /**
+ * Returns the TaskScheduler instance held at {@code taskSchedulerIndex}
+ * (i.e. {@code taskSchedulers[taskSchedulerIndex].getTaskScheduler()}).
+ * Exposed for tests so they can act on the scheduler directly.
+ *
+ * @param taskSchedulerIndex index into the {@code taskSchedulers} array; an
+ * out-of-range index throws ArrayIndexOutOfBoundsException
+ * @return the scheduler wrapped at that index
+ */
+ @VisibleForTesting
+ public TaskScheduler getTaskScheduler(int taskSchedulerIndex) {
+ return taskSchedulers[taskSchedulerIndex].getTaskScheduler();
+ }
+
}
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index b7acc6876..490067a54 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -161,6 +161,19 @@ class TestTaskSchedulerHelpers {
this.defaultPayload = defaultPayload;
}
+ /**
+ * Test constructor that takes the task scheduler descriptors explicitly and
+ * forwards them to the TaskSchedulerManager super constructor, then records
+ * the AMRM client, signature matcher and default payload on this instance.
+ * {@code appContext.getAMConf()} is dereferenced here to load the Hadoop shim,
+ * so the supplied AppContext must return a non-null configuration.
+ * NOTE(review): two super arguments are passed as null and one boolean as
+ * false; confirm their meaning against TaskSchedulerManager's constructor.
+ */
+ TaskSchedulerManagerForTest(AppContext appContext,
+ EventHandler eventHandler,
+
TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
+ ContainerSignatureMatcher
containerSignatureMatcher,
+ UserPayload defaultPayload,
+ List<NamedEntityDescriptor>
descriptors) {
+ super(appContext, null, eventHandler, containerSignatureMatcher, null,
descriptors,
+ false, new
HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
+ this.amrmClientAsync = amrmClientAsync;
+ this.containerSignatureMatcher = containerSignatureMatcher;
+ this.defaultPayload = defaultPayload;
+ }
+
@SuppressWarnings("unchecked")
@Override
public void instantiateSchedulers(String host, int port, String
trackingUrl,
@@ -224,6 +237,10 @@ class TestTaskSchedulerHelpers {
fail("Expected Event: " + eventClass.getName() + " not sent");
return null;
}
+
+ /**
+ * Returns the number of events held in {@code events}, i.e. how many events
+ * this handler has captured so far.
+ * NOTE(review): tests poll this while events may still be arriving; confirm
+ * that {@code events} is safe for concurrent access.
+ */
+ public int getEventSize() {
+ return this.events.size();
+ }
}
static class TaskSchedulerWithDrainableContext extends
YarnTaskSchedulerService {
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index dcf9a5dd6..e416c6570 100644
---
a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -45,11 +45,17 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -59,6 +65,8 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ContainerSignatureMatcher;
@@ -72,10 +80,16 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.TaskAttempt;
+import
org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
@@ -89,6 +103,8 @@ import
org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.AMContainerState;
+import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.app.rm.node.AMNodeTracker;
import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.helpers.DagInfoImplForTest;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -839,6 +855,60 @@ public class TestTaskSchedulerManager {
}
}
+ /**
+ * Regression test for TEZ-4441: when the scheduler whose descriptor entity
+ * name equals {@code TezConstants.getTezYarnServicePluginName()} reports an
+ * error, TaskSchedulerManager must send a
+ * {@code DAGAppMasterEventSchedulingServiceError} instead of skipping it.
+ */
+ @Test(timeout = 10000)
+ public void testHandleException() throws Exception {
+ Configuration tezConf = new Configuration(new YarnConfiguration());
+ UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(tezConf);
+
+ // Parse the plugin configuration into scheduler descriptors and their index map
+ List<NamedEntityDescriptor> tsDescriptors = Lists.newLinkedList();
+ BiMap<String, Integer> tsMap = HashBiMap.create();
+ DAGAppMaster.parseAllPlugins(tsDescriptors, tsMap, Lists.newLinkedList(),
HashBiMap.create(), Lists.newLinkedList(),
+ HashBiMap.create(), null, false, defaultPayload);
+
+ // Exactly one scheduler descriptor is expected: the Tez YARN service plugin.
+ Assert.assertEquals(1, tsDescriptors.size());
+ Assert.assertEquals(TezConstants.getTezYarnServicePluginName(),
tsDescriptors.get(0).getEntityName());
+
+ // Construct eventHandler
+ TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new
TestTaskSchedulerHelpers.CapturingEventHandler();
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+ // Construct AMRMClient
+ AMRMClient<YarnTaskSchedulerService.CookieContainerRequest> rmClientCore =
+ new TestTaskSchedulerHelpers.AMRMClientForTest();
+ TezAMRMClientAsync<YarnTaskSchedulerService.CookieContainerRequest>
rmClient =
+ spy(new TestTaskSchedulerHelpers.AMRMClientAsyncForTest(rmClientCore,
100));
+
+ // Construct appContext
+ AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
+ AMContainerMap amContainerMap = new
AMContainerMap(mock(ContainerHeartbeatHandler.class),
+ mock(TaskCommunicatorManagerInterface.class), new
ContainerContextMatcher(), appContext);
+ AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+ doReturn(amContainerMap).when(appContext).getAllContainers();
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+ doReturn(dagID).when(appContext).getCurrentDAGID();
+ doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+ // Construct TaskSchedulerManager
+ TaskSchedulerManager taskSchedulerManagerReal =
+ new TestTaskSchedulerHelpers.TaskSchedulerManagerForTest(appContext,
eventHandler, rmClient,
+ new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher(),
defaultPayload, tsDescriptors);
+ TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
+
+ // Report an error from the scheduler; expect a DAGAppMasterEventSchedulingServiceError.
+ YarnTaskSchedulerService scheduler = ((YarnTaskSchedulerService)
taskSchedulerManager.getTaskScheduler(0));
+ scheduler.onError(new Exception("Trigger by unit test"));
+ // Poll every 1s (for up to 5s) until the handler has captured at least one event.
+ waitFor(() -> {
+ return eventHandler.getEventSize() > 0;
+ }, 1000, 5000);
+
eventHandler.verifyInvocation(DAGAppMasterEventSchedulingServiceError.class);
+ }
+
private static class ExceptionAnswer implements Answer {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
@@ -1107,4 +1177,23 @@ public class TestTaskSchedulerManager {
public void dagComplete() throws ServicePluginException {
}
}
+
+ /**
+ * Polls {@code check} every {@code checkEveryMillis} ms until it returns true
+ * or {@code waitForMillis} ms have elapsed, measured with Time.monotonicNow().
+ * The condition is evaluated once before the first sleep.
+ *
+ * @param check condition to poll; must not be null
+ * @param checkEveryMillis sleep between polls, in milliseconds
+ * @param waitForMillis total time budget in milliseconds; must be >= checkEveryMillis
+ * @throws TimeoutException if the condition is still false when the budget is exhausted
+ * @throws InterruptedException if interrupted while sleeping between polls
+ */
+ public static void waitFor(Supplier<Boolean> check, int checkEveryMillis,
+ int waitForMillis) throws TimeoutException, InterruptedException {
+ Preconditions.checkNotNull(check, "Input supplier interface should be initialized");
+ // The check is >=, so the message must say "greater than or equal to".
+ Preconditions.checkArgument(waitForMillis >= checkEveryMillis,
+ "Total wait time should be greater than or equal to check interval time");
+
+ long st = Time.monotonicNow();
+ boolean result = check.get();
+
+ while (!result && (Time.monotonicNow() - st < waitForMillis)) {
+ Thread.sleep(checkEveryMillis);
+ result = check.get();
+ }
+
+ if (!result) {
+ throw new TimeoutException("Timed out waiting for condition after " + waitForMillis + " ms.");
+ }
+ }
}