Repository: hive Updated Branches: refs/heads/hive-14535 70299dc48 -> 6d9144835
HIVE-14680 : retain consistent splits /during/ (as opposed to across) LLAP failures on top of HIVE-14589 (Sergey Shelukhin, reviewed by Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/83ef6f92 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/83ef6f92 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/83ef6f92 Branch: refs/heads/hive-14535 Commit: 83ef6f9272d71e1918ffc89635709b4f81e8aba9 Parents: 4340d46 Author: Sergey Shelukhin <[email protected]> Authored: Mon Sep 19 16:11:16 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Mon Sep 19 16:11:16 2016 -0700 ---------------------------------------------------------------------- .../hive/llap/registry/ServiceInstanceSet.java | 7 +- .../registry/impl/InactiveServiceInstance.java | 77 ++++++++++ .../registry/impl/LlapFixedRegistryImpl.java | 2 +- .../impl/LlapZookeeperRegistryImpl.java | 34 ++++- .../daemon/services/impl/LlapWebServices.java | 2 +- .../tez/HostAffinitySplitLocationProvider.java | 80 +++++++--- .../apache/hadoop/hive/ql/exec/tez/Utils.java | 8 +- .../TestHostAffinitySplitLocationProvider.java | 150 +++++++++++++++---- .../apache/hadoop/hive/serde2/SerDeUtils.java | 11 ++ 9 files changed, 306 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java index 13b668d..1e8c895 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java @@ -14,7 +14,6 
@@ package org.apache.hadoop.hive.llap.registry; import java.util.Collection; -import java.util.List; import java.util.Set; public interface ServiceInstanceSet { @@ -32,9 +31,11 @@ public interface ServiceInstanceSet { /** * Gets a list containing all the instances. This list has the same iteration order across * different processes, assuming the list of registry entries is the same. - * @return + * @param consistentIndexes if true, also try to maintain the same exact index for each node + * across calls, by inserting inactive instances to replace the + * removed ones. */ - public Collection<ServiceInstance> getAllInstancesOrdered(); + public Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes); /** * Get an instance by worker identity. http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java new file mode 100644 index 0000000..79b7d51 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.llap.registry.impl; + +import java.util.Map; + +import org.apache.hadoop.hive.llap.registry.ServiceInstance; +import org.apache.hadoop.yarn.api.records.Resource; + +public class InactiveServiceInstance implements ServiceInstance { + private final String name; + public InactiveServiceInstance(String name) { + this.name = name; + } + + @Override + public String getWorkerIdentity() { + return name; + } + + @Override + public boolean isAlive() { + return false; + } + + @Override + public String getHost() { + return null; + } + + @Override + public int getRpcPort() { + throw new UnsupportedOperationException(); + } + + @Override + public int getManagementPort() { + throw new UnsupportedOperationException(); + } + + @Override + public int getShufflePort() { + throw new UnsupportedOperationException(); + } + + @Override + public String getServicesAddress() { + throw new UnsupportedOperationException(); + } + + @Override + public int getOutputFormatPort() { + throw new UnsupportedOperationException(); + } + + @Override + public Map<String, String> getProperties() { + throw new UnsupportedOperationException(); + } + + @Override + public Resource getResource() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index de4d7f2..bbfcbf6 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -228,7 +228,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { } @Override - 
public List<ServiceInstance> getAllInstancesOrdered() { + public List<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) { List<ServiceInstance> list = new LinkedList<>(); list.addAll(instances.values()); Collections.sort(list, new Comparator<ServiceInstance>() { http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 5e17ebf..59f7c9e 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -542,8 +542,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { } @Override - public Collection<ServiceInstance> getAllInstancesOrdered() { - Map<String, String> slotByWorker = new HashMap<String, String>(); + public Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) { + Map<String, Long> slotByWorker = new HashMap<String, Long>(); List<ServiceInstance> unsorted = new LinkedList<ServiceInstance>(); for (ChildData childData : instancesCache.getCurrentData()) { if (childData == null) continue; @@ -560,21 +560,45 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { " Ignoring from current instances list..", childData.getPath()); } } else if (nodeName.startsWith(SLOT_PREFIX)) { - slotByWorker.put(extractWorkerIdFromSlot(childData), nodeName); + slotByWorker.put(extractWorkerIdFromSlot(childData), + Long.parseLong(nodeName.substring(SLOT_PREFIX.length()))); } else { LOG.info("Ignoring unknown node {}", childData.getPath()); } } - TreeMap<String, ServiceInstance> sorted = new 
TreeMap<>(); + TreeMap<Long, ServiceInstance> sorted = new TreeMap<>(); + long maxSlot = Long.MIN_VALUE; for (ServiceInstance worker : unsorted) { - String slot = slotByWorker.get(worker.getWorkerIdentity()); + Long slot = slotByWorker.get(worker.getWorkerIdentity()); if (slot == null) { LOG.info("Unknown slot for {}", worker.getWorkerIdentity()); continue; } + maxSlot = Math.max(maxSlot, slot); sorted.put(slot, worker); } + + if (consistentIndexes) { + // Fill every missing slot index with an inactive placeholder so that live LLAPs keep their indexes. + TreeMap<Long, ServiceInstance> dummies = new TreeMap<>(); + Iterator<Long> keyIter = sorted.keySet().iterator(); + long expected = 0; + Long ts = null; + while (keyIter.hasNext()) { + Long slot = keyIter.next(); + assert slot >= expected; + while (slot > expected) { + if (ts == null) { + ts = System.nanoTime(); // Placeholder identities change on every call, as if the missing nodes had restarted. + } + dummies.put(expected, new InactiveServiceInstance("inactive-" + expected + "-" + ts)); + ++expected; + } + ++expected; + } + sorted.putAll(dummies); + } return sorted.values(); } http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java index f85bbf2..0614355 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java @@ -226,7 +226,7 @@ public class LlapWebServices extends AbstractService { } jg.writeStringField("identity", registry.getWorkerIdentity()); jg.writeArrayFieldStart("peers"); - for (ServiceInstance s : registry.getInstances().getAllInstancesOrdered()) { + for (ServiceInstance
s : registry.getInstances().getAllInstancesOrdered(false)) { jg.writeStartObject(); jg.writeStringField("identity", s.getWorkerIdentity()); jg.writeStringField("host", s.getHost()); http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java index aafc27e..dcb985f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java @@ -15,10 +15,13 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.List; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.hash.Hashing; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.SplitLocationProvider; @@ -39,46 +42,83 @@ import org.slf4j.LoggerFactory; */ public class HostAffinitySplitLocationProvider implements SplitLocationProvider { - private final Logger LOG = LoggerFactory.getLogger(HostAffinitySplitLocationProvider.class); + private final static Logger LOG = LoggerFactory.getLogger( + HostAffinitySplitLocationProvider.class); private final boolean isDebugEnabled = LOG.isDebugEnabled(); - private final String[] knownLocations; + private final List<String> locations; - public HostAffinitySplitLocationProvider(String[] knownLocations) { - Preconditions.checkState(knownLocations != null && knownLocations.length != 0, + public HostAffinitySplitLocationProvider(List<String> knownLocations) { + Preconditions.checkState(knownLocations 
!= null && !knownLocations.isEmpty(), HostAffinitySplitLocationProvider.class.getName() + - "needs at least 1 location to function"); - this.knownLocations = knownLocations; + " needs at least 1 location to function"); + this.locations = knownLocations; } @Override public String[] getLocations(InputSplit split) throws IOException { - if (split instanceof FileSplit) { - FileSplit fsplit = (FileSplit) split; - int index = chooseBucket(fsplit.getPath().toString(), fsplit.getStart()); - if (isDebugEnabled) { - LOG.debug( - "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" + - fsplit.getLength() + " mapped to index=" + index + ", location=" + - knownLocations[index]); - } - return new String[]{knownLocations[index]}; - } else { + if (!(split instanceof FileSplit)) { if (isDebugEnabled) { LOG.debug("Split: " + split + " is not a FileSplit. Using default locations"); } return split.getLocations(); } + FileSplit fsplit = (FileSplit) split; + String splitDesc = "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + + ", length=" + fsplit.getLength(); + String location = locations.get(determineLocation( + locations, fsplit.getPath().toString(), fsplit.getStart(), splitDesc)); + return (location != null) ? new String[] { location } : null; } + @VisibleForTesting + public static int determineLocation( + List<String> locations, String path, long start, String desc) { + byte[] bytes = getHashInputForSplit(path, start); + long hash1 = hash1(bytes); + int index = Hashing.consistentHash(hash1, locations.size()); + String location = locations.get(index); + if (LOG.isDebugEnabled()) { + LOG.debug(desc + " mapped to index=" + index + ", location=" + location); + } + int iter = 1; + long hash2 = 0; + // Since our probing method is totally bogus, give up after some time. 
+ while (location == null && iter < locations.size() * 2) { + if (iter == 1) { + hash2 = hash2(bytes); + } + // Note that this is not real double hashing since we have consistent hash on top. + index = Hashing.consistentHash(hash1 + iter * hash2, locations.size()); + location = locations.get(index); + if (LOG.isDebugEnabled()) { + LOG.debug(desc + " remapped to index=" + index + ", location=" + location); + } + ++iter; + } + return index; + } - private int chooseBucket(String path, long startOffset) throws IOException { + private static byte[] getHashInputForSplit(String path, long start) { // Explicitly using only the start offset of a split, and not the length. Splits generated on // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node. // There is the drawback of potentially hashing the same data on multiple nodes though, when a // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous // large split and send them to different nodes. - long hashCode = ((startOffset >> 2) * 37) ^ Murmur3.hash64(path.getBytes()); - return Hashing.consistentHash(hashCode, knownLocations.length); + byte[] pathBytes = path.getBytes(); + byte[] allBytes = new byte[pathBytes.length + 8]; + System.arraycopy(pathBytes, 0, allBytes, 0, pathBytes.length); + SerDeUtils.writeLong(allBytes, pathBytes.length, start >> 3); + return allBytes; + } + + private static long hash1(byte[] bytes) { + final int PRIME = 104729; // Same as hash64's default seed. 
+ return Murmur3.hash64(bytes, 0, bytes.length, PRIME); + } + + private static long hash2(byte[] bytes) { + final int PRIME = 1366661; + return Murmur3.hash64(bytes, 0, bytes.length, PRIME); } } http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java index 2e9918e..113aa49 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import org.apache.commons.lang.ArrayUtils; @@ -41,15 +42,14 @@ public class Utils { serviceRegistry = LlapRegistryService.getClient(conf); Collection<ServiceInstance> serviceInstances = - serviceRegistry.getInstances().getAllInstancesOrdered(); - String[] locations = new String[serviceInstances.size()]; - int i = 0; + serviceRegistry.getInstances().getAllInstancesOrdered(true); + ArrayList<String> locations = new ArrayList<>(serviceInstances.size()); for (ServiceInstance serviceInstance : serviceInstances) { if (LOG.isDebugEnabled()) { LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" + serviceInstance.getHost() + " to list for split locations"); } - locations[i++] = serviceInstance.getHost(); + locations.add(serviceInstance.getHost()); } splitLocationProvider = new HostAffinitySplitLocationProvider(locations); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java ---------------------------------------------------------------------- diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java index 7ed3df1..f5ca623 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java @@ -19,15 +19,22 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.math.stat.descriptive.SummaryStatistics; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,20 +42,20 @@ public class TestHostAffinitySplitLocationProvider { private final Logger LOG = LoggerFactory.getLogger(TestHostAffinitySplitLocationProvider.class); - private static final String[] locations = new String[5]; + private static final List<String> locations = new ArrayList<>(); private static final Set<String> locationsSet = new HashSet<>(); - private static final String[] executorLocations = new String[9]; + private static final List<String> executorLocations = new ArrayList<>(); private static final Set<String> executorLocationsSet = new HashSet<>(); static { for (int i = 0 ; i < 5 ; i++) { - locations[i] = "location" + i; - locationsSet.add(locations[i]); + locations.add("location" + i); + locationsSet.add(locations.get(i)); } for (int i = 0 ; i < 9 ; i++) { - executorLocations[i] = "execLocation" + i; - 
executorLocationsSet.add(executorLocations[i]); + executorLocations.add("execLocation" + i); + executorLocationsSet.add(executorLocations.get(i)); } } @@ -58,20 +65,20 @@ public class TestHostAffinitySplitLocationProvider { HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations); - InputSplit inputSplit1 = createMockInputSplit(new String[] {locations[0], locations[1]}); - InputSplit inputSplit2 = createMockInputSplit(new String[] {locations[2], locations[3]}); + InputSplit inputSplit1 = createMockInputSplit(new String[] {locations.get(0), locations.get(1)}); + InputSplit inputSplit2 = createMockInputSplit(new String[] {locations.get(2), locations.get(3)}); - assertArrayEquals(new String[] {locations[0], locations[1]}, locationProvider.getLocations(inputSplit1)); - assertArrayEquals(new String[] {locations[2], locations[3]}, locationProvider.getLocations(inputSplit2)); + assertArrayEquals(new String[] {locations.get(0), locations.get(1)}, locationProvider.getLocations(inputSplit1)); + assertArrayEquals(new String[] {locations.get(2), locations.get(3)}, locationProvider.getLocations(inputSplit2)); } @Test (timeout = 5000) public void testOrcSplitsBasic() throws IOException { HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations); - InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations[0], locations[1]}); - InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations[2], locations[3]}); - InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new String[] {locations[0], locations[3]}); + InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations.get(0), locations.get(1)}); + InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations.get(2), locations.get(3)}); + InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new String[] 
{locations.get(0), locations.get(3)}); String[] retLoc1 = locationProvider.getLocations(os1); String[] retLoc2 = locationProvider.getLocations(os2); @@ -94,25 +101,18 @@ public class TestHostAffinitySplitLocationProvider { @Test (timeout = 10000) public void testConsistentHashing() throws IOException { final int LOC_COUNT = 20, MIN_LOC_COUNT = 4, SPLIT_COUNT = 100; - - String[] locations = new String[LOC_COUNT]; - for (int i = 0; i < locations.length; ++i) { - locations[i] = String.valueOf(i); - } - InputSplit[] splits = new InputSplit[SPLIT_COUNT]; - for (int i = 0; i < splits.length; ++i) { - splits[i] = createMockFileSplit(true, "path" + i, 0, 1000, new String[] {}); - } + List<String> locations = createLocations(LOC_COUNT); + InputSplit[] splits = createSplits(SPLIT_COUNT); StringBuilder failBuilder = new StringBuilder("\n"); String[] lastLocations = new String[splits.length]; double movedRatioSum = 0, newRatioSum = 0, movedRatioWorst = 0, newRatioWorst = Double.MAX_VALUE; - for (int locs = MIN_LOC_COUNT; locs <= locations.length; ++locs) { - String[] partLoc = Arrays.copyOf(locations, locs); + for (int locs = MIN_LOC_COUNT; locs <= locations.size(); ++locs) { + List<String> partLoc = locations.subList(0, locs); HostAffinitySplitLocationProvider lp = new HostAffinitySplitLocationProvider(partLoc); int moved = 0, newLoc = 0; - String newNode = partLoc[locs - 1]; + String newNode = partLoc.get(locs - 1); for (int splitIx = 0; splitIx < splits.length; ++splitIx) { String[] splitLocations = lp.getLocations(splits[splitIx]); assertEquals(1, splitLocations.length); @@ -137,18 +137,106 @@ public class TestHostAffinitySplitLocationProvider { newRatioWorst = Math.min(newLoc / minNew, newRatioWorst); logBadRatios(failBuilder, moved, newLoc, msgTail, movedMsg, newMsg, maxMoved, minNew); } - int count = locations.length - MIN_LOC_COUNT; + int count = locations.size() - MIN_LOC_COUNT; double moveRatioAvg = movedRatioSum / count, newRatioAvg = newRatioSum / count; String 
errorMsg = "Move counts: average " + moveRatioAvg + ", worst " + movedRatioWorst + "; assigned to new node: average " + newRatioAvg + ", worst " + newRatioWorst; LOG.info(errorMsg); // Give it a LOT of slack, since on low numbers consistent hashing is very imprecise. if (moveRatioAvg > 1.2f || newRatioAvg < 0.8f - || movedRatioWorst > 1.5f || newRatioWorst < 0.5f) { + || movedRatioWorst > 1.67f || newRatioWorst < 0.5f) { fail(errorMsg + "; example failures: " + failBuilder.toString()); } } + public FileSplit[] createSplits(final int splitCount) throws IOException { + FileSplit[] splits = new FileSplit[splitCount]; + for (int i = 0; i < splits.length; ++i) { + splits[i] = createMockFileSplit(true, "path" + i, 0, 1000, new String[] {}); + } + return splits; + } + + public List<String> createLocations(final int locCount) { + List<String> locations = new ArrayList<>(locCount); + for (int i = 0; i < locCount; ++i) { + locations.add(String.valueOf(i)); + } + return locations; + } + + + @Test (timeout = 10000) + public void testConsistentHashingFallback() throws IOException { + final int LOC_COUNT_TO = 20, SPLIT_COUNT = 500, MAX_MISS_COUNT = 4, + LOC_COUNT_FROM = MAX_MISS_COUNT + 1; + FileSplit[] splits = createSplits(SPLIT_COUNT); + AtomicInteger errorCount = new AtomicInteger(0); + int cvErrorCount = 0; + for (int locs = LOC_COUNT_FROM; locs <= LOC_COUNT_TO; ++locs) { + int aboveAvgCount = 0; + double sum = 0; + double[] cvs = new double[MAX_MISS_COUNT + 1]; + for (int missCount = 0; missCount <= MAX_MISS_COUNT; ++missCount) { + double cv = cvs[missCount] = testHashDistribution(locs, missCount, splits, errorCount); + sum += cv; + if (missCount > 0 && cv > sum / (missCount + 1)) { + ++aboveAvgCount; + } + } + if (aboveAvgCount > 2) { + LOG.info("CVs for " + locs + " locations aren't to our liking: " + Arrays.toString(cvs)); + ++cvErrorCount; + } + } + assertTrue("Found " + errorCount.get() + " abnormalities", errorCount.get() < 3); + // TODO: the way we add hash fns does 
exhibit some irregularities. + // Seems like the 3rd iter has a better distribution in many cases, even better + // than the original hash. That trips the "above running average" criterion, even if the rest is flat. + assertTrue("Found " + cvErrorCount + " abnormalities", cvErrorCount< 7); + } + + private double testHashDistribution(int locs, final int missCount, FileSplit[] splits, + AtomicInteger errorCount) { + // This relies heavily on which List methods determineLocation calls and which it doesn't. + // We could do a wrapper with only size() and get() methods instead of List, to be sure. + @SuppressWarnings("unchecked") + List<String> partLocs = (List<String>)Mockito.mock(List.class); + Mockito.when(partLocs.size()).thenReturn(locs); + final AtomicInteger state = new AtomicInteger(0); + Mockito.when(partLocs.get(Mockito.anyInt())).thenAnswer(new Answer<String>() { + @Override + public String answer(InvocationOnMock invocation) throws Throwable { + return (state.getAndIncrement() == missCount) ? "not-null" : null; + } + }); + int[] hitCounts = new int[locs]; + for (int splitIx = 0; splitIx < splits.length; ++splitIx) { + state.set(0); + int index = HostAffinitySplitLocationProvider.determineLocation(partLocs, + splits[splitIx].getPath().toString(), splits[splitIx].getStart(), null); + ++hitCounts[index]; + } + SummaryStatistics ss = new SummaryStatistics(); + for (int hitCount : hitCounts) { + ss.addValue(hitCount); + } + // All of this is completely bogus and mostly captures the following function: + // f(output) = I-eyeballed-the(output) == they-look-ok. + // It's pretty much a golden file... + // The fact that stdev doesn't increase with increasing missCount is captured outside.
+ double avg = ss.getSum()/ss.getN(), stdev = ss.getStandardDeviation(), cv = stdev/avg; + double allowedMin = avg - 2.5 * stdev, allowedMax = avg + 2.5 * stdev; + if (allowedMin > ss.getMin() || allowedMax < ss.getMax() || cv > 0.22) { + LOG.info("The distribution for " + locs + " locations, " + missCount + " misses isn't to " + + "our liking: avg " + avg + ", stdev " + stdev + ", cv " + cv + ", min " + ss.getMin() + + ", max " + ss.getMax()); + errorCount.incrementAndGet(); + } + return cv; + } + + private void logBadRatios(StringBuilder failBuilder, int moved, int newLoc, String msgTail, String movedMsg, String newMsg, double maxMoved, double minNew) { boolean logged = false; @@ -170,10 +258,10 @@ public class TestHostAffinitySplitLocationProvider { HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations); // Same file, offset, different lengths - InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations[0], locations[1]}); - InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new String[] {locations[0], locations[1]}); + InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations.get(0), locations.get(1)}); + InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new String[] {locations.get(0), locations.get(1)}); // Same file, different offset - InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new String[] {locations[0], locations[1]}); + InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new String[] {locations.get(0), locations.get(1)}); String[] retLoc11 = locationProvider.getLocations(os11); String[] retLoc12 = locationProvider.getLocations(os12); @@ -216,7 +304,7 @@ public class TestHostAffinitySplitLocationProvider { return inputSplit; } - private InputSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start, + private FileSplit createMockFileSplit(boolean 
createOrcSplit, String fakePathString, long start, long length, String[] locations) throws IOException { FileSplit fileSplit; if (createOrcSplit) { http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java index 6e08dfd..7ffc964 100644 --- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java +++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java @@ -567,4 +567,15 @@ public final class SerDeUtils { public static Text transformTextFromUTF8(Text text, Charset targetCharset) { return new Text(new String(text.getBytes(), 0, text.getLength()).getBytes(targetCharset)); } + + public static void writeLong(byte[] writeBuffer, int offset, long value) { + writeBuffer[offset] = (byte) ((value >> 0) & 0xff); + writeBuffer[offset + 1] = (byte) ((value >> 8) & 0xff); + writeBuffer[offset + 2] = (byte) ((value >> 16) & 0xff); + writeBuffer[offset + 3] = (byte) ((value >> 24) & 0xff); + writeBuffer[offset + 4] = (byte) ((value >> 32) & 0xff); + writeBuffer[offset + 5] = (byte) ((value >> 40) & 0xff); + writeBuffer[offset + 6] = (byte) ((value >> 48) & 0xff); + writeBuffer[offset + 7] = (byte) ((value >> 56) & 0xff); + } }
