Repository: tajo Updated Branches: refs/heads/master 0d94efe0f -> fdb76ed2c
TAJO-2081: Incorrect task locality on single node. Closes #968 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fdb76ed2 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fdb76ed2 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fdb76ed2 Branch: refs/heads/master Commit: fdb76ed2c7c06e7a9b43bbea18e202586b88606a Parents: 0d94efe Author: Jinho Kim <[email protected]> Authored: Fri Mar 4 15:09:10 2016 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Mar 4 15:09:10 2016 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 2 + .../tajo/querymaster/DefaultTaskScheduler.java | 64 +++++++++----------- 3 files changed, 34 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/fdb76ed2/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 50e1de0..7892aa2 100644 --- a/CHANGES +++ b/CHANGES @@ -106,6 +106,8 @@ Release 0.12.0 - unreleased BUG FIXES + TAJO-2081: Incorrect task locality on single node. (jinho) + TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on an union block. (jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/fdb76ed2/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index a535ece..c36f43b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -203,6 +203,8 @@ public class TajoConf extends Configuration { QUERYMASTER_TASK_SCHEDULER_DELAY("tajo.qm.task-scheduler.delay", 50), // 50 ms + QUERYMASTER_TASK_SCHEDULER_REQUEST_MAX_NUM("tajo.qm.task-scheduler.request.max-num", 50), + // Catalog CATALOG_ADDRESS("tajo.catalog.client-rpc.address", "localhost:26005", Validators.networkAddr()), http://git-wip-us.apache.org/repos/asf/tajo/blob/fdb76ed2/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index e290184..a0e76cc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -64,8 +64,6 @@ import static org.apache.tajo.ResourceProtos.*; public class DefaultTaskScheduler extends AbstractTaskScheduler { private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class); - private static final String REQUEST_MAX_NUM = "tajo.qm.task-scheduler.request.max-num"; - private final TaskSchedulerContext context; private Stage stage; private TajoConf tajoConf; @@ -123,7 +121,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { break; } } - LOG.info("TaskScheduler schedulingThread stopped"); + info(LOG, "TaskScheduler schedulingThread stopped"); } }; super.init(conf); @@ -131,8 +129,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { @Override public void start() { - LOG.info("Start TaskScheduler"); - maximumRequestContainer = tajoConf.getInt(REQUEST_MAX_NUM, stage.getContext().getWorkerMap().size()); + info(LOG, "Start TaskScheduler"); + maximumRequestContainer = Math.min(tajoConf.getIntVar(TajoConf.ConfVars.QUERYMASTER_TASK_SCHEDULER_REQUEST_MAX_NUM) + , stage.getContext().getWorkerMap().size()); if (isLeaf) { candidateWorkers.addAll(getWorkerIds(getLeafTaskHosts())); @@ -160,10 +159,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } candidateWorkers.clear(); scheduledRequests.clear(); - LOG.info("Task Scheduler stopped"); + info(LOG, "Task Scheduler stopped"); super.stop(); } + protected void info(Log log, String message) { + log.info(String.format("[%s] %s", stage.getId(), message)); + } + private Fragment[] fragmentsForNonLeafTask; private Fragment[] broadcastFragmentsForNonLeafTask; @@ -417,7 +420,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { TaskAttemptId taskAttemptId = null; if (unassignedTaskForEachVolume.size() > 0) { - int retry = unassignedTaskForEachVolume.size(); + int retry = diskVolumeLoads.size(); do { //clean and get a remaining local task taskAttemptId = getAndRemove(volumeId); @@ -473,6 +476,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { Iterator<TaskAttempt> iterator = list.iterator(); taskAttempt = iterator.next(); iterator.remove(); + remainTasksNum.decrementAndGet(); } taskAttemptId = taskAttempt.getId(); @@ -484,6 +488,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } increaseConcurrency(volumeId); + } else { + unassignedTaskForEachVolume.remove(volumeId); } return taskAttemptId; @@ -519,14 +525,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } if (volumeId > -1) { - LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency); + info(LOG, "Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency); } else if (volumeId == -1) { // this case is disabled namenode block meta or compressed text file or amazon s3 - LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency); + info(LOG, "Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency); } else if (volumeId == REMOTE) { // this case has processed all block on host and it will be assigned to remote - LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize() - + ", Remote Concurrency : " + concurrency); + info(LOG, "Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize() + + ", Remote Concurrency : " + concurrency + ", Unassigned volumes: " + unassignedTaskForEachVolume.size()); } diskVolumeLoads.put(volumeId, concurrency); return concurrency; @@ -537,13 +543,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { */ private synchronized void decreaseConcurrency(int volumeId){ if(diskVolumeLoads.containsKey(volumeId)){ - Integer concurrency = diskVolumeLoads.get(volumeId); + int concurrency = diskVolumeLoads.get(volumeId); if(concurrency > 0){ diskVolumeLoads.put(volumeId, concurrency - 1); - } else { - if (volumeId > REMOTE && !unassignedTaskForEachVolume.containsKey(volumeId)) { - diskVolumeLoads.remove(volumeId); - } } } } @@ -559,7 +561,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) { if(volumeEntry == null) volumeEntry = entry; - if (volumeEntry.getValue() >= entry.getValue()) { + if (entry.getKey() != REMOTE && volumeEntry.getValue() >= entry.getValue()) { volumeEntry = entry; } } @@ -596,19 +598,16 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { public void cancel(TaskAttempt taskAttempt) { + TaskAttemptToSchedulerEvent schedulerEvent = new TaskAttemptToSchedulerEvent( + EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(), + null, taskAttempt); + if(taskAttempt.isLeafTask()) { releaseTaskAttempt(taskAttempt); - List<DataLocation> locations = taskAttempt.getTask().getDataLocations(); - - for (DataLocation location : locations) { - HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost()); - volumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt); - } - - scheduledRequests.leafTasks.add(taskAttempt.getId()); + scheduledRequests.addLeafTask(schedulerEvent); } else { - scheduledRequests.nonLeafTasks.add(taskAttempt.getId()); + scheduledRequests.addNonLeafTask(schedulerEvent); } context.getMasterContext().getEventHandler().handle( @@ -826,7 +825,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { tailLimit = Math.max(remainingScheduledObjectNum() / nodes, 1); } - if (hostVolumeMapping.getRemoteConcurrency() >= tailLimit) { //remote task throttling per node + //remote task throttling per node + if (nodes > 1 && hostVolumeMapping.getRemoteConcurrency() >= tailLimit) { continue; } else { // assign to remote volume @@ -904,9 +904,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { cancellation++; } - if(LOG.isDebugEnabled()) { - LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); - } + info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); continue; } } catch (Exception e) { @@ -918,7 +916,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { rackLocalAssigned += rackAssign; if (rackAssign > 0) { - LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), " + + info(LOG, String.format("Assigned Local/Rack/Total: (%d/%d/%d), " + "Attempted Cancel/Assign/Total: (%d/%d/%d), " + "Locality: %.2f%%, Rack host: %s", hostLocalAssigned, rackLocalAssigned, totalAssigned, @@ -1022,9 +1020,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { cancellation++; } - if(LOG.isDebugEnabled()) { - LOG.debug("Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); - } + info(LOG, "Canceled requests: " + responseProto.getCancellationTaskCount() + " from " + addr); continue; }
