Repository: hive
Updated Branches:
  refs/heads/master 4340d4619 -> 83ef6f927


HIVE-14680 : retain consistent splits /during/ (as opposed to across) LLAP failures on top of HIVE-14589 (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/83ef6f92
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/83ef6f92
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/83ef6f92

Branch: refs/heads/master
Commit: 83ef6f9272d71e1918ffc89635709b4f81e8aba9
Parents: 4340d46
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Mon Sep 19 16:11:16 2016 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Mon Sep 19 16:11:16 2016 -0700

----------------------------------------------------------------------
 .../hive/llap/registry/ServiceInstanceSet.java  |   7 +-
 .../registry/impl/InactiveServiceInstance.java  |  77 ++++++++++
 .../registry/impl/LlapFixedRegistryImpl.java    |   2 +-
 .../impl/LlapZookeeperRegistryImpl.java         |  34 ++++-
 .../daemon/services/impl/LlapWebServices.java   |   2 +-
 .../tez/HostAffinitySplitLocationProvider.java  |  80 +++++++---
 .../apache/hadoop/hive/ql/exec/tez/Utils.java   |   8 +-
 .../TestHostAffinitySplitLocationProvider.java  | 150 +++++++++++++++----
 .../apache/hadoop/hive/serde2/SerDeUtils.java   |  11 ++
 9 files changed, 306 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 13b668d..1e8c895 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -14,7 +14,6 @@
 package org.apache.hadoop.hive.llap.registry;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 
 public interface ServiceInstanceSet {
@@ -32,9 +31,11 @@ public interface ServiceInstanceSet {
   /**
    * Gets a list containing all the instances. This list has the same iteration order across
    * different processes, assuming the list of registry entries is the same.
-   * @return
+   * @param consistentIndexes if true, also try to maintain the same exact index for each node
+   *                          across calls, by inserting inactive instances to replace the
+   *                          removed ones.
    */
-  public Collection<ServiceInstance> getAllInstancesOrdered();
+  public Collection<ServiceInstance> getAllInstancesOrdered(boolean 
consistentIndexes);
 
   /**
    * Get an instance by worker identity.

http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
new file mode 100644
index 0000000..79b7d51
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class InactiveServiceInstance implements ServiceInstance {
+  private final String name;
+  public InactiveServiceInstance(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String getWorkerIdentity() {
+    return name;
+  }
+
+  @Override
+  public boolean isAlive() {
+    return false;
+  }
+
+  @Override
+  public String getHost() {
+    return null;
+  }
+
+  @Override
+  public int getRpcPort() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getManagementPort() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getShufflePort() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getServicesAddress() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getOutputFormatPort() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, String> getProperties() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Resource getResource() {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index de4d7f2..bbfcbf6 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -228,7 +228,7 @@ public class LlapFixedRegistryImpl implements 
ServiceRegistry {
     }
 
     @Override
-    public List<ServiceInstance> getAllInstancesOrdered() {
+    public List<ServiceInstance> getAllInstancesOrdered(boolean 
consistentIndexes) {
       List<ServiceInstance> list = new LinkedList<>();
       list.addAll(instances.values());
       Collections.sort(list, new Comparator<ServiceInstance>() {

http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 5e17ebf..59f7c9e 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -542,8 +542,8 @@ public class LlapZookeeperRegistryImpl implements 
ServiceRegistry {
     }
 
     @Override
-    public Collection<ServiceInstance> getAllInstancesOrdered() {
-      Map<String, String> slotByWorker = new HashMap<String, String>();
+    public Collection<ServiceInstance> getAllInstancesOrdered(boolean 
consistentIndexes) {
+      Map<String, Long> slotByWorker = new HashMap<String, Long>();
       List<ServiceInstance> unsorted = new LinkedList<ServiceInstance>();
       for (ChildData childData : instancesCache.getCurrentData()) {
         if (childData == null) continue;
@@ -560,21 +560,45 @@ public class LlapZookeeperRegistryImpl implements 
ServiceRegistry {
                 " Ignoring from current instances list..", 
childData.getPath());
           }
         } else if (nodeName.startsWith(SLOT_PREFIX)) {
-          slotByWorker.put(extractWorkerIdFromSlot(childData), nodeName);
+          slotByWorker.put(extractWorkerIdFromSlot(childData),
+              Long.parseLong(nodeName.substring(SLOT_PREFIX.length())));
         } else {
           LOG.info("Ignoring unknown node {}", childData.getPath());
         }
       }
 
-      TreeMap<String, ServiceInstance> sorted = new TreeMap<>();
+      TreeMap<Long, ServiceInstance> sorted = new TreeMap<>();
+      long maxSlot = Long.MIN_VALUE;
       for (ServiceInstance worker : unsorted) {
-        String slot = slotByWorker.get(worker.getWorkerIdentity());
+        Long slot = slotByWorker.get(worker.getWorkerIdentity());
         if (slot == null) {
           LOG.info("Unknown slot for {}", worker.getWorkerIdentity());
           continue;
         }
+        maxSlot = Math.max(maxSlot, slot);
         sorted.put(slot, worker);
       }
+
+      if (consistentIndexes) {
+        // Add dummy instances to all slots where LLAPs are MIA... I can haz insert_iterator?
+        TreeMap<Long, ServiceInstance> dummies = new TreeMap<>();
+        Iterator<Long> keyIter = sorted.keySet().iterator();
+        long expected = 0;
+        Long ts = null;
+        while (keyIter.hasNext()) {
+          Long slot = keyIter.next();
+          assert slot >= expected;
+          while (slot > expected) {
+            if (ts == null) {
+              ts = System.nanoTime(); // Inactive nodes restart every call!
+            }
+            dummies.put(expected, new InactiveServiceInstance("inactive-" + 
expected + "-" + ts));
+            ++expected;
+          }
+          ++expected;
+        }
+        sorted.putAll(dummies);
+      }
       return sorted.values();
     }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index f85bbf2..0614355 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -226,7 +226,7 @@ public class LlapWebServices extends AbstractService {
           }
           jg.writeStringField("identity", registry.getWorkerIdentity());
           jg.writeArrayFieldStart("peers");
-          for (ServiceInstance s : 
registry.getInstances().getAllInstancesOrdered()) {
+          for (ServiceInstance s : 
registry.getInstances().getAllInstancesOrdered(false)) {
             jg.writeStartObject();
             jg.writeStringField("identity", s.getWorkerIdentity());
             jg.writeStringField("host", s.getHost());

http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index aafc27e..dcb985f 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -15,10 +15,13 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.hash.Hashing;
 
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -39,46 +42,83 @@ import org.slf4j.LoggerFactory;
  */
 public class HostAffinitySplitLocationProvider implements 
SplitLocationProvider {
 
-  private final Logger LOG = 
LoggerFactory.getLogger(HostAffinitySplitLocationProvider.class);
+  private final static Logger LOG = LoggerFactory.getLogger(
+      HostAffinitySplitLocationProvider.class);
   private final boolean isDebugEnabled = LOG.isDebugEnabled();
 
-  private final String[] knownLocations;
+  private final List<String> locations;
 
-  public HostAffinitySplitLocationProvider(String[] knownLocations) {
-    Preconditions.checkState(knownLocations != null && knownLocations.length 
!= 0,
+  public HostAffinitySplitLocationProvider(List<String> knownLocations) {
+    Preconditions.checkState(knownLocations != null && 
!knownLocations.isEmpty(),
         HostAffinitySplitLocationProvider.class.getName() +
-            "needs at least 1 location to function");
-    this.knownLocations = knownLocations;
+            " needs at least 1 location to function");
+    this.locations = knownLocations;
   }
 
   @Override
   public String[] getLocations(InputSplit split) throws IOException {
-    if (split instanceof FileSplit) {
-      FileSplit fsplit = (FileSplit) split;
-      int index = chooseBucket(fsplit.getPath().toString(), fsplit.getStart());
-      if (isDebugEnabled) {
-        LOG.debug(
-            "Split at " + fsplit.getPath() + " with offset= " + 
fsplit.getStart() + ", length=" +
-                fsplit.getLength() + " mapped to index=" + index + ", 
location=" +
-                knownLocations[index]);
-      }
-      return new String[]{knownLocations[index]};
-    } else {
+    if (!(split instanceof FileSplit)) {
       if (isDebugEnabled) {
         LOG.debug("Split: " + split + " is not a FileSplit. Using default 
locations");
       }
       return split.getLocations();
     }
+    FileSplit fsplit = (FileSplit) split;
+    String splitDesc = "Split at " + fsplit.getPath() + " with offset= " + 
fsplit.getStart()
+        + ", length=" + fsplit.getLength();
+    String location = locations.get(determineLocation(
+        locations, fsplit.getPath().toString(), fsplit.getStart(), splitDesc));
+    return (location != null) ? new String[] { location } : null;
   }
 
+  @VisibleForTesting
+  public static int determineLocation(
+      List<String> locations, String path, long start, String desc) {
+    byte[] bytes = getHashInputForSplit(path, start);
+    long hash1 = hash1(bytes);
+    int index = Hashing.consistentHash(hash1, locations.size());
+    String location = locations.get(index);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(desc + " mapped to index=" + index + ", location=" + location);
+    }
+    int iter = 1;
+    long hash2 = 0;
+    // Since our probing method is totally bogus, give up after some time.
+    while (location == null && iter < locations.size() * 2) {
+      if (iter == 1) {
+        hash2 = hash2(bytes);
+      }
+      // Note that this is not real double hashing since we have consistent hash on top.
+      index = Hashing.consistentHash(hash1 + iter * hash2, locations.size());
+      location = locations.get(index);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(desc + " remapped to index=" + index + ", location=" + 
location);
+      }
+      ++iter;
+    }
+    return index;
+  }
 
-  private int chooseBucket(String path, long startOffset) throws IOException {
+  private static byte[] getHashInputForSplit(String path, long start) {
     // Explicitly using only the start offset of a split, and not the length. Splits generated on
     // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
     // There is the drawback of potentially hashing the same data on multiple nodes though, when a
     // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous
     // large split and send them to different nodes.
-    long hashCode = ((startOffset >> 2) * 37) ^ 
Murmur3.hash64(path.getBytes());
-    return Hashing.consistentHash(hashCode, knownLocations.length);
+    byte[] pathBytes = path.getBytes();
+    byte[] allBytes = new byte[pathBytes.length + 8];
+    System.arraycopy(pathBytes, 0, allBytes, 0, pathBytes.length);
+    SerDeUtils.writeLong(allBytes, pathBytes.length, start >> 3);
+    return allBytes;
+  }
+
+  private static long hash1(byte[] bytes) {
+    final int PRIME = 104729; // Same as hash64's default seed.
+    return Murmur3.hash64(bytes, 0, bytes.length, PRIME);
+  }
+
+  private static long hash2(byte[] bytes) {
+    final int PRIME = 1366661;
+    return Murmur3.hash64(bytes, 0, bytes.length, PRIME);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index 2e9918e..113aa49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -41,15 +42,14 @@ public class Utils {
       serviceRegistry = LlapRegistryService.getClient(conf);
 
       Collection<ServiceInstance> serviceInstances =
-          serviceRegistry.getInstances().getAllInstancesOrdered();
-      String[] locations = new String[serviceInstances.size()];
-      int i = 0;
+          serviceRegistry.getInstances().getAllInstancesOrdered(true);
+      ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
       for (ServiceInstance serviceInstance : serviceInstances) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with 
hostname=" +
               serviceInstance.getHost() + " to list for split locations");
         }
-        locations[i++] = serviceInstance.getHost();
+        locations.add(serviceInstance.getHost());
       }
       splitLocationProvider = new HostAffinitySplitLocationProvider(locations);
     } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 7ed3df1..f5ca623 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -19,15 +19,22 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.math.stat.descriptive.SummaryStatistics;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,20 +42,20 @@ public class TestHostAffinitySplitLocationProvider {
   private final Logger LOG = 
LoggerFactory.getLogger(TestHostAffinitySplitLocationProvider.class);
 
 
-  private static final String[] locations = new String[5];
+  private static final List<String> locations = new ArrayList<>();
   private static final Set<String> locationsSet = new HashSet<>();
-  private static final String[] executorLocations = new String[9];
+  private static final List<String> executorLocations = new ArrayList<>();
   private static final Set<String> executorLocationsSet = new HashSet<>();
 
   static {
     for (int i = 0 ; i < 5 ; i++) {
-      locations[i] = "location" + i;
-      locationsSet.add(locations[i]);
+      locations.add("location" + i);
+      locationsSet.add(locations.get(i));
     }
 
     for (int i = 0 ; i < 9 ; i++) {
-      executorLocations[i] = "execLocation" + i;
-      executorLocationsSet.add(executorLocations[i]);
+      executorLocations.add("execLocation" + i);
+      executorLocationsSet.add(executorLocations.get(i));
     }
 
   }
@@ -58,20 +65,20 @@ public class TestHostAffinitySplitLocationProvider {
 
     HostAffinitySplitLocationProvider locationProvider = new 
HostAffinitySplitLocationProvider(executorLocations);
 
-    InputSplit inputSplit1 = createMockInputSplit(new String[] {locations[0], 
locations[1]});
-    InputSplit inputSplit2 = createMockInputSplit(new String[] {locations[2], 
locations[3]});
+    InputSplit inputSplit1 = createMockInputSplit(new String[] 
{locations.get(0), locations.get(1)});
+    InputSplit inputSplit2 = createMockInputSplit(new String[] 
{locations.get(2), locations.get(3)});
 
-    assertArrayEquals(new String[] {locations[0], locations[1]}, 
locationProvider.getLocations(inputSplit1));
-    assertArrayEquals(new String[] {locations[2], locations[3]}, 
locationProvider.getLocations(inputSplit2));
+    assertArrayEquals(new String[] {locations.get(0), locations.get(1)}, 
locationProvider.getLocations(inputSplit1));
+    assertArrayEquals(new String[] {locations.get(2), locations.get(3)}, 
locationProvider.getLocations(inputSplit2));
   }
 
   @Test (timeout = 5000)
   public void testOrcSplitsBasic() throws IOException {
     HostAffinitySplitLocationProvider locationProvider = new 
HostAffinitySplitLocationProvider(executorLocations);
 
-    InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] 
{locations[0], locations[1]});
-    InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] 
{locations[2], locations[3]});
-    InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new 
String[] {locations[0], locations[3]});
+    InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] 
{locations.get(0), locations.get(1)});
+    InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] 
{locations.get(2), locations.get(3)});
+    InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new 
String[] {locations.get(0), locations.get(3)});
 
     String[] retLoc1 = locationProvider.getLocations(os1);
     String[] retLoc2 = locationProvider.getLocations(os2);
