Repository: tajo Updated Branches: refs/heads/master 072b5a3ad -> eeaf379a4
TAJO-987: Hash shuffle should be balanced according to intermediate volumes. Closes #101 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/eeaf379a Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/eeaf379a Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/eeaf379a Branch: refs/heads/master Commit: eeaf379a48030dd819a6daf2040c779379543ac8 Parents: 072b5a3 Author: Hyunsik Choi <[email protected]> Authored: Mon Aug 4 12:08:51 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Aug 4 12:08:51 2014 +0900 ---------------------------------------------------------------------- CHANGES | 5 +- .../tajo/master/querymaster/Repartitioner.java | 111 +++++++++++++++++-- .../apache/tajo/master/TestRepartitioner.java | 53 +++++++++ 3 files changed, 160 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/eeaf379a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 705e91e..d6ac279 100644 --- a/CHANGES +++ b/CHANGES @@ -29,7 +29,10 @@ Release 0.9.0 - unreleased IMPROVEMENT - TAJO-955: Add database selection submit button in catalogview.jsp for text + TAJO-987: Hash shuffle should be balanced according to intermediate + volumes. (hyunsik) + + TAJO-955: Add database selection submit button in catalogview.jsp for text based browse. (Hyoungjun Kim via jihoon) TAJO-956: CONCAT should be support multiple params and null param. http://git-wip-us.apache.org/repos/asf/tajo/blob/eeaf379a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index f86106f..1fa3f11 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -18,6 +18,7 @@ package org.apache.tajo.master.querymaster; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import org.apache.commons.logging.Log; @@ -48,6 +49,7 @@ import org.apache.tajo.storage.AbstractStorageManager; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.worker.FetchImpl; @@ -59,9 +61,7 @@ import java.net.URI; import java.util.*; import java.util.Map.Entry; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE; +import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.*; /** * Repartitioner creates non-leaf tasks and shuffles intermediate data. @@ -675,6 +675,31 @@ public class Repartitioner { } } + @VisibleForTesting + public static class FetchGroupMeta { + long totalVolume; + List<FetchImpl> fetchUrls; + + public FetchGroupMeta(long volume, FetchImpl fetchUrls) { + this.totalVolume = volume; + this.fetchUrls = Lists.newArrayList(fetchUrls); + } + + public FetchGroupMeta addFetche(FetchImpl fetches) { + this.fetchUrls.add(fetches); + return this; + } + + public void increaseVolume(long volume) { + this.totalVolume += volume; + } + + public long getVolume() { + return totalVolume; + } + + } + public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, SubQuery subQuery, DataChannel channel, int maxNum) { @@ -689,7 +714,7 @@ public class Repartitioner { SubQuery.scheduleFragments(subQuery, fragments); Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost; - Map<Integer, Collection<FetchImpl>> finalFetches = new HashMap<Integer, Collection<FetchImpl>>(); + Map<Integer, FetchGroupMeta> finalFetches = new HashMap<Integer, FetchGroupMeta>(); Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId, List<IntermediateEntry>>(); @@ -717,10 +742,15 @@ public class Repartitioner { FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(), block.getId(), interm.getKey(), e.getValue()); + long volumeSum = 0; + for (IntermediateEntry ie : e.getValue()) { + volumeSum += ie.getVolume(); + } + if (finalFetches.containsKey(interm.getKey())) { - finalFetches.get(interm.getKey()).add(fetch); + finalFetches.get(interm.getKey()).addFetche(fetch).increaseVolume(volumeSum); } else { - finalFetches.put(interm.getKey(), TUtil.newList(fetch)); + finalFetches.put(interm.getKey(), new FetchGroupMeta(volumeSum, fetch)); } } } @@ -756,12 +786,77 @@ public class Repartitioner { scan.getTableName()); } else { schedulerContext.setEstimatedTaskNum(determinedTaskNum); - // divide fetch uris into the the proper number of tasks in a round robin manner. - scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum); + // divide fetch uris into the the proper number of tasks according to volumes + scheduleFetchesByEvenDistributedVolumes(subQuery, finalFetches, scan.getTableName(), determinedTaskNum); LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum); } } + public static Pair<Long [], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl( + Map<Integer, FetchGroupMeta> partitions, String tableName, int num) { + + // Sort fetchGroupMeta in a descending order of data volumes. + List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values()); + Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() { + @Override + public int compare(FetchGroupMeta o1, FetchGroupMeta o2) { + return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0); + } + }); + + // Initialize containers + Map<String, List<FetchImpl>>[] fetchesArray = new Map[num]; + Long [] assignedVolumes = new Long[num]; + // initialization + for (int i = 0; i < num; i++) { + fetchesArray[i] = new HashMap<String, List<FetchImpl>>(); + assignedVolumes[i] = 0l; + } + + // This algorithm assignes bigger first manner by using a sorted iterator. It is a kind of greedy manner. + // Its complexity is O(n). Since FetchGroup can be more than tens of thousands, we should consider its complexity. + // In terms of this point, it will show reasonable performance and results. even though it is not an optimal + // algorithm. + Iterator<FetchGroupMeta> iterator = fetchGroupMetaList.iterator(); + + int p = 0; + while(iterator.hasNext()) { + while (p < num && iterator.hasNext()) { + FetchGroupMeta fetchGroupMeta = iterator.next(); + assignedVolumes[p] += fetchGroupMeta.getVolume(); + + TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls); + p++; + } + + p = num - 1; + while (p > 0 && iterator.hasNext()) { + FetchGroupMeta fetchGroupMeta = iterator.next(); + assignedVolumes[p] += fetchGroupMeta.getVolume(); + + // While the current one is smaller than next one, it adds additional fetches to current one. + while(iterator.hasNext() && assignedVolumes[p - 1] > assignedVolumes[p]) { + FetchGroupMeta additionalFetchGroup = iterator.next(); + assignedVolumes[p] += additionalFetchGroup.getVolume(); + TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.fetchUrls); + } + + p--; + } + } + + return new Pair<Long[], Map<String, List<FetchImpl>>[]>(assignedVolumes, fetchesArray); + } + + public static void scheduleFetchesByEvenDistributedVolumes(SubQuery subQuery, Map<Integer, FetchGroupMeta> partitions, + String tableName, int num) { + Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond(); + // Schedule FetchImpls + for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) { + SubQuery.scheduleFetches(subQuery, eachFetches); + } + } + // Scattered hash shuffle hashes the key columns and groups the hash keys associated with // the same hash key. Then, if the volume of a group is larger // than DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into more than two sub groups http://git-wip-us.apache.org/repos/asf/tajo/blob/eeaf379a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index 0ccaebe..009c02e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -18,11 +18,14 @@ package org.apache.tajo.master; +import com.google.common.collect.Maps; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.TestTajoIds; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.querymaster.QueryUnit; +import org.apache.tajo.master.querymaster.Repartitioner; +import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; import org.jboss.netty.handler.codec.http.QueryStringDecoder; @@ -35,6 +38,8 @@ import java.util.List; import java.util.Map; import static junit.framework.Assert.assertEquals; +import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta; +import static org.junit.Assert.assertTrue; public class TestRepartitioner { @Test @@ -83,4 +88,52 @@ public class TestRepartitioner { } return ret; } + + @Test + public void testScheduleFetchesByEvenDistributedVolumes() { + Map<Integer, FetchGroupMeta> fetchGroups = Maps.newHashMap(); + String tableName = "test1"; + + + fetchGroups.put(0, new FetchGroupMeta(100, new FetchImpl())); + fetchGroups.put(1, new FetchGroupMeta(80, new FetchImpl())); + fetchGroups.put(2, new FetchGroupMeta(70, new FetchImpl())); + fetchGroups.put(3, new FetchGroupMeta(30, new FetchImpl())); + fetchGroups.put(4, new FetchGroupMeta(10, new FetchImpl())); + fetchGroups.put(5, new FetchGroupMeta(5, new FetchImpl())); + + Pair<Long [], Map<String, List<FetchImpl>>[]> results; + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1); + long expected [] = {100 + 80 + 70 + 30 + 10 + 5}; + assertFetchVolumes(expected, results.getFirst()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2); + long expected0 [] = {130, 165}; + assertFetchVolumes(expected0, results.getFirst()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3); + long expected1 [] = {100, 95, 100}; + assertFetchVolumes(expected1, results.getFirst()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4); + long expected2 [] = {100, 80, 70, 45}; + assertFetchVolumes(expected2, results.getFirst()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5); + long expected3 [] = {100, 80, 70, 30, 15}; + assertFetchVolumes(expected3, results.getFirst()); + + results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6); + long expected4 [] = {100, 80, 70, 30, 10, 5}; + assertFetchVolumes(expected4, results.getFirst()); + } + + private static void assertFetchVolumes(long [] expected, Long [] results) { + assertEquals("the lengths of volumes are mismatch", expected.length, results.length); + + for (int i = 0; i < expected.length; i++) { + assertTrue(expected[i] + " is expected, but " + results[i], expected[i] == results[i]); + } + } }
