This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a35a6ea HIVE-25628: Avoid unnecessary file ops if Iceberg table is LLAP cached (Adam Szita, reviewed by Marton Bod)
a35a6ea is described below
commit a35a6ea7c16888664a631470e1773319cab59ff6
Author: Adam Szita <[email protected]>
AuthorDate: Thu Oct 28 15:12:08 2021 +0200
HIVE-25628: Avoid unnecessary file ops if Iceberg table is LLAP cached (Adam Szita, reviewed by Marton Bod)
---
.../mr/hive/vector/HiveVectorizedReader.java | 12 ++-
.../apache/iceberg/orc/VectorizedReadUtils.java | 113 +++++++++++++++------
2 files changed, 90 insertions(+), 35 deletions(-)
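
For context: before this patch, the LLAP reader was handed a null file ID and the projection logic always opened the ORC file to read its footer. The patch instead derives a SyntheticFileId triplet purely from Iceberg metadata, so no extra file operation is needed. A minimal illustrative sketch (not part of the patch), assuming a FileScanTask named task is in scope:

    // Build the (path, length, mtime) cache key without any file system call:
    // path and size come straight from Iceberg metadata; Long.MIN_VALUE stands
    // in for the modification time, which Iceberg does not track yet.
    Path path = new Path(task.file().path().toString());
    SyntheticFileId fileId = new SyntheticFileId(path, task.file().fileSizeInBytes(), Long.MIN_VALUE);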
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
index 4f5acea..1375ee8 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -114,19 +115,24 @@ public class HiveVectorizedReader {
// Need to turn positional schema evolution off since we use column name based schema evolution for projection
// and Iceberg will make a mapping between the file schema and the current reading schema.
job.setBoolean(OrcConf.FORCE_POSITIONAL_EVOLUTION.getHiveConfName(), false);
- VectorizedReadUtils.handleIcebergProjection(inputFile, task, job);
+
+ // TODO: Iceberg currently does not track the last modification time of a file. Until that's added,
+ // we need to set Long.MIN_VALUE as last modification time in the fileId triplet.
+ SyntheticFileId fileId = new SyntheticFileId(path, task.file().fileSizeInBytes(), Long.MIN_VALUE);
+
+ VectorizedReadUtils.handleIcebergProjection(inputFile, task, job, fileId);
RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
// If LLAP enabled, try to retrieve an LLAP record reader - this might yield null in some special cases
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
LlapProxy.getIo() != null) {
- recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(null, path, null, readColumnIds,
+ recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, readColumnIds,
job, task.start(), task.length(), reporter);
}
if (recordReader == null) {
- InputSplit split = new OrcSplit(path, null, task.start(), task.length(), (String[]) null, null, false,
+ InputSplit split = new OrcSplit(path, fileId, task.start(), task.length(), (String[]) null, null, false,
false, com.google.common.collect.Lists.newArrayList(), 0, task.length(), path.getParent(), null);
recordReader = new VectorizedOrcInputFormat().getRecordReader(split, job, reporter);
}
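
For readability, here is the reader selection flow as it stands after this hunk, consolidated into one hedged sketch (imports as in HiveVectorizedReader.java above; job, path, fileId, readColumnIds, task and reporter are the surrounding method's variables):

    RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
    // With a non-null fileId, LLAP can key its cache directly instead of
    // having to stat the file first.
    if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
        LlapProxy.getIo() != null) {
      recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, readColumnIds,
          job, task.start(), task.length(), reporter);
    }
    if (recordReader == null) {
      // Non-LLAP fallback: the OrcSplit now carries the same fileId.
      InputSplit split = new OrcSplit(path, fileId, task.start(), task.length(), (String[]) null, null,
          false, false, com.google.common.collect.Lists.newArrayList(), 0, task.length(), path.getParent(), null);
      recordReader = new VectorizedOrcInputFormat().getRecordReader(split, job, reporter);
    }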
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
index addf202..30f66b4 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/orc/VectorizedReadUtils.java
@@ -20,73 +20,122 @@
package org.apache.iceberg.orc;
import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.CacheTag;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapHiveUtils;
+import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.iceberg.org.apache.orc.Reader;
import org.apache.hive.iceberg.org.apache.orc.TypeDescription;
+import org.apache.hive.iceberg.org.apache.orc.impl.ReaderImpl;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.orc.impl.BufferChunk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utilities that rely on Iceberg code from org.apache.iceberg.orc package.
*/
public class VectorizedReadUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(VectorizedReadUtils.class);
+
private VectorizedReadUtils() {
}
+ private static TypeDescription getSchemaForFile(InputFile inputFile, SyntheticFileId fileId, JobConf job)
+ throws IOException {
+ TypeDescription schema = null;
+
+ if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
+ LlapProxy.getIo() != null) {
+ MapWork mapWork = LlapHiveUtils.findMapWork(job);
+ Path path = new Path(inputFile.location());
+ PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
+
+ // Note: Since Hive doesn't know about partition information of Iceberg tables, partitionDesc is only used to
+ // deduce the table (and DB) name here.
+ CacheTag cacheTag = HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_TRACK_CACHE_USAGE) ?
+ LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc) : null;
+
+ try {
+ // Schema has to be serialized and deserialized as it is passed between different packages of TypeDescription:
+ // Iceberg expects org.apache.hive.iceberg.org.apache.orc.TypeDescription as it shades ORC, while LLAP provides
+ // the unshaded org.apache.orc.TypeDescription type.
+ BufferChunk tailBuffer = LlapProxy.getIo().getOrcTailFromCache(path, job, cacheTag, fileId).getTailBuffer();
+ schema = ReaderImpl.extractFileTail(tailBuffer.getData()).getSchema();
+ } catch (IOException ioe) {
+ LOG.warn("LLAP is turned on but was unable to get file metadata information through its cache for {}",
+ path, ioe);
+ }
+
+ }
+
+ // Fall back to the simple ORC reader file opening method when LLAP is unavailable or fails.
+ if (schema == null) {
+ try (Reader orcFileReader = ORC.newFileReader(inputFile, job)) {
+ schema = orcFileReader.getSchema();
+ }
+ }
+
+ return schema;
+
+ }
+
/**
* Adjusts the jobConf so that column reorders and renames that might have happened since this ORC file was written
* are properly mapped to the schema of the original file.
* @param inputFile - the original ORC file - this needs to be accessed to retrieve the original schema for mapping
* @param task - Iceberg task - required for
* @param job - JobConf instance to adjust
+ * @param fileId - FileID for the input file, serves as cache key in an LLAP setup
* @throws IOException - errors relating to accessing the ORC file
*/
- public static void handleIcebergProjection(InputFile inputFile, FileScanTask task, JobConf job)
- throws IOException {
- Reader orcFileReader = ORC.newFileReader(inputFile, job);
-
- try {
- // We need to map with the current (i.e. current Hive table columns) full schema (without projections),
- // as OrcInputFormat will take care of the projections by the use of an include boolean array
- Schema currentSchema = task.spec().schema();
- TypeDescription fileSchema = orcFileReader.getSchema();
-
- TypeDescription readOrcSchema;
- if (ORCSchemaUtil.hasIds(fileSchema)) {
- readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, fileSchema);
- } else {
- TypeDescription typeWithIds =
- ORCSchemaUtil.applyNameMapping(fileSchema, MappingUtil.create(currentSchema));
- readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, typeWithIds);
- }
+ public static void handleIcebergProjection(InputFile inputFile, FileScanTask task, JobConf job,
+ SyntheticFileId fileId) throws IOException {
+
+ // We need to map with the current (i.e. current Hive table columns) full schema (without projections),
+ // as OrcInputFormat will take care of the projections by the use of an include boolean array
+ Schema currentSchema = task.spec().schema();
+ TypeDescription fileSchema = getSchemaForFile(inputFile, fileId, job);
+
+ TypeDescription readOrcSchema;
+ if (ORCSchemaUtil.hasIds(fileSchema)) {
+ readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, fileSchema);
+ } else {
+ TypeDescription typeWithIds =
+ ORCSchemaUtil.applyNameMapping(fileSchema, MappingUtil.create(currentSchema));
+ readOrcSchema = ORCSchemaUtil.buildOrcProjection(currentSchema, typeWithIds);
+ }
- job.set(ColumnProjectionUtils.ORC_SCHEMA_STRING, readOrcSchema.toString());
+ job.set(ColumnProjectionUtils.ORC_SCHEMA_STRING, readOrcSchema.toString());
- // Predicate pushdown needs to be adjusted too in case of column renames; we let Iceberg generate this into job
- if (task.residual() != null) {
- Expression boundFilter = Binder.bind(currentSchema.asStruct(), task.residual(), false);
+ // Predicate pushdown needs to be adjusted too in case of column renames; we let Iceberg generate this into job
+ if (task.residual() != null) {
+ Expression boundFilter = Binder.bind(currentSchema.asStruct(), task.residual(), false);
- // Note the use of the unshaded version of this class here (required for SARG deserialization later)
- org.apache.hadoop.hive.ql.io.sarg.SearchArgument sarg =
- ExpressionToOrcSearchArgument.convert(boundFilter, readOrcSchema);
- if (sarg != null) {
- job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
- job.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
+ // Note the use of the unshaded version of this class here (required for SARG deserialization later)
+ org.apache.hadoop.hive.ql.io.sarg.SearchArgument sarg =
+ ExpressionToOrcSearchArgument.convert(boundFilter, readOrcSchema);
+ if (sarg != null) {
+ job.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+ job.unset(ConvertAstToSearchArg.SARG_PUSHDOWN);
- job.set(ConvertAstToSearchArg.SARG_PUSHDOWN, ConvertAstToSearchArg.sargToKryo(sarg));
- }
+ job.set(ConvertAstToSearchArg.SARG_PUSHDOWN, ConvertAstToSearchArg.sargToKryo(sarg));
}
- } finally {
- orcFileReader.close();
}
}
}
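
Taken together, the new getSchemaForFile() gives a cache-first schema lookup: try the LLAP metadata cache, and open the file only when that path is unavailable or fails. A hedged sketch of the control flow, using only calls visible in this patch (path, job, cacheTag, fileId and inputFile assumed in scope; Reader, TypeDescription and ReaderImpl are the shaded org.apache.hive.iceberg.org.apache.orc types):

    TypeDescription schema = null;
    if (LlapProxy.getIo() != null) {
      try {
        // Fetch the serialized ORC file tail from the LLAP cache, keyed by fileId.
        BufferChunk tailBuffer = LlapProxy.getIo().getOrcTailFromCache(path, job, cacheTag, fileId).getTailBuffer();
        // Re-parse the raw tail bytes with the shaded ReaderImpl, bridging the
        // unshaded (LLAP) and shaded (Iceberg) TypeDescription packages.
        schema = ReaderImpl.extractFileTail(tailBuffer.getData()).getSchema();
      } catch (IOException ioe) {
        // Fall through to the plain file-open path below.
      }
    }
    if (schema == null) {
      // The only remaining file op: open the ORC file and read its footer.
      try (Reader orcFileReader = ORC.newFileReader(inputFile, job)) {
        schema = orcFileReader.getSchema();
      }
    }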