Repository: tajo Updated Branches: refs/heads/master db5496550 -> 533e709b7
TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/533e709b Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/533e709b Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/533e709b Branch: refs/heads/master Commit: 533e709b75ab7cf8bc8a06b48870dcf2ebc8fe11 Parents: db54965 Author: Jihoon Son <[email protected]> Authored: Thu Dec 25 19:06:54 2014 +0900 Committer: Jihoon Son <[email protected]> Committed: Thu Dec 25 19:06:54 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../DefaultFragmentScheduleAlgorithm.java | 251 --------- .../tajo/master/FragmentScheduleAlgorithm.java | 38 -- .../FragmentScheduleAlgorithmFactory.java | 68 --- .../master/GreedyFragmentScheduleAlgorithm.java | 429 --------------- .../apache/tajo/master/LazyTaskScheduler.java | 529 ------------------- .../querymaster/QueryMasterManagerService.java | 6 +- tajo-core/src/main/resources/tajo-default.xml | 7 +- 8 files changed, 6 insertions(+), 1324 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 2ee58a2..124977d 100644 --- a/CHANGES +++ b/CHANGES @@ -227,6 +227,8 @@ Release 0.9.1 - unreleased TASKS + TAJO-1267: Remove LazyTaskScheduler. (DaeMyung Kang via jihoon) + TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java deleted file mode 100644 index 406550d..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.NetUtils; - -import java.util.*; -import java.util.Map.Entry; - -/** - * DefaultFragmentScheduleAlgorithm selects a fragment randomly for the given argument. - * For example, when getHostLocalFragment(host, disk) is called, this algorithm randomly selects a fragment among - * the fragments which are stored at the disk of the host specified by the arguments. - */ -public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm { - private final static Log LOG = LogFactory.getLog(DefaultFragmentScheduleAlgorithm.class); - private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping = - new HashMap<String, Map<Integer, FragmentsPerDisk>>(); - private Map<String, Set<FragmentPair>> rackFragmentMapping = - new HashMap<String, Set<FragmentPair>>(); - private int fragmentNum = 0; - private Random random = new Random(System.currentTimeMillis()); - - public static class FragmentsPerDisk { - private Integer diskId; - private Set<FragmentPair> fragmentPairSet; - - public FragmentsPerDisk(Integer diskId) { - this.diskId = diskId; - this.fragmentPairSet = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>()); - } - - public Integer getDiskId() { - return diskId; - } - - public Set<FragmentPair> getFragmentPairSet() { - return fragmentPairSet; - } - - public void addFragmentPair(FragmentPair fragmentPair) { - fragmentPairSet.add(fragmentPair); - } - - public boolean removeFragmentPair(FragmentPair fragmentPair) { - return fragmentPairSet.remove(fragmentPair); - } - - public int size() { - return fragmentPairSet.size(); - } - - public Iterator<FragmentPair> getFragmentPairIterator() { - return fragmentPairSet.iterator(); - } - - public boolean isEmpty() { - return fragmentPairSet.isEmpty(); - } - } - - @Override - public void addFragment(FragmentPair fragmentPair) { - String[] hosts = fragmentPair.getLeftFragment().getHosts(); - int[] diskIds = null; - if (fragmentPair.getLeftFragment() instanceof FileFragment) { - diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); - } - for (int i = 0; i < hosts.length; i++) { - addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair); - } - fragmentNum++; - } - - private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) { - // update the fragment maps per host - String normalizeHost = NetUtils.normalizeHost(host); - Map<Integer, FragmentsPerDisk> diskFragmentMap; - if (fragmentHostMapping.containsKey(normalizeHost)) { - diskFragmentMap = fragmentHostMapping.get(normalizeHost); - } else { - diskFragmentMap = new HashMap<Integer, FragmentsPerDisk>(); - fragmentHostMapping.put(normalizeHost, diskFragmentMap); - } - FragmentsPerDisk fragmentsPerDisk; - if (diskFragmentMap.containsKey(diskId)) { - fragmentsPerDisk = diskFragmentMap.get(diskId); - } else { - fragmentsPerDisk = new FragmentsPerDisk(diskId); - diskFragmentMap.put(diskId, fragmentsPerDisk); - } - fragmentsPerDisk.addFragmentPair(fragmentPair); - - // update the fragment maps per rack - String rack = RackResolver.resolve(normalizeHost).getNetworkLocation(); - Set<FragmentPair> fragmentPairList; - if (rackFragmentMapping.containsKey(rack)) { - fragmentPairList = rackFragmentMapping.get(rack); - } else { - fragmentPairList = Collections.newSetFromMap(new HashMap<FragmentPair, Boolean>()); - rackFragmentMapping.put(rack, fragmentPairList); - } - fragmentPairList.add(fragmentPair); - } - - @Override - public void removeFragment(FragmentPair fragmentPair) { - boolean removed = false; - for (String eachHost : fragmentPair.getLeftFragment().getHosts()) { - String normalizedHost = NetUtils.normalizeHost(eachHost); - Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost); - for (Entry<Integer, FragmentsPerDisk> entry : diskFragmentMap.entrySet()) { - FragmentsPerDisk fragmentsPerDisk = entry.getValue(); - removed = fragmentsPerDisk.removeFragmentPair(fragmentPair); - if (removed) { - if (fragmentsPerDisk.size() == 0) { - diskFragmentMap.remove(entry.getKey()); - } - if (diskFragmentMap.size() == 0) { - fragmentHostMapping.remove(normalizedHost); - } - break; - } - } - String rack = RackResolver.resolve(normalizedHost).getNetworkLocation(); - if (rackFragmentMapping.containsKey(rack)) { - Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack); - fragmentPairs.remove(fragmentPair); - if (fragmentPairs.size() == 0) { - rackFragmentMapping.remove(rack); - } - } - } - if (removed) { - fragmentNum--; - } - } - - /** - * Randomly select a fragment among the fragments stored on the host. - * @param host - * @return a randomly selected fragment - */ - @Override - public FragmentPair getHostLocalFragment(String host) { - String normalizedHost = NetUtils.normalizeHost(host); - if (fragmentHostMapping.containsKey(normalizedHost)) { - Collection<FragmentsPerDisk> disks = fragmentHostMapping.get(normalizedHost).values(); - Iterator<FragmentsPerDisk> diskIterator = disks.iterator(); - int randomIndex = random.nextInt(disks.size()); - FragmentsPerDisk fragmentsPerDisk = null; - for (int i = 0; i < randomIndex; i++) { - fragmentsPerDisk = diskIterator.next(); - } - - if (fragmentsPerDisk != null) { - Iterator<FragmentPair> fragmentIterator = fragmentsPerDisk.getFragmentPairIterator(); - if (fragmentIterator.hasNext()) { - return fragmentIterator.next(); - } - } - } - return null; - } - - /** - * Randomly select a fragment among the fragments stored at the disk of the host. - * @param host - * @param diskId - * @return a randomly selected fragment - */ - @Override - public FragmentPair getHostLocalFragment(String host, Integer diskId) { - String normalizedHost = NetUtils.normalizeHost(host); - if (fragmentHostMapping.containsKey(normalizedHost)) { - Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost); - if (fragmentsPerDiskMap.containsKey(diskId)) { - FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId); - if (!fragmentsPerDisk.isEmpty()) { - return fragmentsPerDisk.getFragmentPairIterator().next(); - } - } - } - return null; - } - - /** - * Randomly select a fragment among the fragments stored on nodes of the same rack with the host. - * @param host - * @return a randomly selected fragment - */ - @Override - public FragmentPair getRackLocalFragment(String host) { - String rack = RackResolver.resolve(host).getNetworkLocation(); - if (rackFragmentMapping.containsKey(rack)) { - Set<FragmentPair> fragmentPairs = rackFragmentMapping.get(rack); - if (!fragmentPairs.isEmpty()) { - return fragmentPairs.iterator().next(); - } - } - return null; - } - - /** - * Randomly select a fragment among the total fragments. - * @return a randomly selected fragment - */ - @Override - public FragmentPair getRandomFragment() { - if (!fragmentHostMapping.isEmpty()) { - return fragmentHostMapping.values().iterator().next().values().iterator().next().getFragmentPairIterator().next(); - } - return null; - } - - @Override - public FragmentPair[] getAllFragments() { - List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>(); - for (Map<Integer, FragmentsPerDisk> eachDiskFragmentMap : fragmentHostMapping.values()) { - for (FragmentsPerDisk fragmentsPerDisk : eachDiskFragmentMap.values()) { - fragmentPairs.addAll(fragmentsPerDisk.fragmentPairSet); - } - } - return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]); - } - - @Override - public int size() { - return fragmentNum; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java deleted file mode 100644 index 10d993d..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithm.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -/** - * FragmentScheduleAlgorithm is used by LazyTaskScheduler. - * FragmentScheduleAlgorithm selects a fragment for the given argument. - * - * There are two implementations of DefaultFragmentScheduleAlgorithm and GreedyFragmentScheduleAlgorithm. - */ -public interface FragmentScheduleAlgorithm { - void addFragment(FragmentPair fragmentPair); - void removeFragment(FragmentPair fragmentPair); - - FragmentPair getHostLocalFragment(String host); - FragmentPair getHostLocalFragment(String host, Integer diskId); - FragmentPair getRackLocalFragment(String host); - FragmentPair getRandomFragment(); - FragmentPair[] getAllFragments(); - - int size(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java deleted file mode 100644 index 820a0fb..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentScheduleAlgorithmFactory.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.Map; - -public class FragmentScheduleAlgorithmFactory { - - private static Class<? extends FragmentScheduleAlgorithm> CACHED_ALGORITHM_CLASS; - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); - private static final Class<?>[] DEFAULT_PARAMS = {}; - - public static Class<? extends FragmentScheduleAlgorithm> getScheduleAlgorithmClass(Configuration conf) - throws IOException { - if (CACHED_ALGORITHM_CLASS != null) { - return CACHED_ALGORITHM_CLASS; - } else { - CACHED_ALGORITHM_CLASS = conf.getClass("tajo.querymaster.lazy-task-scheduler.algorithm", null, - FragmentScheduleAlgorithm.class); - } - - if (CACHED_ALGORITHM_CLASS == null) { - throw new IOException("Scheduler algorithm is null"); - } - return CACHED_ALGORITHM_CLASS; - } - - public static <T extends FragmentScheduleAlgorithm> T get(Class<T> clazz) { - T result; - try { - Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); - if (constructor == null) { - constructor = clazz.getDeclaredConstructor(DEFAULT_PARAMS); - constructor.setAccessible(true); - CONSTRUCTOR_CACHE.put(clazz, constructor); - } - result = constructor.newInstance(new Object[]{}); - } catch (Exception e) { - throw new RuntimeException(e); - } - return result; - } - - public static FragmentScheduleAlgorithm get(Configuration conf) throws IOException { - return get(getScheduleAlgorithmClass(conf)); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java deleted file mode 100644 index 56cf8e5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import com.google.common.base.Objects; -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.TUtil; - -import java.util.*; - -/** - * GreedyFragmentScheduleAlgorithm selects a fragment considering the number of fragments that are not scheduled yet. - * Disks of hosts have the priorities which are represented by the remaining number of fragments. - * This algorithm selects a fragment with trying minimizing the maximum priority. - */ -public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorithm { - private final static Log LOG = LogFactory.getLog(GreedyFragmentScheduleAlgorithm.class); - private final HostPriorityComparator hostComparator = new HostPriorityComparator(); - private Map<String, Map<Integer, FragmentsPerDisk>> fragmentHostMapping = - new HashMap<String, Map<Integer, FragmentsPerDisk>>(); - private Map<HostAndDisk, PrioritizedHost> totalHostPriority = new HashMap<HostAndDisk, PrioritizedHost>(); - private Map<String, Set<PrioritizedHost>> hostPriorityPerRack = new HashMap<String, Set<PrioritizedHost>>(); - private TopologyCache topologyCache = new TopologyCache(); - private int totalFragmentNum = 0; - - private FragmentsPerDisk getHostFragmentSet(String host, Integer diskId) { - Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap; - FragmentsPerDisk fragmentsPerDisk; - if (fragmentHostMapping.containsKey(host)) { - fragmentsPerDiskMap = fragmentHostMapping.get(host); - } else { - fragmentsPerDiskMap = new HashMap<Integer, FragmentsPerDisk>(); - fragmentHostMapping.put(host, fragmentsPerDiskMap); - } - if (fragmentsPerDiskMap.containsKey(diskId)) { - fragmentsPerDisk = fragmentsPerDiskMap.get(diskId); - } else { - fragmentsPerDisk = new FragmentsPerDisk(diskId); - fragmentsPerDiskMap.put(diskId, fragmentsPerDisk); - } - return fragmentsPerDisk; - } - - private void updateHostPriority(HostAndDisk hostAndDisk, int priority) { - if (priority > 0) { - // update the priority among the total hosts - PrioritizedHost prioritizedHost; - if (totalHostPriority.containsKey(hostAndDisk)) { - prioritizedHost = totalHostPriority.get(hostAndDisk); - prioritizedHost.priority = priority; - } else { - prioritizedHost = new PrioritizedHost(hostAndDisk, priority); - totalHostPriority.put(hostAndDisk, prioritizedHost); - } - - // update the priority among the hosts in a rack - String rack = topologyCache.resolve(hostAndDisk.host); - Set<PrioritizedHost> hostsOfRack; - if (!hostPriorityPerRack.containsKey(rack)) { - hostsOfRack = new HashSet<PrioritizedHost>(); - hostsOfRack.add(prioritizedHost); - hostPriorityPerRack.put(rack, hostsOfRack); - } - } else { - if (totalHostPriority.containsKey(hostAndDisk)) { - PrioritizedHost prioritizedHost = totalHostPriority.remove(hostAndDisk); - - String rack = topologyCache.resolve(hostAndDisk.host); - if (hostPriorityPerRack.containsKey(rack)) { - Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack); - hostsOfRack.remove(prioritizedHost); - if (hostsOfRack.size() == 0){ - hostPriorityPerRack.remove(rack); - } - } - } - } - } - - @Override - public void addFragment(FragmentPair fragmentPair) { - String[] hosts = fragmentPair.getLeftFragment().getHosts(); - int[] diskIds = null; - if (fragmentPair.getLeftFragment() instanceof FileFragment) { - diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); - } - for (int i = 0; i < hosts.length; i++) { - addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair); - } - totalFragmentNum++; - } - - private void addFragment(String host, Integer diskId, FragmentPair fragmentPair) { - host = topologyCache.normalize(host); - FragmentsPerDisk fragmentsPerDisk = getHostFragmentSet(host, diskId); - fragmentsPerDisk.addFragmentPair(fragmentPair); - - int priority; - HostAndDisk hostAndDisk = new HostAndDisk(host, diskId); - if (totalHostPriority.containsKey(hostAndDisk)) { - priority = totalHostPriority.get(hostAndDisk).priority; - } else { - priority = 0; - } - updateHostPriority(hostAndDisk, priority+1); - } - - public int size() { - return totalFragmentNum; - } - - /** - * Selects a fragment that is stored in the given host, and replicated at the disk of the maximum - * priority. - * @param host - * @return If there are fragments stored in the host, returns a fragment. Otherwise, return null. - */ - @Override - public FragmentPair getHostLocalFragment(String host) { - String normalizedHost = topologyCache.normalize(host); - if (!fragmentHostMapping.containsKey(normalizedHost)) { - return null; - } - - Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost); - List<Integer> disks = Lists.newArrayList(fragmentsPerDiskMap.keySet()); - Collections.shuffle(disks); - FragmentsPerDisk fragmentsPerDisk = null; - FragmentPair fragmentPair = null; - - for (Integer diskId : disks) { - fragmentsPerDisk = fragmentsPerDiskMap.get(diskId); - if (fragmentsPerDisk != null && !fragmentsPerDisk.isEmpty()) { - fragmentPair = getBestFragment(fragmentsPerDisk); - } - if (fragmentPair != null) { - return fragmentPair; - } - } - - return null; - } - - /** - * Selects a fragment that is stored at the given disk of the given host, and replicated at the disk of the maximum - * priority. - * @param host - * @param diskId - * @return If there are fragments stored at the disk of the host, returns a fragment. Otherwise, return null. - */ - @Override - public FragmentPair getHostLocalFragment(String host, Integer diskId) { - String normalizedHost = NetUtils.normalizeHost(host); - if (fragmentHostMapping.containsKey(normalizedHost)) { - Map<Integer, FragmentsPerDisk> fragmentsPerDiskMap = fragmentHostMapping.get(normalizedHost); - if (fragmentsPerDiskMap.containsKey(diskId)) { - FragmentsPerDisk fragmentsPerDisk = fragmentsPerDiskMap.get(diskId); - if (!fragmentsPerDisk.isEmpty()) { - return getBestFragment(fragmentsPerDisk); - } - } - } - return null; - } - - /** - * In the descending order of priority, find a fragment that is shared by the given fragment set and the fragment set - * of the maximal priority. - * @param fragmentsPerDisk a fragment set - * @return a fragment that is shared by the given fragment set and the fragment set of the maximal priority - */ - private FragmentPair getBestFragment(FragmentsPerDisk fragmentsPerDisk) { - // Select a fragment that is shared by host and another hostAndDisk that has the most fragments - Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values(); - PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]); - Arrays.sort(sortedHosts, hostComparator); - - for (PrioritizedHost nextHost : sortedHosts) { - if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) { - Map<Integer, FragmentsPerDisk> diskFragmentsMap = fragmentHostMapping.get(nextHost.hostAndDisk.host); - if (diskFragmentsMap.containsKey(nextHost.hostAndDisk.diskId)) { - Set<FragmentPair> largeFragmentPairSet = diskFragmentsMap.get(nextHost.hostAndDisk.diskId).getFragmentPairSet(); - Iterator<FragmentPair> smallFragmentSetIterator = fragmentsPerDisk.getFragmentPairIterator(); - while (smallFragmentSetIterator.hasNext()) { - FragmentPair eachFragmentOfSmallSet = smallFragmentSetIterator.next(); - if (largeFragmentPairSet.contains(eachFragmentOfSmallSet)) { - return eachFragmentOfSmallSet; - } - } - } - } - } - return null; - } - - /** - * Selects a fragment that is stored at the same rack of the given host, and replicated at the disk of the maximum - * priority. - * @param host - * @return If there are fragments stored at the same rack of the given host, returns a fragment. Otherwise, return null. - */ - public FragmentPair getRackLocalFragment(String host) { - host = topologyCache.normalize(host); - // Select a fragment from a host that has the most fragments in the rack - String rack = topologyCache.resolve(host); - Set<PrioritizedHost> hostsOfRack = hostPriorityPerRack.get(rack); - if (hostsOfRack != null && hostsOfRack.size() > 0) { - PrioritizedHost[] sortedHosts = hostsOfRack.toArray(new PrioritizedHost[hostsOfRack.size()]); - Arrays.sort(sortedHosts, hostComparator); - for (PrioritizedHost nextHost : sortedHosts) { - if (fragmentHostMapping.containsKey(nextHost.hostAndDisk.host)) { - List<FragmentsPerDisk> disks = Lists.newArrayList(fragmentHostMapping.get(nextHost.hostAndDisk.host).values()); - Collections.shuffle(disks); - - for (FragmentsPerDisk fragmentsPerDisk : disks) { - if (!fragmentsPerDisk.isEmpty()) { - return fragmentsPerDisk.getFragmentPairIterator().next(); - } - } - } - } - } - return null; - } - - /** - * Selects a fragment from the disk of the maximum priority. - * @return If there are remaining fragments, it returns a fragment. Otherwise, it returns null. - */ - public FragmentPair getRandomFragment() { - // Select a fragment from a host that has the most fragments - Collection<PrioritizedHost> prioritizedHosts = totalHostPriority.values(); - PrioritizedHost[] sortedHosts = prioritizedHosts.toArray(new PrioritizedHost[prioritizedHosts.size()]); - Arrays.sort(sortedHosts, hostComparator); - PrioritizedHost randomHost = sortedHosts[0]; - if (fragmentHostMapping.containsKey(randomHost.hostAndDisk.host)) { - Iterator<FragmentsPerDisk> fragmentsPerDiskIterator = fragmentHostMapping.get(randomHost.hostAndDisk.host).values().iterator(); - if (fragmentsPerDiskIterator.hasNext()) { - Iterator<FragmentPair> fragmentPairIterator = fragmentsPerDiskIterator.next().getFragmentPairIterator(); - if (fragmentPairIterator.hasNext()) { - return fragmentPairIterator.next(); - } - } - } - return null; - } - - public FragmentPair[] getAllFragments() { - List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>(); - for (Map<Integer, FragmentsPerDisk> eachValue : fragmentHostMapping.values()) { - for (FragmentsPerDisk fragmentsPerDisk : eachValue.values()) { - Set<FragmentPair> pairSet = fragmentsPerDisk.getFragmentPairSet(); - fragmentPairs.addAll(pairSet); - } - } - return fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]); - } - - public void removeFragment(FragmentPair fragmentPair) { - String [] hosts = fragmentPair.getLeftFragment().getHosts(); - int[] diskIds = null; - if (fragmentPair.getLeftFragment() instanceof FileFragment) { - diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds(); - } - for (int i = 0; i < hosts.length; i++) { - int diskId = diskIds == null ? -1 : diskIds[i]; - String normalizedHost = NetUtils.normalizeHost(hosts[i]); - Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost); - - if (diskFragmentMap != null) { - FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId); - if (fragmentsPerDisk != null) { - boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair); - if (isRemoved) { - if (fragmentsPerDisk.size() == 0) { - diskFragmentMap.remove(diskId); - if (diskFragmentMap.size() == 0) { - fragmentHostMapping.remove(normalizedHost); - } - } - HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId); - if (totalHostPriority.containsKey(hostAndDisk)) { - PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk); - updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1); - } - } - } - } - } - - totalFragmentNum--; - } - - private static class HostAndDisk { - private String host; - private Integer diskId; - - public HostAndDisk(String host, Integer diskId) { - this.host = host; - this.diskId = diskId; - } - - public String getHost() { - return host; - } - - public int getDiskId() { - return diskId; - } - - @Override - public int hashCode() { - return Objects.hashCode(host, diskId); - } - - @Override - public boolean equals(Object o) { - if (o instanceof HostAndDisk) { - HostAndDisk other = (HostAndDisk) o; - return this.host.equals(other.host) && - TUtil.checkEquals(this.diskId, other.diskId); - } - return false; - } - } - - public static class PrioritizedHost { - private HostAndDisk hostAndDisk; - private int priority; - - public PrioritizedHost(HostAndDisk hostAndDisk, int priority) { - this.hostAndDisk = hostAndDisk; - this.priority = priority; - } - - public PrioritizedHost(String host, Integer diskId, int priority) { - this.hostAndDisk = new HostAndDisk(host, diskId); - this.priority = priority; - } - - public String getHost() { - return hostAndDisk.host; - } - - public Integer getDiskId() { - return hostAndDisk.diskId; - } - - public Integer getPriority() { - return priority; - } - - @Override - public boolean equals(Object o) { - if (o instanceof PrioritizedHost) { - PrioritizedHost other = (PrioritizedHost) o; - return this.hostAndDisk.equals(other.hostAndDisk); - } - return false; - } - - @Override - public int hashCode() { - return hostAndDisk.hashCode(); - } - - @Override - public String toString() { - return "host: " + hostAndDisk.host + " disk: " + hostAndDisk.diskId + " priority: " + priority; - } - } - - - public static class HostPriorityComparator implements Comparator<PrioritizedHost> { - - @Override - public int compare(PrioritizedHost prioritizedHost, PrioritizedHost prioritizedHost2) { - return prioritizedHost2.priority - prioritizedHost.priority; - } - } - - - public static class TopologyCache { - private Map<String, String> hostRackMap = new HashMap<String, String>(); - private Map<String, String> normalizedHostMap = new HashMap<String, String>(); - - public String normalize(String host) { - if (normalizedHostMap.containsKey(host)) { - return normalizedHostMap.get(host); - } else { - String normalized = NetUtils.normalizeHost(host); - normalizedHostMap.put(host, normalized); - return normalized; - } - } - - public String resolve(String host) { - if (hostRackMap.containsKey(host)) { - return hostRackMap.get(host); - } else { - String rack = RackResolver.resolve(host).getNetworkLocation(); - hostRackMap.put(host, rack); - return rack; - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java deleted file mode 100644 index 32af17b..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ /dev/null @@ -1,529 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.engine.planner.global.ExecutionBlock; -import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.engine.query.TaskRequest; -import org.apache.tajo.engine.query.TaskRequestImpl; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; -import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; -import org.apache.tajo.master.querymaster.Task; -import org.apache.tajo.master.querymaster.TaskAttempt; -import org.apache.tajo.master.querymaster.Stage; -import org.apache.tajo.master.container.TajoContainerId; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.NetUtils; -import org.apache.tajo.worker.FetchImpl; - -import java.io.IOException; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -public class LazyTaskScheduler extends AbstractTaskScheduler { - private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class); - - private final TaskSchedulerContext context; - private final Stage stage; - - private Thread schedulingThread; - private volatile boolean stopEventHandling; - - BlockingQueue<TaskSchedulerEvent> eventQueue - = new LinkedBlockingQueue<TaskSchedulerEvent>(); - - private TaskRequests taskRequests; - private FragmentScheduleAlgorithm scheduledFragments; - private ScheduledFetches scheduledFetches; - - private int diskLocalAssigned = 0; - private int hostLocalAssigned = 0; - private int rackLocalAssigned = 0; - private int totalAssigned = 0; - - private int nextTaskId = 0; - private int containerNum; - - public LazyTaskScheduler(TaskSchedulerContext context, Stage stage) { - super(LazyTaskScheduler.class.getName()); - this.context = context; - this.stage = stage; - } - - @Override - public void init(Configuration conf) { - taskRequests = new TaskRequests(); - try { - scheduledFragments = FragmentScheduleAlgorithmFactory.get(conf); - LOG.info(scheduledFragments.getClass().getSimpleName() + " is selected for the scheduling algorithm."); - } catch (IOException e) { - throw new RuntimeException(e); - } - if (!context.isLeafQuery()) { - scheduledFetches = new ScheduledFetches(); - } - - super.init(conf); - } - - @Override - public void start() { - containerNum = stage.getContext().getResourceAllocator().calculateNumRequestContainers( - stage.getContext().getQueryMasterContext().getWorkerContext(), - context.getEstimatedTaskNum(), 512); - - LOG.info("Start TaskScheduler"); - this.schedulingThread = new Thread() { - public void run() { - - while(!stopEventHandling && !Thread.currentThread().isInterrupted()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - break; - } - - schedule(); - } - LOG.info("TaskScheduler schedulingThread stopped"); - } - }; - - this.schedulingThread.start(); - super.start(); - } - - private static final TaskAttemptId NULL_ATTEMPT_ID; - public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; - static { - ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0); - - TajoWorkerProtocol.TaskRequestProto.Builder builder = - TajoWorkerProtocol.TaskRequestProto.newBuilder(); - builder.setId(NULL_ATTEMPT_ID.getProto()); - builder.setShouldDie(true); - builder.setOutputTable(""); - builder.setSerializedData(""); - builder.setClusteredOutput(false); - stopTaskRunnerReq = builder.build(); - } - - @Override - public void stop() { - stopEventHandling = true; - schedulingThread.interrupt(); - - // Return all of request callbacks instantly. - for (TaskRequestEvent req : taskRequests.taskRequestQueue) { - req.getCallback().run(stopTaskRunnerReq); - } - - LOG.info("Task Scheduler stopped"); - super.stop(); - } - - List<TaskRequestEvent> taskRequestEvents = new ArrayList<TaskRequestEvent>(); - public void schedule() { - if (taskRequests.size() > 0) { - if (context.isLeafQuery()) { - LOG.debug("Try to schedule tasks with taskRequestEvents: " + - taskRequests.size() + ", Fragment Schedule Request: " + - scheduledFragments.size()); - taskRequests.getTaskRequests(taskRequestEvents, - scheduledFragments.size()); - LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents "); - if (taskRequestEvents.size() > 0) { - assignLeafTasks(taskRequestEvents); - } - taskRequestEvents.clear(); - } else { - LOG.debug("Try to schedule tasks with taskRequestEvents: " + - taskRequests.size() + ", Fetch Schedule Request: " + - scheduledFetches.size()); - taskRequests.getTaskRequests(taskRequestEvents, - scheduledFetches.size()); - LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents "); - if (taskRequestEvents.size() > 0) { - assignNonLeafTasks(taskRequestEvents); - } - taskRequestEvents.clear(); - } - } - } - - @Override - public void handle(TaskSchedulerEvent event) { - int qSize = eventQueue.size(); - if (qSize != 0 && qSize % 1000 == 0) { - LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize); - } - int remCapacity = eventQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue " - + "of DefaultTaskScheduler: " + remCapacity); - } - - if (event.getType() == EventType.T_SCHEDULE) { - if (event instanceof FragmentScheduleEvent) { - FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event; - Collection<Fragment> rightFragments = castEvent.getRightFragments(); - if (rightFragments == null || rightFragments.isEmpty()) { - scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), null)); - } else { - for (Fragment eachFragment: rightFragments) { - scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), eachFragment)); - } - } - if (castEvent.getLeftFragment() instanceof FileFragment) { - initDiskBalancer(castEvent.getLeftFragment().getHosts(), ((FileFragment)castEvent.getLeftFragment()).getDiskIds()); - } - } else if (event instanceof FetchScheduleEvent) { - FetchScheduleEvent castEvent = (FetchScheduleEvent) event; - scheduledFetches.addFetch(castEvent.getFetches()); - } else if (event instanceof TaskAttemptToSchedulerEvent) { - TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event; - assignTask(castEvent.getContext(), castEvent.getTaskAttempt()); - } - } - } - - public void handleTaskRequestEvent(TaskRequestEvent event) { - taskRequests.handle(event); - } - - @Override - public int remainingScheduledObjectNum() { - if (context.isLeafQuery()) { - return scheduledFragments.size(); - } else { - return scheduledFetches.size(); - } - } - - private Map<String, DiskBalancer> hostDiskBalancerMap = new HashMap<String, DiskBalancer>(); - - private void initDiskBalancer(String[] hosts, int[] diskIds) { - for (int i = 0; i < hosts.length; i++) { - DiskBalancer diskBalancer; - String normalized = NetUtils.normalizeHost(hosts[i]); - if (hostDiskBalancerMap.containsKey(normalized)) { - diskBalancer = hostDiskBalancerMap.get(normalized); - } else { - diskBalancer = new DiskBalancer(normalized); - hostDiskBalancerMap.put(normalized, diskBalancer); - } - diskBalancer.addDiskId(diskIds[i]); - } - } - - private static class DiskBalancer { - private HashMap<TajoContainerId, Integer> containerDiskMap = new HashMap<TajoContainerId, - Integer>(); - private HashMap<Integer, Integer> diskReferMap = new HashMap<Integer, Integer>(); - private String host; - - public DiskBalancer(String host){ - this.host = host; - } - - public void addDiskId(Integer diskId) { - if (!diskReferMap.containsKey(diskId)) { - diskReferMap.put(diskId, 0); - } - } - - public Integer getDiskId(TajoContainerId containerId) { - if (!containerDiskMap.containsKey(containerId)) { - assignVolumeId(containerId); - } - - return containerDiskMap.get(containerId); - } - - public void assignVolumeId(TajoContainerId containerId){ - Map.Entry<Integer, Integer> volumeEntry = null; - - for (Map.Entry<Integer, Integer> entry : diskReferMap.entrySet()) { - if(volumeEntry == null) volumeEntry = entry; - - if (volumeEntry.getValue() >= entry.getValue()) { - volumeEntry = entry; - } - } - - if(volumeEntry != null){ - diskReferMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1); - LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : " - + diskReferMap.get(volumeEntry.getKey())); - containerDiskMap.put(containerId, volumeEntry.getKey()); - } - } - - public String getHost() { - return host; - } - } - - private class TaskRequests implements EventHandler<TaskRequestEvent> { - private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue = - new LinkedBlockingQueue<TaskRequestEvent>(); - - @Override - public void handle(TaskRequestEvent event) { - LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId()); - if(stopEventHandling) { - event.getCallback().run(stopTaskRunnerReq); - return; - } - int qSize = taskRequestQueue.size(); - if (qSize != 0 && qSize % 1000 == 0) { - LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize); - } - int remCapacity = taskRequestQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue " - + "of DefaultTaskScheduler: " + remCapacity); - } - - taskRequestQueue.add(event); - } - - public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests, - int num) { - taskRequestQueue.drainTo(taskRequests, num); - } - - public int size() { - return taskRequestQueue.size(); - } - } - - private long adjustTaskSize() { - long originTaskSize = context.getMasterContext().getConf().getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024; - long fragNumPerTask = context.getTaskSize() / originTaskSize; - if (fragNumPerTask * containerNum > remainingScheduledObjectNum()) { - return context.getTaskSize(); - } else { - fragNumPerTask = (long) Math.ceil((double)remainingScheduledObjectNum() / (double)containerNum); - return originTaskSize * fragNumPerTask; - } - } - - private void assignLeafTasks(List<TaskRequestEvent> taskRequests) { - Collections.shuffle(taskRequests); - Iterator<TaskRequestEvent> it = taskRequests.iterator(); - - TaskRequestEvent taskRequest; - while (it.hasNext() && scheduledFragments.size() > 0) { - taskRequest = it.next(); - LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," + - "containerId=" + taskRequest.getContainerId()); - ContainerProxy container = context.getMasterContext().getResourceAllocator(). - getContainer(taskRequest.getContainerId()); - - if(container == null) { - continue; - } - - String host = container.getTaskHostName(); - TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID, - host, taskRequest.getCallback()); - Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++); - - FragmentPair fragmentPair; - List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>(); - boolean diskLocal = false; - long assignedFragmentSize = 0; - long taskSize = adjustTaskSize(); - LOG.info("Adjusted task size: " + taskSize); - - TajoConf conf = stage.getContext().getConf(); - // host local, disk local - String normalized = NetUtils.normalizeHost(host); - Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID); - if (diskId != null && diskId != -1) { - do { - fragmentPair = scheduledFragments.getHostLocalFragment(host, diskId); - if (fragmentPair == null || fragmentPair.getLeftFragment() == null) { - break; - } - - if (assignedFragmentSize + - StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) { - break; - } else { - fragmentPairs.add(fragmentPair); - assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()); - if (fragmentPair.getRightFragment() != null) { - assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment()); - } - } - scheduledFragments.removeFragment(fragmentPair); - diskLocal = true; - } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize); - } - - if (assignedFragmentSize < taskSize) { - // host local - do { - fragmentPair = scheduledFragments.getHostLocalFragment(host); - if (fragmentPair == null || fragmentPair.getLeftFragment() == null) { - break; - } - - if (assignedFragmentSize + - StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) { - break; - } else { - fragmentPairs.add(fragmentPair); - assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()); - if (fragmentPair.getRightFragment() != null) { - assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment()); - } - } - scheduledFragments.removeFragment(fragmentPair); - diskLocal = false; - } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize); - } - - // rack local - if (fragmentPairs.size() == 0) { - fragmentPair = scheduledFragments.getRackLocalFragment(host); - - // random - if (fragmentPair == null) { - fragmentPair = scheduledFragments.getRandomFragment(); - } else { - rackLocalAssigned++; - } - - if (fragmentPair != null) { - fragmentPairs.add(fragmentPair); - scheduledFragments.removeFragment(fragmentPair); - } - } else { - if (diskLocal) { - diskLocalAssigned++; - } else { - hostLocalAssigned++; - } - } - - if (fragmentPairs.size() == 0) { - throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!"); - } - - LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size()); - - task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()])); - stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); - } - } - - private void assignNonLeafTasks(List<TaskRequestEvent> taskRequests) { - Iterator<TaskRequestEvent> it = taskRequests.iterator(); - - TaskRequestEvent taskRequest; - while (it.hasNext()) { - taskRequest = it.next(); - LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId()); - - // random allocation - if (scheduledFetches.size() > 0) { - LOG.debug("Assigned based on * match"); - ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer( - taskRequest.getContainerId()); - TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID, - container.getTaskHostName(), taskRequest.getCallback()); - Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++); - task.setFragment(scheduledFragments.getAllFragments()); - stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); - } - } - } - - private void assignTask(TaskAttemptScheduleContext attemptContext, TaskAttempt taskAttempt) { - TaskAttemptId attemptId = taskAttempt.getId(); - TaskRequest taskAssign = new TaskRequestImpl( - attemptId, - new ArrayList<FragmentProto>(taskAttempt.getTask().getAllFragments()), - "", - false, - taskAttempt.getTask().getLogicalPlan().toJson(), - context.getMasterContext().getQueryContext(), - stage.getDataChannel(), stage.getBlock().getEnforcer()); - if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { - taskAssign.setInterQuery(); - } - - if (!context.isLeafQuery()) { - Map<String, List<FetchImpl>> fetch = scheduledFetches.getNextFetch(); - scheduledFetches.popNextFetch(); - - for (Entry<String, List<FetchImpl>> fetchEntry : fetch.entrySet()) { - for (FetchImpl eachValue : fetchEntry.getValue()) { - taskAssign.addFetch(fetchEntry.getKey(), eachValue); - } - } - } - - context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId, - attemptContext.getContainerId(), taskAttempt.getWorkerConnectionInfo())); - - totalAssigned++; - attemptContext.getCallback().run(taskAssign.getProto()); - - if (context.isLeafQuery()) { - LOG.debug("DiskLocalAssigned / Total: " + diskLocalAssigned + " / " + totalAssigned); - LOG.debug("HostLocalAssigned / Total: " + hostLocalAssigned + " / " + totalAssigned); - LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned); - } - } - - private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) { - if (masterPlan.isRoot(block)) { - return false; - } - - ExecutionBlock parent = masterPlan.getParent(block); - if (masterPlan.isRoot(parent) && parent.hasUnion()) { - return false; - } - - return true; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index c2e1009..9f7d3f8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -30,9 +30,9 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.LazyTaskScheduler; -import org.apache.tajo.master.event.*; +import org.apache.tajo.master.DefaultTaskScheduler; import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.master.event.*; import org.apache.tajo.master.session.Session; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; @@ -128,7 +128,7 @@ public class QueryMasterManagerService extends CompositeService QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId()); if(queryMasterTask == null || queryMasterTask.isStopped()) { - done.run(LazyTaskScheduler.stopTaskRunnerReq); + done.run(DefaultTaskScheduler.stopTaskRunnerReq); } else { TajoContainerId cid = queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId()); http://git-wip-us.apache.org/repos/asf/tajo/blob/533e709b/tajo-core/src/main/resources/tajo-default.xml ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml index c49e8e5..db92b02 100644 --- a/tajo-core/src/main/resources/tajo-default.xml +++ b/tajo-core/src/main/resources/tajo-default.xml @@ -42,9 +42,4 @@ <value>org.apache.tajo.master.DefaultTaskScheduler</value> </property> - <property> - <name>tajo.querymaster.lazy-task-scheduler.algorithm</name> - <value>org.apache.tajo.master.GreedyFragmentScheduleAlgorithm</value> - </property> - -</configuration> \ No newline at end of file +</configuration>
