Repository: helix
Updated Branches:
  refs/heads/helix-0.6.x 7147ec874 -> 9fc6c540b


[HELIX-635] GenericTaskAssignmentCalculator rebalance with consistent hashing

1. Implement consistent hashing mapping calculation
2. Remove the reassignment logic and apply it via consistent hashing instead
3. Add tests for GenericTaskAssignmentCalculator


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0a18726f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0a18726f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0a18726f

Branch: refs/heads/helix-0.6.x
Commit: 0a18726fcad7b8a0fe5e77d7a2c9848b86461ccc
Parents: 7147ec8
Author: Junkai Xue <j...@linkedin.com>
Authored: Tue Sep 13 15:28:57 2016 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Tue Sep 13 15:59:24 2016 -0700

----------------------------------------------------------------------
 .../strategy/CrushRebalanceStrategy.java        |   2 +-
 .../crushMapping/CRUSHPlacementAlgorithm.java   |   1 +
 .../strategy/crushMapping/JenkinsHash.java      | 140 -----------
 .../task/GenericTaskAssignmentCalculator.java   | 238 +++++++------------
 .../java/org/apache/helix/util/JenkinsHash.java | 140 +++++++++++
 .../TestGenericTaskAssignmentCalculator.java    | 171 +++++++++++++
 .../task/TestIndependentTaskRebalancer.java     |   8 +-
 7 files changed, 405 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
index a8fe107..b91d26c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -24,7 +24,7 @@ import com.google.common.base.Predicates;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
 import 
