Repository: tajo Updated Branches: refs/heads/master 912df6f08 -> 189cf3ffe
TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses some FetchImpls. Closes #109 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/189cf3ff Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/189cf3ff Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/189cf3ff Branch: refs/heads/master Commit: 189cf3ffe9d1b5c625c553f27fc5be37cd7522be Parents: 912df6f Author: Hyunsik Choi <[email protected]> Authored: Mon Aug 11 10:54:20 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Aug 11 10:54:20 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../tajo/master/querymaster/Repartitioner.java | 1 + .../java/org/apache/tajo/worker/FetchImpl.java | 23 +++++++++ .../apache/tajo/master/TestRepartitioner.java | 52 +++++++++++++++----- 4 files changed, 68 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e1d6d03..7487983 100644 --- a/CHANGES +++ b/CHANGES @@ -106,6 +106,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses + some FetchImpls. (hyunsik) + TAJO-949: PullServer does not release files, when a channel throws an internal exception. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 43d6fd2..fa1ed4c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -831,6 +831,7 @@ public class Repartitioner { while (p > 0 && iterator.hasNext()) { FetchGroupMeta fetchGroupMeta = iterator.next(); assignedVolumes[p] += fetchGroupMeta.getVolume(); + TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls); // While the current one is smaller than next one, it adds additional fetches to current one. while(iterator.hasNext() && assignedVolumes[p - 1] > assignedVolumes[p]) { http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index 869c106..7baae64 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -26,6 +26,7 @@ import org.apache.tajo.common.ProtoObject; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.Repartitioner; +import org.apache.tajo.util.TUtil; import java.net.URI; import java.util.ArrayList; @@ -220,4 +221,26 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl } return newFetchImpl; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FetchImpl fetch = (FetchImpl) o; + + return TUtil.checkEquals(hasNext, fetch.hasNext) && + TUtil.checkEquals(partitionId, fetch.partitionId) && + TUtil.checkEquals(attemptIds, fetch.attemptIds) && + TUtil.checkEquals(executionBlockId, fetch.executionBlockId) && + TUtil.checkEquals(host, fetch.host) && + TUtil.checkEquals(name, fetch.name) && + TUtil.checkEquals(rangeParams, fetch.rangeParams) && + TUtil.checkEquals(taskIds, fetch.taskIds) && + TUtil.checkEquals(type, fetch.type); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index 009c02e..29aeccd 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -19,7 +19,9 @@ package org.apache.tajo.master; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.QueryId; import org.apache.tajo.TestTajoIds; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -32,12 +34,10 @@ import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.junit.Test; import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import static junit.framework.Assert.assertEquals; +import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType; import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta; import static org.junit.Assert.assertTrue; @@ -55,7 +55,7 @@ public class TestRepartitioner { intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, new QueryUnit.PullHost(hostName, port))); } - FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE, + FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE, sid, partitionId, intermediateEntries); fetch.setName(sid.toString()); @@ -95,38 +95,49 @@ public class TestRepartitioner { String tableName = "test1"; - fetchGroups.put(0, new FetchGroupMeta(100, new FetchImpl())); - fetchGroups.put(1, new FetchGroupMeta(80, new FetchImpl())); - fetchGroups.put(2, new FetchGroupMeta(70, new FetchImpl())); - fetchGroups.put(3, new FetchGroupMeta(30, new FetchImpl())); - fetchGroups.put(4, new FetchGroupMeta(10, new FetchImpl())); - fetchGroups.put(5, new FetchGroupMeta(5, new FetchImpl())); + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + FetchImpl [] fetches = new FetchImpl[12]; + for (int i = 0; i < 12; i++) { + fetches[i] = new FetchImpl(new QueryUnit.PullHost("localhost", 10000 + i), ShuffleType.HASH_SHUFFLE, ebId, i / 2); + } + + int [] VOLUMES = {100, 80, 70, 30, 10, 5}; + + for (int i = 0; i < 12; i += 2) { + fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1])); + } Pair<Long [], Map<String, List<FetchImpl>>[]> results; results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1); long expected [] = {100 + 80 + 70 + 30 + 10 + 5}; assertFetchVolumes(expected, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2); long expected0 [] = {130, 165}; assertFetchVolumes(expected0, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3); long expected1 [] = {100, 95, 100}; assertFetchVolumes(expected1, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4); long expected2 [] = {100, 80, 70, 45}; assertFetchVolumes(expected2, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5); long expected3 [] = {100, 80, 70, 30, 15}; assertFetchVolumes(expected3, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6); long expected4 [] = {100, 80, 70, 30, 10, 5}; assertFetchVolumes(expected4, results.getFirst()); + assertFetchImpl(fetches, results.getSecond()); } private static void assertFetchVolumes(long [] expected, Long [] results) { @@ -136,4 +147,23 @@ public class TestRepartitioner { assertTrue(expected[i] + " is expected, but " + results[i], expected[i] == results[i]); } } + + private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) { + Set<FetchImpl> expectedURLs = Sets.newHashSet(); + + for (FetchImpl f : expected) { + expectedURLs.add(f); + } + + Set<FetchImpl> resultURLs = Sets.newHashSet(); + + for (Map<String, List<FetchImpl>> e : result) { + for (List<FetchImpl> list : e.values()) { + resultURLs.addAll(list); + } + } + + assertEquals(expectedURLs.size(), resultURLs.size()); + assertEquals(expectedURLs, resultURLs); + } }
