Repository: tajo
Updated Branches:
  refs/heads/master a5de83720 -> 45559ce60


TAJO-969: Distributed sort on a large data set may result in incorrect results.

Closes #87


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/45559ce6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/45559ce6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/45559ce6

Branch: refs/heads/master
Commit: 45559ce60c1da55ac7be1a5900afab45cc9deb06
Parents: a5de837
Author: Hyunsik Choi <[email protected]>
Authored: Wed Jul 23 18:13:44 2014 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Wed Jul 23 18:13:56 2014 +0900

----------------------------------------------------------------------
 .../tajo/master/querymaster/QueryUnit.java      | 10 +++++++-
 .../tajo/master/querymaster/Repartitioner.java  | 10 ++++++--
 .../java/org/apache/tajo/worker/FetchImpl.java  | 24 +++++++++++++++++++-
 3 files changed, 40 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/45559ce6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 806c0f1..8c953bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -626,7 +626,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     return this.intermediateData;
   }
 
-  public static class PullHost {
+  public static class PullHost implements Cloneable {
     String host;
     int port;
     public PullHost(String pullServerAddr, int pullServerPort){
@@ -659,6 +659,14 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
       return false;
     }
+
+    @Override
+    public PullHost clone() throws CloneNotSupportedException {
+      PullHost newPullHost = (PullHost) super.clone();
+      newPullHost.host = host;
+      newPullHost.port = port;
+      return newPullHost;
+    }
   }
 
   public static class IntermediateEntry {

http://git-wip-us.apache.org/repos/asf/tajo/blob/45559ce6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 055e9a2..31c520f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -636,8 +636,14 @@ public class Repartitioner {
         for (FetchImpl fetch: fetches) {
           String rangeParam =
               TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0, encoder);
-          fetch.setRangeParams(rangeParam);
-          fetchSet.add(fetch);
+          FetchImpl copy = null;
+          try {
+            copy = fetch.clone();
+          } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+          }
+          copy.setRangeParams(rangeParam);
+          fetchSet.add(copy);
         }
         map.put(ranges[i], fetchSet);
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/45559ce6/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 9d1f428..869c106 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -20,6 +20,7 @@ package org.apache.tajo.worker;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -33,7 +34,7 @@ import java.util.List;
 /**
  * <code>FetchImpl</code> information to indicate the locations of intermediate data.
  */
-public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> {
+public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cloneable {
   private TajoWorkerProtocol.FetchProto.Builder builder = null;
 
   private QueryUnit.PullHost host;             // The pull server host information
@@ -110,6 +111,7 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> {
     builder.setPartitionId(partitionId);
     builder.setHasNext(hasNext);
     builder.setName(name);
+
     if (rangeParams != null && !rangeParams.isEmpty()) {
       builder.setRangeParams(rangeParams);
     }
@@ -198,4 +200,24 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto> {
   public List<Integer> getAttemptIds() {
     return attemptIds;
   }
+
+  public FetchImpl clone() throws CloneNotSupportedException {
+    FetchImpl newFetchImpl = (FetchImpl) super.clone();
+
+    newFetchImpl.builder = TajoWorkerProtocol.FetchProto.newBuilder();
+    newFetchImpl.host = host.clone();
+    newFetchImpl.type = type;
+    newFetchImpl.executionBlockId = executionBlockId;
+    newFetchImpl.partitionId = partitionId;
+    newFetchImpl.name = name;
+    newFetchImpl.rangeParams = rangeParams;
+    newFetchImpl.hasNext = hasNext;
+    if (taskIds != null) {
+      newFetchImpl.taskIds = Lists.newArrayList(taskIds);
+    }
+    if (attemptIds != null) {
+      newFetchImpl.attemptIds = Lists.newArrayList(attemptIds);
+    }
+    return newFetchImpl;
+  }
 }

Reply via email to