This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5ed4049 HIVE-25651: Enable LLAP cache affinity for Iceberg ORC splits (Adam Szita, reviewed by Marton Bod and Peter Vary)
5ed4049 is described below
commit 5ed4049d5e1e325858d3fb45095529d46f9e5c0d
Author: Adam Szita <[email protected]>
AuthorDate: Mon Nov 8 14:12:01 2021 +0100
HIVE-25651: Enable LLAP cache affinity for Iceberg ORC splits (Adam Szita, reviewed by Marton Bod and Peter Vary)
---
.../iceberg/mr/hive/HiveIcebergInputFormat.java | 11 +++-
.../apache/iceberg/mr/hive/HiveIcebergSplit.java | 22 ++++++-
.../iceberg/mr/mapreduce/IcebergInputFormat.java | 40 +++++++++---
.../apache/iceberg/mr/TestIcebergInputFormats.java | 30 +++++++++
.../positive/llap/vectorized_iceberg_read.q.out | 4 +-
.../hive/ql/exec/tez/HashableInputSplit.java | 25 +++++++
.../tez/HostAffinitySplitLocationProvider.java | 40 ++++++------
.../hive/ql/io/BucketizedHiveInputFormat.java | 2 +-
.../hadoop/hive/ql/io/CombineHiveInputFormat.java | 2 +-
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 76 ++++++++++++----------
.../ql/io/LlapCacheOnlyInputFormatInterface.java | 7 ++
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 3 +-
.../tez/TestHostAffinitySplitLocationProvider.java | 7 +-
13 files changed, 195 insertions(+), 74 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index fc66890..9d792e5 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -23,12 +23,15 @@ import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DataCache;
+import org.apache.hadoop.hive.common.io.FileMetadataCache;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -55,7 +58,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
- implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface {
+ implements CombineHiveInputFormat.AvoidSplitCombination, VectorizedInputFormatInterface,
+ LlapCacheOnlyInputFormatInterface.VectorizedOnly {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);
private static final String HIVE_VECTORIZED_RECORDREADER_CLASS =
@@ -135,4 +139,9 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
return new VectorizedSupport.Support[]{ VectorizedSupport.Support.DECIMAL_64 };
}
+ @Override
+ public void injectCaches(FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+ // no-op for Iceberg
+ }
+
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
index a1e7b1a..a9827ff 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
@@ -19,17 +19,22 @@
package org.apache.iceberg.mr.hive;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Collection;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+import org.apache.iceberg.relocated.com.google.common.primitives.Longs;
import org.apache.iceberg.util.SerializationUtil;
// Hive requires file formats to return splits that are instances of `FileSplit`.
-public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer, HashableInputSplit {
private IcebergSplit innerSplit;
@@ -69,6 +74,21 @@ public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer
}
@Override
+ public byte[] getBytesForHash() {
+ Collection<FileScanTask> fileScanTasks = innerSplit.task().files();
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ for (FileScanTask task : fileScanTasks) {
+ baos.write(task.file().path().toString().getBytes());
+ baos.write(Longs.toByteArray(task.start()));
+ }
+ return baos.toByteArray();
+ } catch (IOException ioe) {
+ throw new RuntimeException("Couldn't produce hash input bytes for
HiveIcebergSplit: " + this, ioe);
+ }
+ }
+
+ @Override
public long getStart() {
return 0;
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index fdba3df..b2176d2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -29,6 +29,9 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -97,31 +100,40 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
return new InputFormatConfig.ConfigBuilder(job.getConfiguration());
}
- @Override
- public List<InputSplit> getSplits(JobContext context) {
- Configuration conf = context.getConfiguration();
- Table table = Optional
- .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
- .orElseGet(() -> Catalogs.loadTable(conf));
-
+ private static TableScan createTableScan(Table table, Configuration conf) {
TableScan scan = table.newScan()
- .caseSensitive(conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
+ .caseSensitive(conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT));
long snapshotId = conf.getLong(InputFormatConfig.SNAPSHOT_ID, -1);
if (snapshotId != -1) {
scan = scan.useSnapshot(snapshotId);
}
+
long asOfTime = conf.getLong(InputFormatConfig.AS_OF_TIMESTAMP, -1);
if (asOfTime != -1) {
scan = scan.asOfTime(asOfTime);
}
+
long splitSize = conf.getLong(InputFormatConfig.SPLIT_SIZE, 0);
if (splitSize > 0) {
scan = scan.option(TableProperties.SPLIT_SIZE, String.valueOf(splitSize));
}
+
+ // In case of LLAP-based execution we ask Iceberg not to combine multiple fileScanTasks into one split.
+ // This is so that cache affinity can work, and each file (split) is always executed/cached on the same LLAP daemon.
+ MapWork mapWork = LlapHiveUtils.findMapWork((JobConf) conf);
+ if (mapWork != null && mapWork.getCacheAffinity()) {
+ // Iceberg splits logically consist of buckets, where the bucket size equals the openFileCost setting if the files
+ // assigned to such a bucket are smaller. This is how Iceberg would combine multiple files into one split, so here
+ // we need to enforce the bucket size to be equal to the split size to avoid file combination.
+ Long openFileCost = splitSize > 0 ? splitSize : TableProperties.SPLIT_SIZE_DEFAULT;
+ scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(openFileCost));
+ }
+
String schemaStr = conf.get(InputFormatConfig.READ_SCHEMA);
if (schemaStr != null) {
scan.project(SchemaParser.fromJson(schemaStr));
}
+
String[] selectedColumns = conf.getStrings(InputFormatConfig.SELECTED_COLUMNS);
if (selectedColumns != null) {
scan.select(selectedColumns);
@@ -133,6 +145,18 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
scan = scan.filter(filter);
}
+ return scan;
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) {
+ Configuration conf = context.getConfiguration();
+ Table table = Optional
+ .ofNullable(HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER)))
+ .orElseGet(() -> Catalogs.loadTable(conf));
+
+ TableScan scan = createTableScan(table, conf);
+
List<InputSplit> splits = Lists.newArrayList();
boolean applyResidual = !conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
InputFormatConfig.InMemoryDataModel model = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
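For context, the open-file-cost trick in the hunk above can also be reproduced with the public Iceberg scan API: when SPLIT_OPEN_FILE_COST is set to the same value as the target split size, every data file fills its own bucket during planning and planTasks() no longer packs several small files into one combined task. A minimal sketch, not part of the patch (the SplitPlanningSketch class and planUncombinedTasks method are made-up names, and an already loaded Table is assumed):

    import org.apache.iceberg.CombinedScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.io.CloseableIterable;

    public class SplitPlanningSketch {
      // With open-file-cost == target split size, each data file occupies a whole
      // planning bucket on its own, so no two files share a combined task.
      static CloseableIterable<CombinedScanTask> planUncombinedTasks(Table table, long targetSplitSize) {
        TableScan scan = table.newScan()
            .option(TableProperties.SPLIT_SIZE, String.valueOf(targetSplitSize))
            .option(TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(targetSplitSize));
        return scan.planTasks();
      }
    }

Each resulting CombinedScanTask then backs one HiveIcebergSplit, which is what lets the location provider pin a given file to a single LLAP daemon.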
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index bdc1014..bf2f29c 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -29,6 +29,10 @@ import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
@@ -52,6 +56,7 @@ import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
@@ -70,6 +75,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
@RunWith(Parameterized.class)
public class TestIcebergInputFormats {
@@ -371,6 +378,29 @@ public class TestIcebergInputFormats {
testInputFormat.create(builder.conf()).validate(expectedRecords);
}
+ @Test
+ public void testDeriveLlapSetsCacheAffinityForIcebergInputFormat() {
+ MapWork mapWork = new MapWork();
+ PartitionDesc partitionDesc = new PartitionDesc();
+ partitionDesc.setInputFileFormatClass(HiveIcebergInputFormat.class);
+ mapWork.addPathToPartitionInfo(new Path("/tmp"), partitionDesc);
+ Configuration job = new Configuration(false);
+ HiveConf.setVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, "true");
+ HiveConf.setBoolVar(job, HiveConf.ConfVars.LLAP_IO_NONVECTOR_WRAPPER_ENABLED, true);
+
+ mapWork.setVectorMode(true);
+ mapWork.deriveLlap(job, false);
+
+ assertTrue("Cache affinity should be set for HiveIcebergInputFormat when
LLAP and vectorization is enabled",
+ mapWork.getCacheAffinity());
+
+ mapWork.setVectorMode(false);
+ mapWork.deriveLlap(job, false);
+
+ assertFalse("Cache affinity should be disabled for HiveIcebergInputFormat
when LLAP is on, but vectorization not",
+ mapWork.getCacheAffinity());
+ }
+
// TODO - Capture template type T in toString method: https://github.com/apache/iceberg/issues/1542
public abstract static class TestInputFormat<T> {
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read.q.out
index d4c64f6..cff315c 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/vectorized_iceberg_read.q.out
@@ -68,7 +68,7 @@ STAGE PLANS:
Statistics: Num rows: 5 Data size: 460 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col1 (type: int)
Execution mode: vectorized, llap
- LLAP IO: no inputs
+ LLAP IO: all inputs (cache only)
Reducer 2
Execution mode: vectorized, llap
Reduce Operator Tree:
@@ -185,7 +185,7 @@ STAGE PLANS:
Statistics: Num rows: 1 Data size: 372 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col9 (type: float)
Execution mode: vectorized, llap
- LLAP IO: no inputs
+ LLAP IO: all inputs (cache only)
Reducer 2
Execution mode: vectorized, llap
Reduce Operator Tree:
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashableInputSplit.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashableInputSplit.java
new file mode 100644
index 0000000..8b2754c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashableInputSplit.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+/**
+ * An InputSplit type that has a custom/specific way of producing its serialized content, to be later used as input
+ * of a hash function. Used for LLAP cache affinity, so that a certain split always ends up on the same executor.
+ */
+public interface HashableInputSplit {
+
+ byte[] getBytesForHash();
+
+}
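To illustrate the contract, a split only needs to return a stable byte sequence here; HiveIcebergSplit above derives it from the contained file scan tasks' paths and start offsets. A minimal sketch of an implementer (the ExampleHashableSplit class is hypothetical and not part of this change):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
    import org.apache.hadoop.mapred.FileSplit;

    public class ExampleHashableSplit extends FileSplit implements HashableInputSplit {

      public ExampleHashableSplit(Path file, long start, long length, String[] hosts) {
        super(file, start, length, hosts);
      }

      @Override
      public byte[] getBytesForHash() {
        // Hash on path + start offset only: these bytes are identical across query runs,
        // so the consistent hash keeps routing this split to the same LLAP daemon.
        byte[] pathBytes = getPath().toString().getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(pathBytes.length + Long.BYTES)
            .put(pathBytes)
            .putLong(getStart())
            .array();
      }
    }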
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index 6be7226..b75c4da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -62,21 +61,17 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
return split.getLocations();
}
FileSplit fsplit = (FileSplit) split;
- String splitDesc = "Split at " + fsplit.getPath() + " with offset= " +
fsplit.getStart()
- + ", length=" + fsplit.getLength();
- String location = locations.get(determineLocation(
- locations, fsplit.getPath().toString(), fsplit.getStart(), splitDesc));
+ String location = locations.get(determineLocation(locations, fsplit));
return (location != null) ? new String[] { location } : null;
}
@VisibleForTesting
- public static int determineLocation(
- List<String> locations, String path, long start, String desc) {
- byte[] bytes = getHashInputForSplit(path, start);
+ public static int determineLocation(List<String> locations, FileSplit fsplit) {
+ byte[] bytes = getHashInputForSplit(fsplit);
long hash1 = hash1(bytes);
int index = Hashing.consistentHash(hash1, locations.size());
String location = locations.get(index);
- LOG.debug("{} mapped to index={}, location={}", desc, index, location);
+ LOG.debug("{} mapped to index={}, location={}",
getSplitDescForDebug(fsplit), index, location);
int iter = 1;
long hash2 = 0;
// Since our probing method is totally bogus, give up after some time.
@@ -87,23 +82,18 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
// Note that this is not real double hashing since we have consistent hash on top.
index = Hashing.consistentHash(hash1 + iter * hash2, locations.size());
location = locations.get(index);
- LOG.debug("{} remapped to index={}, location={}", desc, index, location);
+ LOG.debug("{} remapped to index={}, location={}",
getSplitDescForDebug(fsplit), index, location);
++iter;
}
return index;
}
- private static byte[] getHashInputForSplit(String path, long start) {
- // Explicitly using only the start offset of a split, and not the length. Splits generated on
- // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
- // There is the drawback of potentially hashing the same data on multiple nodes though, when a
- // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous
- // large split and send them to different nodes.
- byte[] pathBytes = path.getBytes();
- byte[] allBytes = new byte[pathBytes.length + 8];
- System.arraycopy(pathBytes, 0, allBytes, 0, pathBytes.length);
- SerDeUtils.writeLong(allBytes, pathBytes.length, start >> 3);
- return allBytes;
+ private static byte[] getHashInputForSplit(FileSplit fsplit) {
+ if (fsplit instanceof HashableInputSplit) {
+ return ((HashableInputSplit)fsplit).getBytesForHash();
+ } else {
+ throw new RuntimeException("Split is not a HashableInputSplit: " +
fsplit);
+ }
}
private static long hash1(byte[] bytes) {
@@ -115,4 +105,12 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
final int PRIME = 1366661;
return Murmur3.hash64(bytes, 0, bytes.length, PRIME);
}
+
+ private static String getSplitDescForDebug(FileSplit fsplit) {
+ if (LOG.isDebugEnabled()) {
+ return "Split at " + fsplit.getPath() + " with offset= " +
fsplit.getStart() + ", length=" + fsplit.getLength();
+ } else {
+ return null;
+ }
+ }
}
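The location choice itself comes down to consistent hashing over the daemon list: the same hash-input bytes always map to the same index while the list of hosts is unchanged, and only a small fraction of splits move when a daemon is added. A standalone sketch of that idea (the ConsistentHashSketch class is hypothetical; it uses Guava's Murmur3 rather than Hive's internal Murmur3 utility and omits the provider's secondary probing hash):

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    import com.google.common.collect.ImmutableList;
    import com.google.common.hash.Hashing;

    public class ConsistentHashSketch {

      // Maps a split's hash-input bytes to an index into the ordered LLAP host list.
      static int pickLocation(byte[] bytesForHash, List<String> locations) {
        long hash = Hashing.murmur3_128().hashBytes(bytesForHash).asLong();
        return Hashing.consistentHash(hash, locations.size());
      }

      public static void main(String[] args) {
        List<String> hosts = ImmutableList.of("llap-0", "llap-1", "llap-2");
        byte[] bytes = "hdfs://nn/warehouse/t/data/file-a.orc|0".getBytes(StandardCharsets.UTF_8);
        // Prints the same host on every run while the host list stays the same.
        System.out.println(hosts.get(pickLocation(bytes, hosts)));
      }
    }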
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index b2d1e0d..3926618 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -68,7 +68,7 @@ public class BucketizedHiveInputFormat<K extends WritableComparable, V extends W
throw new IOException("cannot find class " + inputFormatClassName);
}
- pushProjectionsAndFiltersAndAsOf(job, inputFormatClass, hsplit.getPath());
+ pushProjectionsAndFiltersAndAsOf(job, hsplit.getPath());
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index a88424f..5c44960 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -710,7 +710,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ
throw new IOException("cannot find class " + inputFormatClassName);
}
- pushProjectionsAndFiltersAndAsOf(job, inputFormatClass, hsplit.getPath(0));
+ pushProjectionsAndFiltersAndAsOf(job, hsplit.getPath(0));
return ShimLoader.getHadoopShims().getCombineFileInputFormat()
.getRecordReader(job,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index d7491eb..e0787f5 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.HashableInputSplit;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import
org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -141,7 +143,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
* "map.input.file" in MapTask.
*/
public static class HiveInputSplit extends FileSplit implements InputSplit,
- Configurable {
+ Configurable, HashableInputSplit {
InputSplit inputSplit;
@@ -238,6 +240,24 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
public void setConf(Configuration conf) {
this.conf = conf;
}
+
+ @Override
+ public byte[] getBytesForHash() {
+ if (inputSplit instanceof HashableInputSplit) {
+ return ((HashableInputSplit)inputSplit).getBytesForHash();
+ } else {
+ // Explicitly using only the start offset of a split, and not the length. Splits generated on
+ // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
+ // There is the drawback of potentially hashing the same data on multiple nodes though, when a
+ // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous
+ // large split and send them to different nodes.
+ byte[] pathBytes = getPath().toString().getBytes();
+ byte[] allBytes = new byte[pathBytes.length + 8];
+ System.arraycopy(pathBytes, 0, allBytes, 0, pathBytes.length);
+ SerDeUtils.writeLong(allBytes, pathBytes.length, getStart() >> 3);
+ return allBytes;
+ }
+ }
}
@Override
@@ -337,7 +357,10 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
(!checkVector || BatchToRowInputFormat.class.isAssignableFrom(clazz));
}
- public static boolean canInjectCaches(Class<? extends InputFormat> clazz) {
+ public static boolean canInjectCaches(Class<? extends InputFormat> clazz, boolean isVectorized) {
+ if (LlapCacheOnlyInputFormatInterface.VectorizedOnly.class.isAssignableFrom(clazz)) {
+ return isVectorized;
+ }
return LlapCacheOnlyInputFormatInterface.class.isAssignableFrom(clazz);
}
@@ -407,7 +430,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
Path splitPath = hsplit.getPath();
- pushProjectionsAndFiltersAndAsOf(job, inputFormatClass, splitPath, nonNative);
+ pushProjectionsAndFiltersAndAsOf(job, splitPath);
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
if (HiveConf.getBoolVar(job, ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon())) {
@@ -953,13 +976,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
}
- protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Class inputFormatClass,
- Path splitPath) {
- pushProjectionsAndFiltersAndAsOf(jobConf, inputFormatClass, splitPath, false);
- }
-
- protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Class inputFormatClass,
- Path splitPath, boolean nonNative) {
+ protected void pushProjectionsAndFiltersAndAsOf(JobConf jobConf, Path splitPath) {
Path splitPathWithNoSchema = Path.getPathWithoutSchemeAndAuthority(splitPath);
if (this.mrwork == null) {
init(job);
@@ -977,32 +994,23 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
while (iterator.hasNext()) {
Entry<Path, List<String>> entry = iterator.next();
Path key = entry.getKey();
+ // Note for HIVE-1903: for non-native tables we might only see a table location provided as path in splitPath.
+ // In this case the code part below should still work, as the "key" will be an exact match for splitPath.
+ // Also: we should not anticipate table paths to be under other tables' locations.
boolean match;
- if (nonNative) {
- // For non-native tables, we need to do an exact match to avoid
- // HIVE-1903. (The table location contains no files, and the string
- // representation of its path does not have a trailing slash.)
- match =
- splitPath.equals(key) || splitPathWithNoSchema.equals(key);
- } else {
- // But for native tables, we need to do a prefix match for
- // subdirectories. (Unlike non-native tables, prefix mixups don't seem
- // to be a potential problem here since we are always dealing with the
- // path to something deeper than the table location.)
- if (pathsSize > 1) {
- // Comparing paths multiple times creates lots of objects &
- // creates GC pressure for tables having large number of partitions.
- // In such cases, use pre-computed paths for comparison
- if (splitParentPaths == null) {
- splitParentPaths = new HashSet<>();
- FileUtils.populateParentPaths(splitParentPaths, splitPath);
- FileUtils.populateParentPaths(splitParentPaths, splitPathWithNoSchema);
- }
- match = splitParentPaths.contains(key);
- } else {
- match = FileUtils.isPathWithinSubtree(splitPath, key)
- || FileUtils.isPathWithinSubtree(splitPathWithNoSchema, key);
+ if (pathsSize > 1) {
+ // Comparing paths multiple times creates lots of objects &
+ // creates GC pressure for tables having large number of partitions.
+ // In such cases, use pre-computed paths for comparison
+ if (splitParentPaths == null) {
+ splitParentPaths = new HashSet<>();
+ FileUtils.populateParentPaths(splitParentPaths, splitPath);
+ FileUtils.populateParentPaths(splitParentPaths, splitPathWithNoSchema);
}
+ match = splitParentPaths.contains(key);
+ } else {
+ match = FileUtils.isPathWithinSubtree(splitPath, key)
+ || FileUtils.isPathWithinSubtree(splitPathWithNoSchema, key);
}
if (match) {
List<String> list = entry.getValue();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
index 84a4f06..18ab79f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/LlapCacheOnlyInputFormatInterface.java
@@ -25,4 +25,11 @@ import org.apache.hadoop.hive.common.io.FileMetadataCache;
/** Marker interface for LLAP IO. */
public interface LlapCacheOnlyInputFormatInterface {
void injectCaches(FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf);
+
+ /**
+ * For input formats that can only accept LLAP caching with vectorization turned on.
+ */
+ interface VectorizedOnly extends LlapCacheOnlyInputFormatInterface {
+
+ }
}
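An input format opts into this behaviour simply by implementing the nested marker instead of the parent interface; canInjectCaches(clazz, isVectorized) above then reports it as cache-capable only when the query is vectorized. A minimal sketch (the ExampleVectorizedOnlyFormat class is hypothetical; HiveIcebergInputFormat in this patch is the real example):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.common.io.DataCache;
    import org.apache.hadoop.hive.common.io.FileMetadataCache;
    import org.apache.hadoop.hive.ql.io.LlapCacheOnlyInputFormatInterface;

    public abstract class ExampleVectorizedOnlyFormat
        implements LlapCacheOnlyInputFormatInterface.VectorizedOnly {

      @Override
      public void injectCaches(FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
        // Caches are wired into the vectorized reader elsewhere; nothing to inject here.
      }
    }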
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index dbbd8c9..17e1053 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -311,7 +311,8 @@ public class MapWork extends BaseWork {
} else {
hasLlap = true;
}
- } else if (isLlapOn && HiveInputFormat.canInjectCaches(inputFormatClass)) {
+ } else if (isLlapOn && HiveInputFormat.canInjectCaches(inputFormatClass,
+ Utilities.getIsVectorized(conf, this))) {
hasCacheOnly = true;
} else {
hasNonLlap = true;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 61c98b7..e40a0a6 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
@@ -215,8 +216,7 @@ public class TestHostAffinitySplitLocationProvider {
int[] hitCounts = new int[locs];
for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
state.set(0);
- int index = HostAffinitySplitLocationProvider.determineLocation(partLocs,
- splits[splitIx].getPath().toString(), splits[splitIx].getStart(), null);
+ int index = HostAffinitySplitLocationProvider.determineLocation(partLocs, splits[splitIx]);
++hitCounts[index];
}
SummaryStatistics ss = new SummaryStatistics();
@@ -320,8 +320,7 @@ public class TestHostAffinitySplitLocationProvider {
doReturn(new Path(fakePathString)).when(fileSplit).getPath();
doReturn(locations).when(fileSplit).getLocations();
- doReturn(locations).when(fileSplit).getLocations();
- return fileSplit;
+ return new HiveInputFormat.HiveInputSplit(fileSplit, "unused");
}