This is an automated email from the ASF dual-hosted git repository.
szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new dc6e13d746a HIVE-25827: Parquet file footer is read multiple times, when multiple splits are created in same file (#3368) (Adam Szita, reviewed by Peter Vary)
dc6e13d746a is described below
commit dc6e13d746a88456254378455f37af576ee4722b
Author: Adam Szita <[email protected]>
AuthorDate: Wed Jun 22 11:07:43 2022 +0200
HIVE-25827: Parquet file footer is read multiple times, when multiple splits are created in same file (#3368) (Adam Szita, reviewed by Peter Vary)
---
.../ql/io/parquet/ParquetRecordReaderBase.java | 176 ++++++++++++---------
.../parquet/read/ParquetRecordReaderWrapper.java | 23 +--
.../vector/VectorizedParquetRecordReader.java | 171 +++++++++-----------
.../ql/io/parquet/TestParquetRowGroupFilter.java | 4 +-
.../ql/io/parquet/TestVectorizedColumnReader.java | 10 +-
5 files changed, 187 insertions(+), 197 deletions(-)
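
In essence, the change reads the Parquet footer once per split: ParquetRecordReaderBase now caches it in the new parquetMetadata field (populated in setupMetadataAndParquetSplit via getParquetMetadata), and both getSplit(JobConf) and the vectorized reader's initialize() consume that cached copy instead of calling ParquetFileReader.readFooter themselves. Below is a minimal, self-contained sketch of that caching idea using plain parquet-hadoop APIs; the class and method names are illustrative, not the Hive classes touched by this patch.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    // Sketch only: read the footer a single time and hand the cached metadata to
    // every consumer, rather than re-reading it for split computation and again
    // for reader initialization.
    abstract class FooterCachingReaderBase {
      protected final Path filePath;
      protected final ParquetMetadata parquetMetadata; // single footer read, reused below

      protected FooterCachingReaderBase(Configuration conf, Path filePath) throws IOException {
        this.filePath = filePath;
        // readFooter is deprecated upstream, but it is also what the patched code calls.
        this.parquetMetadata = ParquetFileReader.readFooter(conf, filePath);
      }

      // Split computation and reader setup both consume the cached footer.
      protected List<BlockMetaData> rowGroups() {
        return parquetMetadata.getBlocks();
      }
    }
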
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
index 4cc32ae4804..a665c2586a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -48,115 +49,134 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ParquetRecordReaderBase {
+public abstract class ParquetRecordReaderBase {
public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class);
- protected Path file;
+ protected final FileSplit fileSplit;
+ protected Path filePath;
+ protected ParquetInputSplit parquetInputSplit;
+ protected ParquetMetadata parquetMetadata;
protected ProjectionPusher projectionPusher;
protected boolean skipTimestampConversion = false;
protected Boolean skipProlepticConversion;
protected Boolean legacyConversionEnabled;
protected SerDeStats serDeStats;
- protected JobConf jobConf;
+ protected final JobConf jobConf;
protected int schemaSize;
- protected List<BlockMetaData> filtedBlocks;
+ protected List<BlockMetaData> filteredBlocks;
protected ParquetFileReader reader;
+ protected ParquetRecordReaderBase(JobConf conf, InputSplit oldSplit) throws IOException {
+ serDeStats = new SerDeStats();
+ projectionPusher = new ProjectionPusher();
+
+ if (!(oldSplit instanceof FileSplit)) {
+ throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+ }
+ this.fileSplit = (FileSplit) oldSplit;
+ this.jobConf = projectionPusher.pushProjectionsAndFilters(conf, fileSplit.getPath().getParent());
+ this.filePath = fileSplit.getPath();
+ }
+
+ protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException {
+ // In the case of stat tasks a dummy split is created with -1 length but real path...
+ if (fileSplit.getLength() != 0) {
+ parquetMetadata = getParquetMetadata(filePath, conf);
+ parquetInputSplit = getSplit(conf);
+ }
+ // having null as parquetInputSplit seems to be a valid case based on this file's history
+ }
+
/**
* gets a ParquetInputSplit corresponding to a split given by Hive
*
- * @param oldSplit The split given by Hive
* @param conf The JobConf of the Hive job
* @return a ParquetInputSplit corresponding to the oldSplit
* @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
*/
@SuppressWarnings("deprecation")
protected ParquetInputSplit getSplit(
- final org.apache.hadoop.mapred.InputSplit oldSplit,
final JobConf conf
) throws IOException {
- if (oldSplit.getLength() == 0) {
- return null;
- }
+
ParquetInputSplit split;
- if (oldSplit instanceof FileSplit) {
- final Path finalPath = ((FileSplit) oldSplit).getPath();
- jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
-
- // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
- // MetadataFilter filter) API
- final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
- final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
- final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
- final ReadSupport.ReadContext
- readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
- null, fileMetaData.getSchema()));
-
- // Compute stats
- for (BlockMetaData bmd : blocks) {
- serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
- serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
- }
+ final Path finalPath = fileSplit.getPath();
+
+ // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
+ // MetadataFilter filter) API
+ final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+ final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+ final ReadSupport.ReadContext
+ readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+ null, fileMetaData.getSchema()));
+
+ // Compute stats
+ for (BlockMetaData bmd : blocks) {
+ serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+ serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
+ }
- schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
- .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
- final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
- final long splitStart = ((FileSplit) oldSplit).getStart();
- final long splitLength = ((FileSplit) oldSplit).getLength();
- for (final BlockMetaData block : blocks) {
- final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
- if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
- splitGroup.add(block);
- }
+ schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+ .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+ final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+ final long splitStart = fileSplit.getStart();
+ final long splitLength = fileSplit.getLength();
+ for (final BlockMetaData block : blocks) {
+ final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+ if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+ splitGroup.add(block);
}
- if (splitGroup.isEmpty()) {
- LOG.warn("Skipping split, could not find row group in: " + oldSplit);
+ }
+ if (splitGroup.isEmpty()) {
+ LOG.warn("Skipping split, could not find row group in: " + fileSplit);
+ return null;
+ }
+
+ FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+ if (filter != null) {
+ filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+ if (filteredBlocks.isEmpty()) {
+ LOG.debug("All row groups are dropped due to filter predicates");
return null;
}
- FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
- if (filter != null) {
- filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
- if (filtedBlocks.isEmpty()) {
- LOG.debug("All row groups are dropped due to filter predicates");
- return null;
- }
-
- long droppedBlocks = splitGroup.size() - filtedBlocks.size();
- if (droppedBlocks > 0) {
- LOG.debug("Dropping " + droppedBlocks + " row groups that do not
pass filter predicate");
- }
- } else {
- filtedBlocks = splitGroup;
+ long droppedBlocks = splitGroup.size() - filteredBlocks.size();
+ if (droppedBlocks > 0) {
+ LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass
filter predicate");
}
+ } else {
+ filteredBlocks = splitGroup;
+ }
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
- skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
- }
- skipProlepticConversion = DataWritableReadSupport
- .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
- if (skipProlepticConversion == null) {
- skipProlepticConversion = HiveConf.getBoolVar(
- conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
- }
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+ skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+ }
+ skipProlepticConversion = DataWritableReadSupport
+ .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
+ if (skipProlepticConversion == null) {
+ skipProlepticConversion = HiveConf.getBoolVar(
+ conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
+ }
legacyConversionEnabled = DataWritableReadSupport.getZoneConversionLegacy(fileMetaData.getKeyValueMetaData(), conf);
- split = new ParquetInputSplit(finalPath,
- splitStart,
- splitLength,
- oldSplit.getLocations(),
- filtedBlocks,
- readContext.getRequestedSchema().toString(),
- fileMetaData.getSchema().toString(),
- fileMetaData.getKeyValueMetaData(),
- readContext.getReadSupportMetadata());
- return split;
- } else {
- throw new IllegalArgumentException("Unknown split type: " + oldSplit);
- }
+ split = new ParquetInputSplit(finalPath,
+ splitStart,
+ splitLength,
+ fileSplit.getLocations(),
+ filteredBlocks,
+ readContext.getRequestedSchema().toString(),
+ fileMetaData.getSchema().toString(),
+ fileMetaData.getKeyValueMetaData(),
+ readContext.getReadSupportMetadata());
+ return split;
+ }
+
+ @SuppressWarnings("deprecation")
+ protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException {
+ return ParquetFileReader.readFooter(jobConf, path);
}
public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
@@ -189,8 +209,8 @@ public class ParquetRecordReaderBase {
}
}
- public List<BlockMetaData> getFiltedBlocks() {
- return filtedBlocks;
+ public List<BlockMetaData> getFilteredBlocks() {
+ return filteredBlocks;
}
public SerDeStats getStats() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index 113d61f5f97..aebcd247354 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -57,22 +57,11 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
final JobConf oldJobConf,
final Reporter reporter)
throws IOException, InterruptedException {
- this(newInputFormat, oldSplit, oldJobConf, reporter, new ProjectionPusher());
- }
+ super(oldJobConf, oldSplit);
- public ParquetRecordReaderWrapper(
- final ParquetInputFormat<ArrayWritable> newInputFormat,
- final InputSplit oldSplit,
- final JobConf oldJobConf,
- final Reporter reporter,
- final ProjectionPusher pusher)
- throws IOException, InterruptedException {
- this.splitLen = oldSplit.getLength();
- this.projectionPusher = pusher;
- this.serDeStats = new SerDeStats();
+ setupMetadataAndParquetSplit(oldJobConf);
- jobConf = oldJobConf;
- final ParquetInputSplit split = getSplit(oldSplit, jobConf);
+ this.splitLen = oldSplit.getLength();
TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get(IOConstants.MAPRED_TASK_ID));
if (taskAttemptID == null) {
@@ -89,10 +78,10 @@ public class ParquetRecordReaderWrapper extends ParquetRecordReaderBase
}
final TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(conf, taskAttemptID);
- if (split != null) {
+ if (parquetInputSplit != null) {
try {
- realReader = newInputFormat.createRecordReader(split, taskContext);
- realReader.initialize(split, taskContext);
+ realReader = newInputFormat.createRecordReader(parquetInputSplit, taskContext);
+ realReader.initialize(parquetInputSplit, taskContext);
// read once to gain access to key and value objects
if (realReader.nextKeyValue()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
index d17ddd5ab11..e0e14863dfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -37,27 +37,24 @@ import org.apache.hadoop.hive.ql.io.BucketIdentifier;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
-import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.parquet.ParquetRuntimeException;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
@@ -83,15 +80,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import static org.apache.hadoop.hive.llap.LlapHiveUtils.throwIfCacheOnlyRead;
-import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
/**
* This reader is used to read a batch of record from inputsplit, part of the code is referred
@@ -111,7 +104,6 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private Object[] partitionValues;
private Path cacheFsPath;
private static final int MAP_DEFINITION_LEVEL_MAX = 3;
- private Map<Path, PartitionDesc> parts;
private final boolean isReadCacheOnly;
/**
@@ -138,32 +130,53 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
private ZoneId writerTimezone;
private final BucketIdentifier bucketIdentifier;
- public VectorizedParquetRecordReader(
- org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+ // LLAP cache integration
+ // TODO: also support fileKey in splits, like OrcSplit does
+ private Object cacheKey = null;
+ private CacheTag cacheTag = null;
+
+ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf) throws IOException {
this(oldInputSplit, conf, null, null, null);
}
public VectorizedParquetRecordReader(
- org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf,
- FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf) {
+ InputSplit oldInputSplit, JobConf conf,
+ FileMetadataCache metadataCache, DataCache dataCache, Configuration cacheConf)
+ throws IOException {
+ super(conf, oldInputSplit);
try {
this.metadataCache = metadataCache;
this.cache = dataCache;
this.cacheConf = cacheConf;
- serDeStats = new SerDeStats();
- projectionPusher = new ProjectionPusher();
+
+ if (metadataCache != null) {
+ cacheKey = HdfsUtils.getFileId(filePath.getFileSystem(conf), filePath,
+ HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
+ HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
+ !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
+ // HdfsUtils.getFileId might yield to null in certain configurations
+ if (cacheKey != null) {
+ cacheTag = cacheTagOfParquetFile(filePath, cacheConf, conf);
+ // If we are going to use cache, change the path to depend on file ID for extra consistency.
+ if (cacheKey instanceof Long && HiveConf.getBoolVar(
+ cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
+ filePath = HdfsUtils.getFileIdPath(filePath, (long)cacheKey);
+ }
+ }
+ }
+
+ setupMetadataAndParquetSplit(conf);
+
colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
//initialize the rowbatchContext
- jobConf = conf;
isReadCacheOnly = HiveConf.getBoolVar(jobConf, ConfVars.LLAP_IO_CACHE_ONLY);
rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
- ParquetInputSplit inputSplit = getSplit(oldInputSplit, conf);
- if (inputSplit != null) {
- initialize(inputSplit, conf);
+
+ if (parquetInputSplit != null) {
+ initialize(parquetInputSplit, conf);
}
- FileSplit fileSplit = (FileSplit) oldInputSplit;
initPartitionValues(fileSplit, conf);
- bucketIdentifier = BucketIdentifier.from(conf, fileSplit.getPath());
+ bucketIdentifier = BucketIdentifier.from(conf, filePath);
} catch (Throwable e) {
LOG.error("Failed to create the vectorized reader due to exception " +
e);
throw new RuntimeException(e);
@@ -180,26 +193,20 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
}
}
+ @Override
+ protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException {
+ return readSplitFooter(conf, filePath, cacheKey, NO_FILTER, cacheTag);
+ }
+
@SuppressWarnings("deprecation")
public void initialize(
- InputSplit oldSplit,
+ ParquetInputSplit split,
JobConf configuration) throws IOException, InterruptedException, HiveException {
- // the oldSplit may be null during the split phase
- if (oldSplit == null) {
- return;
- }
- ParquetMetadata footer;
- List<BlockMetaData> blocks;
- MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
- if (mapWork != null) {
- parts = mapWork.getPathToPartitionInfo();
- }
+ List<BlockMetaData> blocks;
- ParquetInputSplit split = (ParquetInputSplit) oldSplit;
boolean indexAccess = configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false);
- this.file = split.getPath();
long[] rowGroupOffsets = split.getRowGroupOffsets();
String columnNames = configuration.get(IOConstants.COLUMNS);
@@ -207,83 +214,46 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
- // if task.side.metadata is set, rowGroupOffsets is null
- Object cacheKey = null;
- CacheTag cacheTag = null;
- // TODO: also support fileKey in splits, like OrcSplit does
- if (metadataCache != null) {
- if (cacheKey == null) {
- cacheKey = HdfsUtils.getFileId(file.getFileSystem(configuration), file,
- HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID),
- HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_CACHE_DEFAULT_FS_FILE_ID),
- !HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH));
- }
+ Set<Long> offsets = new HashSet<>();
+ for (long offset : rowGroupOffsets) {
+ offsets.add(offset);
}
- if (cacheKey != null) {
- if (HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE)) {
- PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(split.getPath(), parts);
- cacheTag = LlapHiveUtils.getDbAndTableNameForMetrics(file, true, partitionDesc);
- }
- // If we are going to use cache, change the path to depend on file ID for extra consistency.
- FileSystem fs = file.getFileSystem(configuration);
- if (cacheKey instanceof Long && HiveConf.getBoolVar(
- cacheConf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
- file = HdfsUtils.getFileIdPath(file, (long)cacheKey);
+ blocks = new ArrayList<>();
+ for (BlockMetaData block : parquetMetadata.getBlocks()) {
+ if (offsets.contains(block.getStartingPos())) {
+ blocks.add(block);
}
}
-
- if (rowGroupOffsets == null) {
- //TODO check whether rowGroupOffSets can be null
- // then we need to apply the predicate push down filter
- footer = readSplitFooter(
- configuration, file, cacheKey, range(split.getStart(), split.getEnd()), cacheTag);
- MessageType fileSchema = footer.getFileMetaData().getSchema();
- FilterCompat.Filter filter = getFilter(configuration);
- blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
- } else {
- // otherwise we find the row groups that were selected on the client
- footer = readSplitFooter(configuration, file, cacheKey, NO_FILTER, cacheTag);
- Set<Long> offsets = new HashSet<>();
- for (long offset : rowGroupOffsets) {
- offsets.add(offset);
- }
- blocks = new ArrayList<>();
- for (BlockMetaData block : footer.getBlocks()) {
- if (offsets.contains(block.getStartingPos())) {
- blocks.add(block);
- }
- }
- // verify we found them all
- if (blocks.size() != rowGroupOffsets.length) {
- long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
- for (int i = 0; i < foundRowGroupOffsets.length; i++) {
- foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
- }
- // this should never happen.
- // provide a good error message in case there's a bug
- throw new IllegalStateException(
- "All the offsets listed in the split should be found in the file."
- + " expected: " + Arrays.toString(rowGroupOffsets)
- + " found: " + blocks
- + " out of: " + Arrays.toString(foundRowGroupOffsets)
- + " in range " + split.getStart() + ", " + split.getEnd());
+ // verify we found them all
+ if (blocks.size() != rowGroupOffsets.length) {
+ long[] foundRowGroupOffsets = new long[parquetMetadata.getBlocks().size()];
+ for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+ foundRowGroupOffsets[i] = parquetMetadata.getBlocks().get(i).getStartingPos();
}
+ // this should never happen.
+ // provide a good error message in case there's a bug
+ throw new IllegalStateException(
+ "All the offsets listed in the split should be found in the file."
+ + " expected: " + Arrays.toString(rowGroupOffsets)
+ + " found: " + blocks
+ + " out of: " + Arrays.toString(foundRowGroupOffsets)
+ + " in range " + split.getStart() + ", " + split.getEnd());
}
for (BlockMetaData block : blocks) {
this.totalRowCount += block.getRowCount();
}
- this.fileSchema = footer.getFileMetaData().getSchema();
+ this.fileSchema = parquetMetadata.getFileMetaData().getSchema();
this.writerTimezone = DataWritableReadSupport
- .getWriterTimeZoneId(footer.getFileMetaData().getKeyValueMetaData());
+ .getWriterTimeZoneId(parquetMetadata.getFileMetaData().getKeyValueMetaData());
colsToInclude = ColumnProjectionUtils.getReadColumnIDs(configuration);
requestedSchema = DataWritableReadSupport
.getRequestedSchema(indexAccess, columnNamesList, columnTypesList, fileSchema, configuration);
- Path path = wrapPathForCache(file, cacheKey, configuration, blocks, cacheTag);
+ Path path = wrapPathForCache(filePath, cacheKey, configuration, blocks, cacheTag);
this.reader = new ParquetFileReader(
- configuration, footer.getFileMetaData(), path, blocks, requestedSchema.getColumns());
+ configuration, parquetMetadata.getFileMetaData(), path, blocks, requestedSchema.getColumns());
}
private Path wrapPathForCache(Path path, Object fileKey, JobConf configuration,
@@ -360,13 +330,22 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
return HadoopStreams.wrap(fs.open(file));
}
@Override
- public long getLength() throws IOException {
+ public long getLength() {
return stat.getLen();
}
};
return ParquetFileReader.readFooter(inputFile, filter);
}
+ private static CacheTag cacheTagOfParquetFile(Path path, Configuration cacheConf, JobConf jobConf) {
+ MapWork mapWork = LlapHiveUtils.findMapWork(jobConf);
+ if (!HiveConf.getBoolVar(cacheConf, ConfVars.LLAP_TRACK_CACHE_USAGE) || mapWork == null) {
+ return null;
+ }
+ PartitionDesc partitionDesc = LlapHiveUtils.partitionDescForPath(path, mapWork.getPathToPartitionInfo());
+ return LlapHiveUtils.getDbAndTableNameForMetrics(path, true, partitionDesc);
+ }
+
private FileMetadataCache metadataCache;
private DataCache cache;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
index c33b701d94c..24697691bc9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRowGroupFilter.java
@@ -108,7 +108,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
new MapredParquetInputFormat().getRecordReader(
new FileSplit(testPath, 0, fileLength(testPath), (String[]) null),
conf, null);
- Assert.assertEquals("row group is not filtered correctly", 1,
recordReader.getFiltedBlocks().size());
+ Assert.assertEquals("row group is not filtered correctly", 1,
recordReader.getFilteredBlocks().size());
// > 100
constantDesc = new ExprNodeConstantDesc(100);
@@ -121,7 +121,7 @@ public class TestParquetRowGroupFilter extends AbstractTestParquetDirect {
new MapredParquetInputFormat().getRecordReader(
new FileSplit(testPath, 0, fileLength(testPath), (String[]) null),
conf, null);
- Assert.assertEquals("row group is not filtered correctly", 0,
recordReader.getFiltedBlocks().size());
+ Assert.assertEquals("row group is not filtered correctly", 0,
recordReader.getFilteredBlocks().size());
}
private ArrayWritableObjectInspector getObjectInspector(final String columnNames, final String columnTypes) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
index 52e6045b631..e290e332e7f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -119,12 +121,12 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
private class TestVectorizedParquetRecordReader extends VectorizedParquetRecordReader {
public TestVectorizedParquetRecordReader(
- org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+ org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) throws IOException {
super(oldInputSplit, conf);
}
+
@Override
- protected ParquetInputSplit getSplit(
- org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+ protected ParquetInputSplit getSplit(JobConf conf) throws IOException {
return null;
}
}
@@ -145,6 +147,6 @@ public class TestVectorizedColumnReader extends VectorizedColumnReaderTestBase {
FileSplit fsplit = getFileSplit(vectorJob);
JobConf jobConf = new JobConf(conf);
TestVectorizedParquetRecordReader testReader = new TestVectorizedParquetRecordReader(fsplit, jobConf);
- Assert.assertNull("Test should return null split from getSplit() method",
testReader.getSplit(fsplit, jobConf));
+ Assert.assertNull("Test should return null split from getSplit() method",
testReader.getSplit(null));
}
}