@@ -94,25 +101,18 @@ public class TestHostAffinitySplitLocationProvider {
   @Test (timeout = 10000)
   public void testConsistentHashing() throws IOException {
     final int LOC_COUNT = 20, MIN_LOC_COUNT = 4, SPLIT_COUNT = 100;
-
-    String[] locations = new String[LOC_COUNT];
-    for (int i = 0; i < locations.length; ++i) {
-      locations[i] = String.valueOf(i);
-    }
-    InputSplit[] splits = new InputSplit[SPLIT_COUNT];
-    for (int i = 0; i < splits.length; ++i) {
-      splits[i] = createMockFileSplit(true, "path" + i, 0, 1000, new String[] 
{});
-    }
+    List<String> locations = createLocations(LOC_COUNT);
+    InputSplit[] splits = createSplits(SPLIT_COUNT);
 
     StringBuilder failBuilder = new StringBuilder("\n");
     String[] lastLocations = new String[splits.length];
     double movedRatioSum = 0, newRatioSum = 0,
         movedRatioWorst = 0, newRatioWorst = Double.MAX_VALUE;
-    for (int locs = MIN_LOC_COUNT; locs <= locations.length; ++locs) {
-      String[] partLoc = Arrays.copyOf(locations, locs);
+    for (int locs = MIN_LOC_COUNT; locs <= locations.size(); ++locs) {
+      List<String> partLoc = locations.subList(0, locs);
       HostAffinitySplitLocationProvider lp = new 
HostAffinitySplitLocationProvider(partLoc);
       int moved = 0, newLoc = 0;
-      String newNode = partLoc[locs - 1];
+      String newNode = partLoc.get(locs - 1);
       for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
         String[] splitLocations = lp.getLocations(splits[splitIx]);
         assertEquals(1, splitLocations.length);
@@ -137,18 +137,106 @@ public class TestHostAffinitySplitLocationProvider {
       newRatioWorst = Math.min(newLoc / minNew, newRatioWorst);
       logBadRatios(failBuilder, moved, newLoc, msgTail, movedMsg, newMsg, 
maxMoved, minNew);
     }
-    int count = locations.length - MIN_LOC_COUNT;
+    int count = locations.size() - MIN_LOC_COUNT;
     double moveRatioAvg = movedRatioSum / count, newRatioAvg = newRatioSum / 
count;
     String errorMsg = "Move counts: average " + moveRatioAvg + ", worst " + 
movedRatioWorst
         + "; assigned to new node: average " + newRatioAvg + ", worst " + 
newRatioWorst;
     LOG.info(errorMsg);
     // Give it a LOT of slack, since on low numbers consistent hashing is very 
imprecise.
     if (moveRatioAvg > 1.2f || newRatioAvg < 0.8f
-        || movedRatioWorst > 1.5f || newRatioWorst < 0.5f) {
+        || movedRatioWorst > 1.67f || newRatioWorst < 0.5f) {
       fail(errorMsg + "; example failures: " + failBuilder.toString());
     }
   }
 
+  public FileSplit[] createSplits(final int splitCount) throws IOException {
+    FileSplit[] splits = new FileSplit[splitCount];
+    for (int i = 0; i < splits.length; ++i) {
+      splits[i] = createMockFileSplit(true, "path" + i, 0, 1000, new String[] 
{});
+    }
+    return splits;
+  }
+
+  public List<String> createLocations(final int locCount) {
+    List<String> locations = new ArrayList<>(locCount);
+    for (int i = 0; i < locCount; ++i) {
+      locations.add(String.valueOf(i));
+    }
+    return locations;
+  }
+
+
+  @Test (timeout = 10000)
+  public void testConsistentHashingFallback() throws IOException {
+    final int LOC_COUNT_TO = 20, SPLIT_COUNT = 500, MAX_MISS_COUNT = 4,
+        LOC_COUNT_FROM = MAX_MISS_COUNT + 1;
+    FileSplit[] splits = createSplits(SPLIT_COUNT);
+    AtomicInteger errorCount = new AtomicInteger(0);
+    int cvErrorCount = 0;
+    for (int locs = LOC_COUNT_FROM; locs <= LOC_COUNT_TO; ++locs) {
+      int aboveAvgCount = 0;
+      double sum = 0;
+      double[] cvs = new double[MAX_MISS_COUNT + 1];
+      for (int missCount = 0; missCount <= MAX_MISS_COUNT; ++missCount) {
+        double cv = cvs[missCount] = testHashDistribution(locs, missCount, 
splits, errorCount);
+        sum += cv;
+        if (missCount > 0 && cv > sum / (missCount + 1)) {
+          ++aboveAvgCount;
+        }
+      }
+      if (aboveAvgCount > 2) {
+        LOG.info("CVs for " + locs + " locations aren't to our liking: " + 
Arrays.toString(cvs));
+        ++cvErrorCount;
+      }
+    }
+    assertTrue("Found " + errorCount.get() + " abnormalities", 
errorCount.get() < 3);
+    // TODO: the way we add hash fns does exhibit some irregularities.
+    //       Seems like the 3rd iter has a better distribution in many cases, even better
+    //       than the original hash. That trips the "above MA" criteria, even if the rest is flat.
+    assertTrue("Found " + cvErrorCount + " abnormalities", cvErrorCount< 7);
+  }
+
+  private double testHashDistribution(int locs, final int missCount, 
FileSplit[] splits,
+      AtomicInteger errorCount) {
+    // This relies heavily on which List methods determineLocation calls and which it doesn't.
+    // We could do a wrapper with only size() and get() methods instead of List, to be sure.
+    @SuppressWarnings("unchecked")
+    List<String> partLocs = (List<String>)Mockito.mock(List.class);
+    Mockito.when(partLocs.size()).thenReturn(locs);
+    final AtomicInteger state = new AtomicInteger(0);
+    Mockito.when(partLocs.get(Mockito.anyInt())).thenAnswer(new 
Answer<String>() {
+      @Override
+      public String answer(InvocationOnMock invocation) throws Throwable {
+        return (state.getAndIncrement() == missCount) ? "not-null" : null;
+      }
+    });
+    int[] hitCounts = new int[locs];
+    for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
+      state.set(0);
+      int index = HostAffinitySplitLocationProvider.determineLocation(partLocs,
+          splits[splitIx].getPath().toString(), splits[splitIx].getStart(), 
null);
+      ++hitCounts[index];
+    }
+    SummaryStatistics ss = new SummaryStatistics();
+    for (int hitCount : hitCounts) {
+      ss.addValue(hitCount);
+    }
+    // All of this is completely bogus and mostly captures the following function:
+    // f(output) = I-eyeballed-the(output) == they-look-ok.
+    // It's pretty much a golden file...
+    // The fact that stdev doesn't increase with increasing missCount is captured outside.
+    double avg = ss.getSum()/ss.getN(), stdev = ss.getStandardDeviation(), cv 
= stdev/avg;
+    double allowedMin = avg - 2.5 * stdev, allowedMax = avg + 2.5 * stdev;
+    if (allowedMin > ss.getMin() || allowedMax < ss.getMax() || cv > 0.22) {
+      LOG.info("The distribution for " + locs + " locations, " + missCount + " 
misses isn't to "
+          + "our liking: avg " + avg + ", stdev " + stdev  + ", cv " + cv + ", 
min " + ss.getMin()
+          + ", max " + ss.getMax());
+      errorCount.incrementAndGet();
+    }
+    return cv;
+  }
+
+
   private void logBadRatios(StringBuilder failBuilder, int moved, int newLoc, 
String msgTail,
       String movedMsg, String newMsg, double maxMoved, double minNew) {
     boolean logged = false;
@@ -170,10 +258,10 @@ public class TestHostAffinitySplitLocationProvider {
     HostAffinitySplitLocationProvider locationProvider = new 
HostAffinitySplitLocationProvider(executorLocations);
 
     // Same file, offset, different lengths
-    InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new 
String[] {locations[0], locations[1]});
-    InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new 
String[] {locations[0], locations[1]});
+    InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new 
String[] {locations.get(0), locations.get(1)});
+    InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new 
String[] {locations.get(0), locations.get(1)});
     // Same file, different offset