org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
-import 
org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash;
+import org.apache.helix.util.JenkinsHash;
 import org.apache.helix.controller.rebalancer.topology.Node;
 import org.apache.helix.controller.rebalancer.topology.Topology;
 import org.apache.helix.controller.stages.ClusterDataCache;

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
index 870656c..b7c1c68 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.util.JenkinsHash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
deleted file mode 100644
index 66566f8..0000000
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Copyright 2013 Twitter, Inc.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.helix.controller.rebalancer.strategy.crushMapping;
-
-public class JenkinsHash {
-  // max value to limit it to 4 bytes
-  private static final long MAX_VALUE = 0xFFFFFFFFL;
-  private static final long CRUSH_HASH_SEED = 1315423911L;
-
-  /**
-   * Convert a byte into a long value without making it negative.
-   */
-  private static long byteToLong(byte b) {
-    long val = b & 0x7F;
-    if ((b & 0x80) != 0) {
-      val += 128;
-    }
-    return val;
-  }
-
-  /**
-   * Do addition and turn into 4 bytes.
-   */
-  private static long add(long val, long add) {
-    return (val + add) & MAX_VALUE;
-  }
-
-  /**
-   * Do subtraction and turn into 4 bytes.
-   */
-  private static long subtract(long val, long subtract) {
-    return (val - subtract) & MAX_VALUE;
-  }
-
-  /**
-   * Left shift val by shift bits and turn in 4 bytes.
-   */
-  private static long xor(long val, long xor) {
-    return (val ^ xor) & MAX_VALUE;
-  }
-
-  /**
-   * Left shift val by shift bits.  Cut down to 4 bytes.
-   */
-  private static long leftShift(long val, int shift) {
-    return (val << shift) & MAX_VALUE;
-  }
-
-  /**
-   * Convert 4 bytes from the buffer at offset into a long value.
-   */
-  private static long fourByteToLong(byte[] bytes, int offset) {
-    return (byteToLong(bytes[offset + 0])
-        + (byteToLong(bytes[offset + 1]) << 8)
-        + (byteToLong(bytes[offset + 2]) << 16)
-        + (byteToLong(bytes[offset + 3]) << 24));
-  }
-
-  /**
-   * Mix up the values in the hash function.
-   */
-  private static Triple hashMix(Triple t) {
-    long a = t.a; long b = t.b; long c = t.c;
-    a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13);
-    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8));
-    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13));
-    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12));
-    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16));
-    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5));
-    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3));
-    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10));
-    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15));
-    return new Triple(a, b, c);
-  }
-
-  private static class Triple {
-    long a;
-    long b;
-    long c;
-
-    public Triple(long a, long b, long c) {
-      this.a = a; this.b = b; this.c = c;
-    }
-  }
-
-  public long hash(long a) {
-    long hash = xor(CRUSH_HASH_SEED, a);
-    long b = a;
-    long x = 231232L;
-    long y = 1232L;
-    Triple val = hashMix(new Triple(b, x, hash));
-    b = val.a; x = val.b; hash = val.c;
-    val = hashMix(new Triple(y, a, hash));
-    hash = val.c;
-    return hash;
-  }
-
-  public long hash(long a, long b) {
-    long hash = xor(xor(CRUSH_HASH_SEED, a), b);
-    long x = 231232L;
-    long y = 1232L;
-    Triple val = hashMix(new Triple(a, b, hash));
-    a = val.a; b = val.b; hash = val.c;
-    val = hashMix(new Triple(x, a, hash));
-    x = val.a; a = val.b; hash = val.c;
-    val = hashMix(new Triple(b, y, hash));
-    hash = val.c;
-    return hash;
-  }
-
-  public long hash(long a, long b, long c) {
-    long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
-    long x = 231232L;
-    long y = 1232L;
-    Triple val = hashMix(new Triple(a, b, hash));
-    a = val.a; b = val.b; hash = val.c;
-    val = hashMix(new Triple(c, x, hash));
-    c = val.a; x = val.b; hash = val.c;
-    val = hashMix(new Triple(y, a, hash));
-    y = val.a; a = val.b; hash = val.c;
-    val = hashMix(new Triple(b, x, hash));
-    b = val.a; x = val.b; hash = val.c;
-    val = hashMix(new Triple(y, c, hash));
-    hash = val.c;
-    return hash;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
 
b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index ac96768..fbc7af3 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -19,30 +19,26 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixException;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.JenkinsHash;
 import org.apache.log4j.Logger;
 
-import com.google.common.base.Function;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -54,9 +50,6 @@ import com.google.common.collect.Sets;
 public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
   private static final Logger LOG = 
Logger.getLogger(GenericTaskAssignmentCalculator.class);
 
-  /** Reassignment policy for this algorithm */
-  private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
-
   @Override
   public Set<Integer> getAllTaskPartitions(JobConfig jobCfg, JobContext jobCtx,
       WorkflowConfig workflowCfg, WorkflowContext workflowCtx, 
ClusterDataCache cache) {
@@ -96,14 +89,7 @@ public class GenericTaskAssignmentCalculator extends 
TaskAssignmentCalculator {
     // Transform from partition id to fully qualified partition name
     List<Integer> partitionNums = Lists.newArrayList(partitionSet);
     Collections.sort(partitionNums);
-    final String resourceId = prevAssignment.getResourceName();
-    List<String> partitions =
-        new ArrayList<String>(Lists.transform(partitionNums, new 
Function<Integer, String>() {
-          @Override
-          public String apply(Integer partitionNum) {
-            return resourceId + "_" + partitionNum;
-          }
-        }));
+    String resourceId = prevAssignment.getResourceName();
 
     // Compute the current assignment
     Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
@@ -122,156 +108,108 @@ public class GenericTaskAssignmentCalculator extends 
TaskAssignmentCalculator {
     }
 
     // Get the assignment keyed on partition
-    RebalanceStrategy strategy = new AutoRebalanceStrategy(resourceId, 
partitions, states);
-    List<String> allNodes =
-        Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, 
instances, cache));
-    Collections.sort(allNodes);
-    ZNRecord record =
-        strategy.computePartitionAssignment(allNodes, allNodes, 
currentMapping, cache);
-    Map<String, List<String>> preferenceLists = record.getListFields();
-
-    // Convert to an assignment keyed on participant
-    Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
-    for (Map.Entry<String, List<String>> e : preferenceLists.entrySet()) {
-      String partitionName = e.getKey();
-      partitionName = String.valueOf(TaskUtil.getPartitionId(partitionName));
-      List<String> preferenceList = e.getValue();
-      for (String participantName : preferenceList) {
-        if (!taskAssignment.containsKey(participantName)) {
-          taskAssignment.put(participantName, new TreeSet<Integer>());
-        }
-        
taskAssignment.get(participantName).add(Integer.valueOf(partitionName));
-      }
+    if (jobCfg.getTargetResource() != null) {
+      LOG.error(
+          "Target resource is not null, should call 
FixedTaskAssignmentCalculator, target resource : "
+              + jobCfg.getTargetResource());
+      return new HashMap<String, SortedSet<Integer>>();
     }
 
-    // Finally, adjust the assignment if tasks have been failing
-    taskAssignment = _retryPolicy.reassign(jobCfg, jobContext, allNodes, 
taskAssignment);
+    List<String> allNodes = Lists.newArrayList(instances);
+    ConsistentHashingPlacement placement = new 
ConsistentHashingPlacement(allNodes);
+    Map<String, SortedSet<Integer>> taskAssignment =
+        placement.computeMapping(jobCfg, jobContext, partitionNums, 
resourceId);
+
     return taskAssignment;
   }
 
-  /**
-   * Filter a list of instances based on targeted resource policies
-   * @param jobCfg the job configuration
-   * @param currStateOutput the current state of all instances in the cluster
-   * @param instances valid instances
-   * @param cache current snapshot of the cluster
-   * @return a set of instances that can be assigned to
-   */
-  private Set<String> getEligibleInstances(JobConfig jobCfg, 
CurrentStateOutput currStateOutput,
-      Iterable<String> instances, ClusterDataCache cache) {
-    // No target resource means any instance is available
-    Set<String> allInstances = Sets.newHashSet(instances);
-    String targetResource = jobCfg.getTargetResource();
-    if (targetResource == null) {
-      return allInstances;
-    }
+  private class ConsistentHashingPlacement {
+    private JenkinsHash _hashFunction;
+    private ConsistentHashSelector _selector;
+    private int _numInstances;
 
-    // Bad ideal state means don't assign
-    IdealState idealState = cache.getIdealState(targetResource);
-    if (idealState == null) {
-      return Collections.emptySet();
+    public ConsistentHashingPlacement(List<String> potentialInstances) {
+      _hashFunction = new JenkinsHash();
+      _selector = new ConsistentHashSelector(potentialInstances);
+      _numInstances = potentialInstances.size();
     }
 
-    // Get the partitions on the target resource to use
-    Set<String> partitions = idealState.getPartitionSet();
-    List<String> targetPartitions = jobCfg.getTargetPartitions();
-    if (targetPartitions != null && !targetPartitions.isEmpty()) {
-      partitions.retainAll(targetPartitions);
-    }
+    public Map<String, SortedSet<Integer>> computeMapping(JobConfig jobConfig,
+        JobContext jobContext, List<Integer> partitions, String resourceId) {
+      if (_numInstances == 0) {
+        return new HashMap<String, SortedSet<Integer>>();
+      }
+
+      Map<String, SortedSet<Integer>> taskAssignment = Maps.newHashMap();
+
+      for (int partition : partitions) {
+        long hashedValue = new String(resourceId + "_" + partition).hashCode();
+        int shiftTimes;
+        int numAttempts = jobContext.getPartitionNumAttempts(partition);
+        int maxAttempts = jobConfig.getMaxAttemptsPerTask();
 
-    // Based on state matches, add eligible instances
-    Set<String> eligibleInstances = Sets.newHashSet();
-    Set<String> targetStates = jobCfg.getTargetPartitionStates();
-    for (String partition : partitions) {
-      Map<String, String> stateMap =
-          currStateOutput.getCurrentStateMap(targetResource, new 
Partition(partition));
-      Map<String, String> pendingStateMap =
-          currStateOutput.getPendingStateMap(targetResource, new 
Partition(partition));
-      for (Map.Entry<String, String> e : stateMap.entrySet()) {
-        String instanceName = e.getKey();
-        String state = e.getValue();
-        String pending = pendingStateMap.get(instanceName);
-        if (pending != null) {
-          continue;
+        if (jobConfig.getMaxAttemptsPerTask() < _numInstances) {
+          shiftTimes = numAttempts == -1 ? 0 : numAttempts;
+        } else {
+          shiftTimes = (maxAttempts == 0)
+              ? 0
+              : jobContext.getPartitionNumAttempts(partition) / (maxAttempts / 
_numInstances);
+        }
+        // Hash the value based on the shifting time. The default shift time 
will be 0.
+        for (int i = 0; i <= shiftTimes; i++) {
+          hashedValue = _hashFunction.hash(hashedValue);
         }
-        if (targetStates == null || targetStates.isEmpty() || 
targetStates.contains(state)) {
-          eligibleInstances.add(instanceName);
+        String selectedInstance = select(hashedValue);
+        if (selectedInstance != null) {
+          if (!taskAssignment.containsKey(selectedInstance)) {
+            taskAssignment.put(selectedInstance, new TreeSet<Integer>());
+          }
+          taskAssignment.get(selectedInstance).add(partition);
         }
       }
+      return taskAssignment;
     }
-    allInstances.retainAll(eligibleInstances);
-    return allInstances;
-  }
 
-  public interface RetryPolicy {
-    /**
-     * Adjust the assignment to allow for reassignment if a task keeps failing 
where it's currently
-     * assigned
-     * @param jobCfg the job configuration
-     * @param jobCtx the job context
-     * @param instances instances that can serve tasks
-     * @param origAssignment the unmodified assignment
-     * @return the adjusted assignment
-     */
-    Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, JobContext 
jobCtx,
-        Collection<String> instances, Map<String, SortedSet<Integer>> 
origAssignment);
-  }
+    private String select(long data) throws HelixException {
+      return _selector.get(data);
+    }
+
+    private class ConsistentHashSelector {
+      private final static int DEFAULT_TOKENS_PER_INSTANCE = 1000;
+      private final SortedMap<Long, String> circle = new TreeMap<Long, 
String>();
+      protected int instanceSize = 0;
 
-  private static class DefaultRetryReassigner implements RetryPolicy {
-    @Override
-    public Map<String, SortedSet<Integer>> reassign(JobConfig jobCfg, 
JobContext jobCtx,
-        Collection<String> instances, Map<String, SortedSet<Integer>> 
origAssignment) {
-      // Compute an increasing integer ID for each instance
-      BiMap<String, Integer> instanceMap = HashBiMap.create(instances.size());
-      int instanceIndex = 0;
-      for (String instance : instances) {
-        instanceMap.put(instance, instanceIndex++);
+      public ConsistentHashSelector(List<String> instances) {
+        for (String instance : instances) {
+          long tokenCount = DEFAULT_TOKENS_PER_INSTANCE;
+          add(instance, tokenCount);
+          instanceSize++;
+        }
       }
 
-      // Move partitions
-      Map<String, SortedSet<Integer>> newAssignment = Maps.newHashMap();
-      for (Map.Entry<String, SortedSet<Integer>> e : 
origAssignment.entrySet()) {
-        String instance = e.getKey();
-        SortedSet<Integer> partitions = e.getValue();
-        Integer instanceId = instanceMap.get(instance);
-        if (instanceId != null) {
-          for (int p : partitions) {
-            // Determine for each partition if there have been failures with 
the current assignment
-            // strategy, and if so, force a shift in assignment for that 
partition only
-            int shiftValue = getNumInstancesToShift(jobCfg, jobCtx, instances, 
p);
-            int newInstanceId = (instanceId + shiftValue) % instances.size();
-            String newInstance = instanceMap.inverse().get(newInstanceId);
-            if (newInstance == null) {
-              newInstance = instance;
-            }
-            if (!newAssignment.containsKey(newInstance)) {
-              newAssignment.put(newInstance, new TreeSet<Integer>());
-            }
-            newAssignment.get(newInstance).add(p);
-          }
-        } else {
-          // In case something goes wrong, just keep the previous assignment
-          newAssignment.put(instance, partitions);
+      public void add(String instance, long numberOfReplicas) {
+        for (int i = 0; i < numberOfReplicas; i++) {
+          circle.put(_hashFunction.hash(instance.hashCode(), i), instance);
+        }
+      }
+
+      public void remove(String instance, long numberOfReplicas) {
+        for (int i = 0; i < numberOfReplicas; i++) {
+          circle.remove(_hashFunction.hash(instance.hashCode(), i));
         }
       }
-      return newAssignment;
-    }
 
-    /**
-     * In case tasks fail, we may not want to schedule them in the same place. 
This method allows us
-     * to compute a shifting value so that we can systematically choose other 
instances to try
-     * @param jobCfg the job configuration
-     * @param jobCtx the job context
-     * @param instances instances that can be chosen
-     * @param p the partition to look up
-     * @return the shifting value
-     */
-    private int getNumInstancesToShift(JobConfig jobCfg, JobContext jobCtx,
-        Collection<String> instances, int p) {
-      int numAttempts = jobCtx.getPartitionNumAttempts(p);
-      int maxNumAttempts = jobCfg.getMaxAttemptsPerTask();
-      int numInstances = Math.min(instances.size(), 
jobCfg.getMaxForcedReassignmentsPerTask() + 1);
-      return numAttempts / (maxNumAttempts / numInstances);
+      public String get(long data) {
+        if (circle.isEmpty()) {
+          return null;
+        }
+        long hash = _hashFunction.hash(data);
+        if (!circle.containsKey(hash)) {
+          SortedMap<Long, String> tailMap = circle.tailMap(hash);
+          hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
+        }
+        return circle.get(hash);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java 
b/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
new file mode 100644
index 0000000..3ccd1f4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/util/JenkinsHash.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.util;
+
+public class JenkinsHash {
+  // max value to limit it to 4 bytes
+  private static final long MAX_VALUE = 0xFFFFFFFFL;
+  private static final long CRUSH_HASH_SEED = 1315423911L;
+
+  /**
+   * Convert a byte into a long value without making it negative.
+   */
+  private static long byteToLong(byte b) {
+    long val = b & 0x7F;
+    if ((b & 0x80) != 0) {
+      val += 128;
+    }
+    return val;
+  }
+
+  /**
+   * Do addition and turn into 4 bytes.
+   */
+  private static long add(long val, long add) {
+    return (val + add) & MAX_VALUE;
+  }
+
+  /**
+   * Do subtraction and turn into 4 bytes.
+   */
+  private static long subtract(long val, long subtract) {
+    return (val - subtract) & MAX_VALUE;
+  }
+
+  /**
+   * XOR val with xor and truncate the result to 4 bytes.
+   */
+  private static long xor(long val, long xor) {
+    return (val ^ xor) & MAX_VALUE;
+  }
+
+  /**
+   * Left shift val by shift bits.  Cut down to 4 bytes.
+   */
+  private static long leftShift(long val, int shift) {
+    return (val << shift) & MAX_VALUE;
+  }
+
+  /**
+   * Convert 4 bytes from the buffer at offset into a long value.
+   */
+  private static long fourByteToLong(byte[] bytes, int offset) {
+    return (byteToLong(bytes[offset + 0])
+        + (byteToLong(bytes[offset + 1]) << 8)
+        + (byteToLong(bytes[offset + 2]) << 16)
+        + (byteToLong(bytes[offset + 3]) << 24));
+  }
+
+  /**
+   * Mix up the values in the hash function.
+   */
+  private static Triple hashMix(Triple t) {
+    long a = t.a; long b = t.b; long c = t.c;
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, c >> 13);
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 8));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 13));
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 12));
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 16));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 5));
+    a = subtract(a, b); a = subtract(a, c); a = xor(a, (c >> 3));
+    b = subtract(b, c); b = subtract(b, a); b = xor(b, leftShift(a, 10));
+    c = subtract(c, a); c = subtract(c, b); c = xor(c, (b >> 15));
+    return new Triple(a, b, c);
+  }
+
+  private static class Triple {
+    long a;
+    long b;
+    long c;
+
+    public Triple(long a, long b, long c) {
+      this.a = a; this.b = b; this.c = c;
+    }
+  }
+
+  public long hash(long a) {
+    long hash = xor(CRUSH_HASH_SEED, a);
+    long b = a;
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(b, x, hash));
+    b = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, a, hash));
+    hash = val.c;
+    return hash;
+  }
+
+  public long hash(long a, long b) {
+    long hash = xor(xor(CRUSH_HASH_SEED, a), b);
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(a, b, hash));
+    a = val.a; b = val.b; hash = val.c;
+    val = hashMix(new Triple(x, a, hash));
+    x = val.a; a = val.b; hash = val.c;
+    val = hashMix(new Triple(b, y, hash));
+    hash = val.c;
+    return hash;
+  }
+
+  public long hash(long a, long b, long c) {
+    long hash = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
+    long x = 231232L;
+    long y = 1232L;
+    Triple val = hashMix(new Triple(a, b, hash));
+    a = val.a; b = val.b; hash = val.c;
+    val = hashMix(new Triple(c, x, hash));
+    c = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, a, hash));
+    y = val.a; a = val.b; hash = val.c;
+    val = hashMix(new Triple(b, x, hash));
+    b = val.a; x = val.b; hash = val.c;
+    val = hashMix(new Triple(y, c, hash));
+    hash = val.c;
+    return hash;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
new file mode 100644
index 0000000..0410db2
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/task/TestGenericTaskAssignmentCalculator.java
@@ -0,0 +1,171 @@
+package org.apache.helix.integration.task;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Sets;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Integration test exercising task assignment across participants: each
+ * mock task records which instance ran it, and the tests assert how many
+ * distinct instances received work.
+ */
+public class TestGenericTaskAssignmentCalculator extends TaskTestBase {
+  // NOTE(review): these maps are mutated from task callbacks without
+  // synchronization -- assumes the assertions only read after the workflow
+  // completes; confirm no concurrent-update hazard.
+  private Set<String> _invokedClasses = Sets.newHashSet();
+  // instance name -> number of tasks that ran on that instance
+  private Map<String, Integer> _runCounts = Maps.newHashMap();
+  private TaskConfig _taskConfig;
+  private Map<String, String> _jobCommandMap;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Wipe any leftover cluster state from a previous run.
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursive(namespace);
+    }
+
+    // Setup cluster and instances
+    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
+    setupTool.addCluster(CLUSTER_NAME, true);
+    for (int i = 0; i < _numNodes; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+      setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < _numNodes; i++) {
+      final String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
+
+      // Set task callbacks
+      Map<String, TaskFactory> taskFactoryReg = new HashMap<String, 
TaskFactory>();
+
+      // Each created task captures the participant it executes on.
+      taskFactoryReg.put("TaskOne", new TaskFactory() {
+        @Override public Task createNewTask(TaskCallbackContext context) {
+          return new TaskOne(context, instanceName);
+        }
+      });
+
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, 
instanceName);
+
+      // Register a Task state model factory.
+      StateMachineEngine stateMachine = 
_participants[i].getStateMachineEngine();
+      stateMachine.registerStateModelFactory("Task",
+          new TaskStateModelFactory(_participants[i], taskFactoryReg));
+      _participants[i].syncStart();
+    }
+
+    // Start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    // Start an admin connection
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, 
ZK_ADDR);
+    _manager.connect();
+    _driver = new TaskDriver(_manager);
+
+    // Shared task/job templates reused by the individual test methods.
+    Map<String, String> taskConfigMap = Maps.newHashMap();
+    _taskConfig = new TaskConfig("TaskOne", taskConfigMap, false);
+    _jobCommandMap = Maps.newHashMap();
+  }
+
+  /**
+   * 25 single-task jobs in one workflow: every participant should end up
+   * running at least one task (the assertion assumes the base class starts
+   * 5 nodes -- TODO confirm _numNodes == 5 in TaskTestBase).
+   */
+  @Test
+  public void testMultipleJobAssignment() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(1);
+    taskConfigs.add(_taskConfig);
+    JobConfig.Builder jobBuilder =
+        new 
JobConfig.Builder().setCommand("DummyCommand").addTaskConfigs(taskConfigs)
+            .setJobCommandConfigMap(_jobCommandMap);
+
+    for (int i = 0; i < 25; i++) {
+      workflowBuilder.addJob("JOB" + i, jobBuilder);
+    }
+
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    // All 5 instances should have executed at least one of the 25 jobs.
+    Assert.assertEquals(_runCounts.size(), 5);
+  }
+
+  /**
+   * One job with 50 tasks: expects the tasks to be spread over all 5
+   * participants (same _numNodes assumption as above).
+   */
+  @Test
+  public void testMultipleTaskAssignment() throws InterruptedException {
+    String workflowName = TestHelper.getTestMethodName();
+    Workflow.Builder workflowBuilder = new Workflow.Builder(workflowName);
+
+    List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(20);
+    for (int i = 0; i < 50; i++) {
+      Map<String, String> taskConfigMap = Maps.newHashMap();
+      taskConfigs.add(new TaskConfig("TaskOne", taskConfigMap, false));
+    }
+    JobConfig.Builder jobBuilder =
+        new 
JobConfig.Builder().setCommand("DummyCommand").setJobCommandConfigMap(_jobCommandMap)
+            .addTaskConfigs(taskConfigs);
+    workflowBuilder.addJob("JOB", jobBuilder);
+    _driver.start(workflowBuilder.build());
+    _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
+
+    Assert.assertEquals(_runCounts.size(), 5);
+  }
+
+  // Mock task that bumps the per-instance run counter every time it runs.
+  private class TaskOne extends MockTask {
+    private final String _instanceName;
+
+    public TaskOne(TaskCallbackContext context, String instanceName) {
+      super(context);
+
+      // Initialize the count for this instance if not already done
+      if (!_runCounts.containsKey(instanceName)) {
+        _runCounts.put(instanceName, 0);
+      }
+      _instanceName = instanceName;
+    }
+
+    @Override
+    public TaskResult run() {
+      _invokedClasses.add(getClass().getName());
+      _runCounts.put(_instanceName, _runCounts.get(_instanceName) + 1);
+      return new TaskResult(TaskResult.Status.COMPLETED, "");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/0a18726f/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
index 49b4bf4..c4d588c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestIndependentTaskRebalancer.java
@@ -215,12 +215,12 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
   }
 
   @Test public void testReassignment() throws Exception {
-    final int NUM_INSTANCES = 2;
+    final int NUM_INSTANCES = 5;
     String jobName = TestHelper.getTestMethodName();
     Workflow.Builder workflowBuilder = new Workflow.Builder(jobName);
     List<TaskConfig> taskConfigs = Lists.newArrayListWithCapacity(2);
-    Map<String, String> taskConfigMap = Maps.newHashMap(
-        ImmutableMap.of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + _startPort));
+    Map<String, String> taskConfigMap = Maps.newHashMap(ImmutableMap
+        .of("fail", "" + true, "failInstance", PARTICIPANT_PREFIX + '_' + (_startPort + 1)));
     TaskConfig taskConfig1 = new TaskConfig("TaskOne", taskConfigMap, false);
     taskConfigs.add(taskConfig1);
     Map<String, String> jobCommandMap = Maps.newHashMap();
@@ -242,7 +242,7 @@ public class TestIndependentTaskRebalancer extends TaskTestBase {
 
     // Ensure that this was tried on two different instances, the first of 
which exhausted the
     // attempts number, and the other passes on the first try
-    Assert.assertEquals(_runCounts.size(), NUM_INSTANCES);
+    Assert.assertEquals(_runCounts.size(), 2);
     Assert.assertTrue(
         _runCounts.values().contains(JobConfig.DEFAULT_MAX_ATTEMPTS_PER_TASK / 
NUM_INSTANCES));
     Assert.assertTrue(_runCounts.values().contains(1));

Reply via email to