This is an automated email from the ASF dual-hosted git repository. hexiaoqiao pushed a commit to branch branch-3.2.2 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 2f2b690df705ef62504ceb6cec1443147fc162ef Author: Peter Bacsko <[email protected]> AuthorDate: Wed Nov 25 11:44:48 2020 +0100 MAPREDUCE-7309. Improve performance of reading resource request for mapper/reducers from config. Contributed by Peter Bacsko & Wangda Tan. --- .../hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java | 12 +++++++++++- .../hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java | 7 +++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 7fc35c7..0cddaf3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -158,6 +158,9 @@ public abstract class TaskAttemptImpl implements org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt, EventHandler<TaskAttemptEvent> { + @VisibleForTesting + protected final static Map<TaskType, Resource> RESOURCE_REQUEST_CACHE + = new HashMap<>(); static final Counters EMPTY_COUNTERS = new Counters(); private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImpl.class); @@ -172,7 +175,7 @@ public abstract class TaskAttemptImpl implements private final Clock clock; private final org.apache.hadoop.mapred.JobID oldJobId; private final TaskAttemptListener taskAttemptListener; - private final Resource resourceCapability; + private Resource resourceCapability; protected Set<String> dataLocalHosts; protected Set<String> dataLocalRacks; private final List<String> diagnostics = new ArrayList<String>(); @@ -707,6 +710,10 @@ public abstract class TaskAttemptImpl implements getResourceTypePrefix(taskType); boolean memorySet = false; boolean cpuVcoresSet = false; + if (RESOURCE_REQUEST_CACHE.get(taskType) != null) { + resourceCapability = RESOURCE_REQUEST_CACHE.get(taskType); + return; + } if (resourceTypePrefix != null) { List<ResourceInformation> resourceRequests = ResourceUtils.getRequestedResourcesFromConfig(conf, @@ -767,6 +774,9 @@ public abstract class TaskAttemptImpl implements if (!cpuVcoresSet) { this.resourceCapability.setVirtualCores(getCpuRequired(conf, taskType)); } + RESOURCE_REQUEST_CACHE.put(taskType, resourceCapability); + LOG.info("Resource capability of task type {} is set to {}", + taskType, resourceCapability); } private String getCpuVcoresKey(TaskType taskType) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 7f54ea1..ba40849 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -42,6 +42,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -180,6 +181,11 @@ public class TestTaskAttempt{ ResourceUtils.resetResourceTypes(new Configuration()); } + @Before + public void before() { + TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear(); + } + @After public void tearDown() { ResourceUtils.resetResourceTypes(new Configuration()); @@ -1634,6 +1640,7 @@ public class TestTaskAttempt{ TestAppender testAppender = new TestAppender(); final Logger logger = Logger.getLogger(TaskAttemptImpl.class); try { + TaskAttemptImpl.RESOURCE_REQUEST_CACHE.clear(); logger.addAppender(testAppender); EventHandler eventHandler = mock(EventHandler.class); Clock clock = SystemClock.getInstance(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
