Repository: tajo
Updated Branches:
  refs/heads/master a5de83720 -> 45559ce60
TAJO-969: Distributed sort on a large data set may result in incorrect results. Closes #87 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/45559ce6 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/45559ce6 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/45559ce6 Branch: refs/heads/master Commit: 45559ce60c1da55ac7be1a5900afab45cc9deb06 Parents: a5de837 Author: Hyunsik Choi <[email protected]> Authored: Wed Jul 23 18:13:44 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Wed Jul 23 18:13:56 2014 +0900 ---------------------------------------------------------------------- .../tajo/master/querymaster/QueryUnit.java | 10 +++++++- .../tajo/master/querymaster/Repartitioner.java | 10 ++++++-- .../java/org/apache/tajo/worker/FetchImpl.java | 24 +++++++++++++++++++- 3 files changed, 40 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/45559ce6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 806c0f1..8c953bd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -626,7 +626,7 @@ public class QueryUnit implements EventHandler<TaskEvent> { return this.intermediateData; } - public static class PullHost { + public static class PullHost implements Cloneable { String host; int port; public PullHost(String pullServerAddr, int pullServerPort){ @@ -659,6 +659,14 @@ public class QueryUnit implements EventHandler<TaskEvent> { return false; } + + @Override + public PullHost clone() throws CloneNotSupportedException { 
+ PullHost newPullHost = (PullHost) super.clone(); + newPullHost.host = host; + newPullHost.port = port; + return newPullHost; + } } public static class IntermediateEntry { http://git-wip-us.apache.org/repos/asf/tajo/blob/45559ce6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 055e9a2..31c520f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -636,8 +636,14 @@ public class Repartitioner { for (FetchImpl fetch: fetches) { String rangeParam = TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0, encoder); - fetch.setRangeParams(rangeParam); - fetchSet.add(fetch); + FetchImpl copy = null; + try { + copy = fetch.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + copy.setRangeParams(rangeParam); + fetchSet.add(copy); } map.put(ranges[i], fetchSet); } http://git-wip-us.apache.org/repos/asf/tajo/blob/45559ce6/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index 9d1f428..869c106 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -20,6 +20,7 @@ package org.apache.tajo.worker; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.common.ProtoObject; import 
org.apache.tajo.ipc.TajoWorkerProtocol; @@ -33,7 +34,7 @@ import java.util.List; /** * <code>FetchImpl</code> information to indicate the locations of intermediate data. */ -public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> { +public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cloneable { private TajoWorkerProtocol.FetchProto.Builder builder = null; private QueryUnit.PullHost host; // The pull server host information @@ -110,6 +111,7 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> { builder.setPartitionId(partitionId); builder.setHasNext(hasNext); builder.setName(name); + if (rangeParams != null && !rangeParams.isEmpty()) { builder.setRangeParams(rangeParams); } @@ -198,4 +200,24 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> { public List<Integer> getAttemptIds() { return attemptIds; } + + public FetchImpl clone() throws CloneNotSupportedException { + FetchImpl newFetchImpl = (FetchImpl) super.clone(); + + newFetchImpl.builder = TajoWorkerProtocol.FetchProto.newBuilder(); + newFetchImpl.host = host.clone(); + newFetchImpl.type = type; + newFetchImpl.executionBlockId = executionBlockId; + newFetchImpl.partitionId = partitionId; + newFetchImpl.name = name; + newFetchImpl.rangeParams = rangeParams; + newFetchImpl.hasNext = hasNext; + if (taskIds != null) { + newFetchImpl.taskIds = Lists.newArrayList(taskIds); + } + if (attemptIds != null) { + newFetchImpl.attemptIds = Lists.newArrayList(attemptIds); + } + return newFetchImpl; + } }
