This is an automated email from the ASF dual-hosted git repository.
gopalv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c45751f HIVE-21232: LLAP: Add a cache-miss friendly split affinity provider (Gopal V, reviewed by Slim Bouguerra)
c45751f is described below
commit c45751fb4029009e08f6389c21a68c3ba26ec6de
Author: Gopal V <[email protected]>
AuthorDate: Thu Feb 21 17:04:26 2019 -0800
HIVE-21232: LLAP: Add a cache-miss friendly split affinity provider (Gopal V, reviewed by Slim Bouguerra)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 6 +--
.../tez/HostAffinitySplitLocationProvider.java | 45 ++++++++++++++++++----
.../tez/TestHostAffinitySplitLocationProvider.java | 42 ++++++++++++++++++++
3 files changed, 83 insertions(+), 10 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 11f165a..04166db 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4352,9 +4352,9 @@ public class HiveConf extends Configuration {
"Whether LLAP daemon web UI should use SSL.", "llap.daemon.service.ssl"),
LLAP_CLIENT_CONSISTENT_SPLITS("hive.llap.client.consistent.splits", true,
"Whether to setup split locations to match nodes on which llap daemons
are running, " +
- "instead of using the locations provided by the split itself. If there
is no llap daemon " +
- "running, fall back to locations provided by the split. This is
effective only if " +
- "hive.execution.mode is llap"),
+ "preferring one of the locations provided by the split itself. If
there is no llap daemon " +
+ "running on any of those locations (or on the cloud), fall back to a
cache affinity to" +
+ " an LLAP node. This is effective only if hive.execution.mode is
llap."),
LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true,
"Whether LLAP should reject permissive ACLs in some cases (e.g. its
own management\n" +
"protocol or ZK paths), similar to how ssh refuses a key with bad
access permissions."),
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index c5d96e5..5224429 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -15,11 +15,11 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.hash.Hashing;
+import java.util.Set;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.mapred.FileSplit;
@@ -29,6 +29,10 @@ import org.apache.hive.common.util.Murmur3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.hash.Hashing;
+
/**
* This maps a split (path + offset) to an index based on the number of
locations provided.
*
@@ -47,18 +51,20 @@ public class HostAffinitySplitLocationProvider implements
SplitLocationProvider
private final boolean isDebugEnabled = LOG.isDebugEnabled();
private final List<String> locations;
+ private final Set<String> locationSet;
public HostAffinitySplitLocationProvider(List<String> knownLocations) {
Preconditions.checkState(knownLocations != null &&
!knownLocations.isEmpty(),
HostAffinitySplitLocationProvider.class.getName() +
" needs at least 1 location to function");
this.locations = knownLocations;
+ this.locationSet = new HashSet<String>(knownLocations);
}
@Override
public String[] getLocations(InputSplit split) throws IOException {
if (!(split instanceof FileSplit)) {
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("Split: " + split + " is not a FileSplit. Using default
locations");
}
return split.getLocations();
@@ -66,14 +72,39 @@ public class HostAffinitySplitLocationProvider implements
SplitLocationProvider
FileSplit fsplit = (FileSplit) split;
String splitDesc = "Split at " + fsplit.getPath() + " with offset= " +
fsplit.getStart()
+ ", length=" + fsplit.getLength();
- String location = locations.get(determineLocation(
- locations, fsplit.getPath().toString(), fsplit.getStart(), splitDesc));
+ List<String> preferredLocations = preferLocations(fsplit);
+ String location =
+ preferredLocations.get(determineLocation(preferredLocations,
fsplit.getPath().toString(),
+ fsplit.getStart(), splitDesc));
return (location != null) ? new String[] { location } : null;
}
+ private List<String> preferLocations(FileSplit fsplit) throws IOException {
+ if (fsplit.getLocations() == null || fsplit.getLocations().length <= 0) {
+ // Cloud FS
+ return this.locations;
+ }
+ String[] datanodes = fsplit.getLocations();
+ Arrays.sort(datanodes);
+ ArrayList<String> targets = new ArrayList<String>(datanodes.length);
+ for (String location : datanodes) {
+ if (locationSet.contains(location)) {
+ targets.add(location);
+ }
+ }
+ if (targets.size() > 0) {
+ return targets;
+ }
+ return this.locations;
+ }
+
@VisibleForTesting
public static int determineLocation(
List<String> locations, String path, long start, String desc) {
+ if (locations.size() == 1) {
+ // skip everything, this is simple
+ return 0;
+ }
byte[] bytes = getHashInputForSplit(path, start);
long hash1 = hash1(bytes);
int index = Hashing.consistentHash(hash1, locations.size());
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 13f4676..f37a10c 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -299,6 +299,48 @@ public class TestHostAffinitySplitLocationProvider {
assertArrayEquals(retLoc13, retLoc132);
}
+ @Test (timeout = 90000000)
+ public void testDFSLocalityAwareAffinity() throws IOException {
+ List<String> someLocations = locations.subList(0, 2); // 0,1 locations
+ HostAffinitySplitLocationProvider locationProvider = new
HostAffinitySplitLocationProvider(someLocations);
+
+ // Different base localities
+ InputSplit os1 = createMockFileSplit(true, "path1", 0, 15000, new String[]
{locations.get(0), locations.get(1)}); // 0 or 1
+ InputSplit os2 = createMockFileSplit(true, "path2", 0, 30000, new String[]
{locations.get(2), locations.get(3)}); // 0 or 1
+ InputSplit os3 = createMockFileSplit(true, "path3", 15000, 30000, new
String[] {locations.get(0), locations.get(2)}); // 0
+ InputSplit os4 = createMockFileSplit(true, "path4", 15000, 30000, new
String[] {locations.get(1), locations.get(2)}); // 1
+
+ String[] retLoc1 = locationProvider.getLocations(os1);
+ String[] retLoc2 = locationProvider.getLocations(os2);
+ String[] retLoc3 = locationProvider.getLocations(os3);
+ String[] retLoc4 = locationProvider.getLocations(os4);
+
+ assertEquals(1, retLoc1.length);
+ assertTrue(someLocations.contains(retLoc1[0]));
+
+ assertEquals(1, retLoc2.length);
+ assertTrue(someLocations.contains(retLoc2[0]));
+
+ assertEquals(1, retLoc3.length);
+ assertTrue(someLocations.contains(retLoc3[0]));
+ assertEquals(someLocations.get(0), retLoc3[0]); // is always 0
+
+ assertEquals(1, retLoc4.length);
+ assertTrue(someLocations.contains(retLoc4[0]));
+ assertEquals(someLocations.get(1), retLoc4[0]); // is always 1
+
+ String[] againLoc1 = locationProvider.getLocations(os1);
+ String[] againLoc2 = locationProvider.getLocations(os2);
+ String[] againLoc3 = locationProvider.getLocations(os3);
+ String[] againLoc4 = locationProvider.getLocations(os4);
+
+ assertArrayEquals(retLoc1, againLoc1);
+ assertArrayEquals(retLoc2, againLoc2);
+ assertArrayEquals(retLoc3, againLoc3);
+ assertArrayEquals(retLoc4, againLoc4);
+ }
+
+
private InputSplit createMockInputSplit(String[] locations) throws
IOException {
InputSplit inputSplit = mock(InputSplit.class);