Repository: hive Updated Branches: refs/heads/master ea3c79e4f -> c989605d9
HIVE-14574 : use consistent hashing for LLAP consistent splits to alleviate impact from cluster changes (Sergey Shelukhin, reviewed by Prasanth Jayachandran) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c989605d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c989605d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c989605d Branch: refs/heads/master Commit: c989605d995b63393fae834667549ec18b67475b Parents: ea3c79e Author: Sergey Shelukhin <[email protected]> Authored: Wed Aug 24 16:45:45 2016 -0700 Committer: Sergey Shelukhin <[email protected]> Committed: Wed Aug 24 16:50:24 2016 -0700 ---------------------------------------------------------------------- .../tez/HostAffinitySplitLocationProvider.java | 26 +++--- .../TestHostAffinitySplitLocationProvider.java | 85 ++++++++++++++++++-- 2 files changed, 92 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c989605d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java index c06499e..aafc27e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java @@ -17,7 +17,8 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; import com.google.common.base.Preconditions; -import org.apache.hadoop.io.DataOutputBuffer; +import com.google.common.hash.Hashing; + import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.split.SplitLocationProvider; @@ 
-54,9 +55,7 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider public String[] getLocations(InputSplit split) throws IOException { if (split instanceof FileSplit) { FileSplit fsplit = (FileSplit) split; - long hash = generateHash(fsplit.getPath().toString(), fsplit.getStart()); - int indexRaw = (int) (hash % knownLocations.length); - int index = Math.abs(indexRaw); + int index = chooseBucket(fsplit.getPath().toString(), fsplit.getStart()); if (isDebugEnabled) { LOG.debug( "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" + @@ -72,15 +71,14 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider } } - private long generateHash(String path, long startOffset) throws IOException { - // Explicitly using only the start offset of a split, and not the length. - // Splits generated on block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node. - // There is the drawback of potentially hashing the same data on multiple nodes though, when a large split - // is sent to 1 node, and a second invocation uses smaller chunks of the previous large split and send them - // to different nodes. - DataOutputBuffer dob = new DataOutputBuffer(); - dob.writeLong(startOffset); - dob.writeUTF(path); - return Murmur3.hash64(dob.getData(), 0, dob.getLength()); + + private int chooseBucket(String path, long startOffset) throws IOException { + // Explicitly using only the start offset of a split, and not the length. Splits generated on + // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node. + // There is the drawback of potentially hashing the same data on multiple nodes though, when a + // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous + // large split and sends them to different nodes. 
+ long hashCode = ((startOffset >> 2) * 37) ^ Murmur3.hash64(path.getBytes()); + return Hashing.consistentHash(hashCode, knownLocations.length); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c989605d/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java index d98a5ff..54f7363 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java @@ -14,15 +14,12 @@ package org.apache.hadoop.hive.ql.exec.tez; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -31,8 +28,11 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestHostAffinitySplitLocationProvider { + private final Logger LOG = LoggerFactory.getLogger(TestHostAffinitySplitLocationProvider.class); private static final String[] locations = new String[5]; @@ -90,6 +90,81 @@ public class TestHostAffinitySplitLocationProvider { assertTrue(executorLocationsSet.contains(retLoc3[0])); } + + @Test (timeout = 10000) + public void testConsistentHashing() throws IOException { + final int LOC_COUNT = 20, 
MIN_LOC_COUNT = 4, SPLIT_COUNT = 100; + + String[] locations = new String[LOC_COUNT]; + for (int i = 0; i < locations.length; ++i) { + locations[i] = String.valueOf(i); + } + InputSplit[] splits = new InputSplit[SPLIT_COUNT]; + for (int i = 0; i < splits.length; ++i) { + splits[i] = createMockFileSplit(true, "path" + i, 0, 1000, new String[] {}); + } + + StringBuilder failBuilder = new StringBuilder("\n"); + String[] lastLocations = new String[splits.length]; + double movedRatioSum = 0, newRatioSum = 0, + movedRatioWorst = 0, newRatioWorst = Double.MAX_VALUE; + for (int locs = MIN_LOC_COUNT; locs <= locations.length; ++locs) { + String[] partLoc = Arrays.copyOf(locations, locs); + HostAffinitySplitLocationProvider lp = new HostAffinitySplitLocationProvider(partLoc); + int moved = 0, newLoc = 0; + String newNode = partLoc[locs - 1]; + for (int splitIx = 0; splitIx < splits.length; ++splitIx) { + String[] splitLocations = lp.getLocations(splits[splitIx]); + assertEquals(1, splitLocations.length); + String splitLocation = splitLocations[0]; + if (locs > MIN_LOC_COUNT && !splitLocation.equals(lastLocations[splitIx])) { + ++moved; + } + if (newNode.equals(splitLocation)) { + ++newLoc; + } + lastLocations[splitIx] = splitLocation; + } + if (locs == MIN_LOC_COUNT) continue; + String msgTail = " when going to " + locs + " locations"; + String movedMsg = moved + " splits moved", + newMsg = newLoc + " splits went to the new node"; + LOG.info(movedMsg + " and " + newMsg + msgTail); + double maxMoved = 1.0f * splits.length / locs, minNew = 1.0f * splits.length / locs; + movedRatioSum += Math.max(moved / maxMoved, 1f); + movedRatioWorst = Math.max(moved / maxMoved, movedRatioWorst); + newRatioSum += Math.min(newLoc / minNew, 1f); + newRatioWorst = Math.min(newLoc / minNew, newRatioWorst); + logBadRatios(failBuilder, moved, newLoc, msgTail, movedMsg, newMsg, maxMoved, minNew); + } + int count = locations.length - MIN_LOC_COUNT; + double moveRatioAvg = movedRatioSum / count, 
newRatioAvg = newRatioSum / count; + String errorMsg = "Move counts: average " + moveRatioAvg + ", worst " + movedRatioWorst + + "; assigned to new node: average " + newRatioAvg + ", worst " + newRatioWorst; + LOG.info(errorMsg); + // Give it a LOT of slack, since on low numbers consistent hashing is very imprecise. + if (moveRatioAvg > 1.2f || newRatioAvg < 0.8f + || movedRatioWorst > 1.5f || newRatioWorst < 0.5f) { + fail(errorMsg + "; example failures: " + failBuilder.toString()); + } + } + + private void logBadRatios(StringBuilder failBuilder, int moved, int newLoc, String msgTail, + String movedMsg, String newMsg, double maxMoved, double minNew) { + boolean logged = false; + if (moved > maxMoved * 1.33f) { + failBuilder.append(movedMsg).append(" (threshold ").append(maxMoved).append(") "); + logged = true; + } + if (newLoc < minNew * 0.75f) { + failBuilder.append(newMsg).append(" (threshold ").append(minNew).append(") "); + logged = true; + } + if (logged) { + failBuilder.append(msgTail).append(";\n"); + } + } + @Test (timeout = 5000) public void testOrcSplitsLocationAffinity() throws IOException { HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