-    InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new 
String[] {locations[0], locations[1]});
+    InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new 
String[] {locations.get(0), locations.get(1)});
 
     String[] retLoc11 = locationProvider.getLocations(os11);
     String[] retLoc12 = locationProvider.getLocations(os12);
@@ -216,7 +304,7 @@ public class TestHostAffinitySplitLocationProvider {
     return inputSplit;
   }
 
-  private InputSplit createMockFileSplit(boolean createOrcSplit, String 
fakePathString, long start,
+  private FileSplit createMockFileSplit(boolean createOrcSplit, String 
fakePathString, long start,
                                          long length, String[] locations) 
throws IOException {
     FileSplit fileSplit;
     if (createOrcSplit) {

http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java 
b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
index 6e08dfd..7ffc964 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
@@ -567,4 +567,15 @@ public final class SerDeUtils {
   public static Text transformTextFromUTF8(Text text, Charset targetCharset) {
     return new Text(new String(text.getBytes(), 0, 
text.getLength()).getBytes(targetCharset));
   }
+
+  public static void writeLong(byte[] writeBuffer, int offset, long value) {
+    writeBuffer[offset] = (byte) ((value >> 0)  & 0xff);
+    writeBuffer[offset + 1] = (byte) ((value >> 8)  & 0xff);
+    writeBuffer[offset + 2] = (byte) ((value >> 16) & 0xff);
+    writeBuffer[offset + 3] = (byte) ((value >> 24) & 0xff);
+    writeBuffer[offset + 4] = (byte) ((value >> 32) & 0xff);
+    writeBuffer[offset + 5] = (byte) ((value >> 40) & 0xff);
+    writeBuffer[offset + 6] = (byte) ((value >> 48) & 0xff);
+    writeBuffer[offset + 7] = (byte) ((value >> 56) & 0xff);
+  }
 }

Reply via email to