[ https://issues.apache.org/jira/browse/TAJO-992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14100305#comment-14100305 ]
ASF GitHub Bot commented on TAJO-992:
-------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/115#discussion_r16338083
--- Diff: tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---
@@ -923,20 +960,62 @@ public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext sche
LOG.info(subQuery.getId()
+ ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name()
- + ", DeterminedTaskNum : " + fetches.size());
+ + ", Intermediate Size: " + totalIntermediateSize
+ + ", splitSize: " + splitVolume
+ + ", DeterminedTaskNum: " + fetches.size());
}
- static class IntermediateEntryComparator implements Comparator<IntermediateEntry> {
+ /**
+ * If a IntermediateEntry is large than splitVolume, List<FetchImpl> has single element.
+ * @param ebId
+ * @param entries
+ * @param splitVolume
+ * @return
+ */
+ public static List<List<FetchImpl>> splitOrMergeIntermediates(
+ ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) {
+ // Each List<FetchImpl> has splitVolume size.
+ List<List<FetchImpl>> fetches = new ArrayList<List<FetchImpl>>();
+
+ Iterator<IntermediateEntry> iter = entries.iterator();
+ if (!iter.hasNext()) {
+ return null;
+ }
+ List<FetchImpl> fetchListForSingleTask = new ArrayList<FetchImpl>();
+ long fetchListVolume = 0;
- @Override
- public int compare(IntermediateEntry o1, IntermediateEntry o2) {
- int cmp = Ints.compare(o1.getPartId(), o2.getPartId());
- if (cmp != 0) {
- return cmp;
+ while (iter.hasNext()) {
+ IntermediateEntry currentInterm = iter.next();
+
+ long firstSplitVolume = splitVolume - fetchListVolume;
+ if (firstSplitVolume < pageSize) {
+ firstSplitVolume = splitVolume;
+ }
+ List<Pair<Long, Long>> splits = currentInterm.split(firstSplitVolume, splitVolume);
--- End diff --
Could you add a comment explaining what each long in the pair means?
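For illustration only, here is a minimal sketch of the kind of documentation being asked for. It assumes each pair is (start offset, length in bytes); the diff hunk above does not show what `split` actually returns, so that meaning is a guess. `SplitSketch`, `Range`, and the `totalVolume` parameter are hypothetical names, not part of the patch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the code from the pull request.
final class SplitSketch {

  /** Hypothetical stand-in for Pair<Long, Long>, with each long named. */
  static final class Range {
    final long offset; // assumed meaning of the first long: start offset in bytes
    final long length; // assumed meaning of the second long: number of bytes

    Range(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }
  }

  /**
   * Cuts totalVolume bytes into consecutive ranges. The first range holds at
   * most firstSplitVolume bytes; every later range holds at most splitVolume.
   */
  static List<Range> split(long totalVolume, long firstSplitVolume, long splitVolume) {
    if (firstSplitVolume <= 0 || splitVolume <= 0) {
      throw new IllegalArgumentException("split sizes must be positive");
    }
    List<Range> splits = new ArrayList<Range>();
    long offset = 0;
    long size = firstSplitVolume;
    while (offset < totalVolume) {
      long length = Math.min(size, totalVolume - offset);
      splits.add(new Range(offset, length));
      offset += length;
      size = splitVolume; // only the first range uses firstSplitVolume
    }
    return splits;
  }
}
```

With named fields (or an equivalent Javadoc line on the method), the reader no longer has to guess which long is which.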
> Reduce number of hash shuffle output file.
> ------------------------------------------
>
> Key: TAJO-992
> URL: https://issues.apache.org/jira/browse/TAJO-992
> Project: Tajo
> Issue Type: Sub-task
> Components: data shuffle
> Reporter: Hyoungjun Kim
> Assignee: Hyoungjun Kim
>
> Currently Tajo creates too many intermediate files in the case of hash
> shuffle. An execution block (SubQuery) on a TajoWorker creates intermediate
> files according to the following rule:
> # intermediate files in a worker = # tasks / # workers * # partitions
> This may cause a 'too many open files' error and makes it difficult to scale
> out. To solve this problem, we should reduce the number of hash shuffle output
> files.
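
To make the rule quoted above concrete, here is a small worked example. The cluster numbers are hypothetical and do not come from the issue, and rounding tasks per worker up is an assumption for the case where tasks do not divide evenly.

```java
// Hypothetical worked example of the rule quoted in the issue description:
//   # intermediate files in a worker = # tasks / # workers * # partitions
final class IntermediateFileCount {

  static long filesPerWorker(long tasks, long workers, long partitions) {
    // Assumption: tasks per worker is rounded up when it is not a whole number.
    long tasksPerWorker = (tasks + workers - 1) / workers;
    return tasksPerWorker * partitions;
  }

  public static void main(String[] args) {
    // Made-up sizes: 1000 tasks, 10 workers, 32 hash partitions.
    // 1000 / 10 = 100 tasks per worker, and 100 * 32 = 3200 files per worker.
    System.out.println(filesPerWorker(1000, 10, 32));
  }
}
```

Under these made-up numbers a single worker would hold 3200 intermediate files for one execution block, which shows how quickly the count can approach per-process file descriptor limits.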