[
https://issues.apache.org/jira/browse/HIVE-25827?focusedWorklogId=783379&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783379
]
ASF GitHub Bot logged work on HIVE-25827:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Jun/22 13:12
Start Date: 21/Jun/22 13:12
Worklog Time Spent: 10m
Work Description: szlta commented on code in PR #3368:
URL: https://github.com/apache/hive/pull/3368#discussion_r902601766
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java:
##########
@@ -48,115 +49,133 @@
 import java.util.List;
 import java.util.Map;

-public class ParquetRecordReaderBase {
+public abstract class ParquetRecordReaderBase {
   public static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReaderBase.class);

-  protected Path file;
+  protected final FileSplit fileSplit;
+  protected Path filePath;
+  protected ParquetInputSplit parquetInputSplit;
+  protected ParquetMetadata parquetMetadata;
   protected ProjectionPusher projectionPusher;
   protected boolean skipTimestampConversion = false;
   protected Boolean skipProlepticConversion;
   protected Boolean legacyConversionEnabled;
   protected SerDeStats serDeStats;
-  protected JobConf jobConf;
+  protected final JobConf jobConf;

   protected int schemaSize;

-  protected List<BlockMetaData> filtedBlocks;
+  protected List<BlockMetaData> filteredBlocks;
   protected ParquetFileReader reader;

+  protected ParquetRecordReaderBase(JobConf conf, InputSplit oldSplit) throws IOException {
+    serDeStats = new SerDeStats();
+    projectionPusher = new ProjectionPusher();
+
+    if (!(oldSplit instanceof FileSplit)) {
+      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+    }
+    this.fileSplit = (FileSplit) oldSplit;
+    this.jobConf = projectionPusher.pushProjectionsAndFilters(conf, fileSplit.getPath().getParent());
+    this.filePath = fileSplit.getPath();
+  }
+
+  protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException {
+    // In the case of stat tasks a dummy split is created with -1 length but real path...
+    if (fileSplit.getLength() != 0) {
+      parquetMetadata = getParquetMetadata(filePath, conf);
+      parquetInputSplit = getSplit(conf);
+    }
+    // having null as parquetInputSplit seems to be a valid case based on this file's history
+  }
+
   /**
    * gets a ParquetInputSplit corresponding to a split given by Hive
    *
-   * @param oldSplit The split given by Hive
    * @param conf The JobConf of the Hive job
    * @return a ParquetInputSplit corresponding to the oldSplit
    * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
    */
   @SuppressWarnings("deprecation")
   protected ParquetInputSplit getSplit(
-      final org.apache.hadoop.mapred.InputSplit oldSplit,
       final JobConf conf
     ) throws IOException {
-    if (oldSplit.getLength() == 0) {
-      return null;
-    }
+
     ParquetInputSplit split;
-    if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
-      jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
-
-      // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
-      //  MetadataFilter filter) API
-      final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
-      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
-      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-      final ReadSupport.ReadContext
-          readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
-          null, fileMetaData.getSchema()));
-
-      // Compute stats
-      for (BlockMetaData bmd : blocks) {
-        serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
-        serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
-      }
+    final Path finalPath = fileSplit.getPath();
+
+    // TODO enable MetadataFilter by using readFooter(Configuration configuration, Path file,
+    //  MetadataFilter filter) API
+    final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+    final ReadSupport.ReadContext
+        readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+        null, fileMetaData.getSchema()));
+
+    // Compute stats
+    for (BlockMetaData bmd : blocks) {
+      serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+      serDeStats.setRawDataSize(serDeStats.getRawDataSize() + bmd.getTotalByteSize());
+    }

-      schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-          .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
-      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
-      final long splitStart = ((FileSplit) oldSplit).getStart();
-      final long splitLength = ((FileSplit) oldSplit).getLength();
-      for (final BlockMetaData block : blocks) {
-        final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-        if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
-          splitGroup.add(block);
-        }
+    schemaSize = MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+        .get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+    final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+    final long splitStart = fileSplit.getStart();
+    final long splitLength = fileSplit.getLength();
+    for (final BlockMetaData block : blocks) {
+      final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+      if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
+        splitGroup.add(block);
       }
-      if (splitGroup.isEmpty()) {
-        LOG.warn("Skipping split, could not find row group in: " + oldSplit);
+    }
+    if (splitGroup.isEmpty()) {
+      LOG.warn("Skipping split, could not find row group in: " + fileSplit);
+      return null;
+    }
+
+    FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+    if (filter != null) {
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
+      if (filteredBlocks.isEmpty()) {
+        LOG.debug("All row groups are dropped due to filter predicates");
         return null;
       }
-      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
-      if (filter != null) {
-        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
-        if (filtedBlocks.isEmpty()) {
-          LOG.debug("All row groups are dropped due to filter predicates");
-          return null;
-        }
-
-        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
-        if (droppedBlocks > 0) {
-          LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
-        }
-      } else {
-        filtedBlocks = splitGroup;
+      long droppedBlocks = splitGroup.size() - filteredBlocks.size();
+      if (droppedBlocks > 0) {
+        LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate");
       }
+    } else {
+      filteredBlocks = splitGroup;
+    }

-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
-        skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
-      }
-      skipProlepticConversion = DataWritableReadSupport
-          .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
-      if (skipProlepticConversion == null) {
-        skipProlepticConversion = HiveConf.getBoolVar(
-            conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
-      }
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+      skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+    }
+    skipProlepticConversion = DataWritableReadSupport
+        .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
+    if (skipProlepticConversion == null) {
+      skipProlepticConversion = HiveConf.getBoolVar(
+          conf, HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
+    }
     legacyConversionEnabled = DataWritableReadSupport.getZoneConversionLegacy(fileMetaData.getKeyValueMetaData(), conf);

-      split = new ParquetInputSplit(finalPath,
-          splitStart,
-          splitLength,
-          oldSplit.getLocations(),
-          filtedBlocks,
-          readContext.getRequestedSchema().toString(),
-          fileMetaData.getSchema().toString(),
-          fileMetaData.getKeyValueMetaData(),
-          readContext.getReadSupportMetadata());
-      return split;
-    } else {
-      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
-    }
+    split = new ParquetInputSplit(finalPath,
+        splitStart,
+        splitLength,
+        fileSplit.getLocations(), filteredBlocks,
Review Comment:
fixed
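
For readers following the diff: the refactor moves split validation and projection pushdown into the base-class constructor, and footer loading into setupMetadataAndParquetSplit(), so each reader touches the footer at most once. A minimal sketch of how a concrete subclass might wire this up; the subclass name is hypothetical, and the getParquetMetadata(Path, JobConf) signature is inferred from the call site in the hunk above (its declaration is outside this diff):

  // Hypothetical subclass for illustration only; not part of the patch.
  public class ExampleParquetRecordReader extends ParquetRecordReaderBase {

    protected ExampleParquetRecordReader(JobConf conf, InputSplit oldSplit) throws IOException {
      super(conf, oldSplit);                 // validates the FileSplit, pushes projections/filters
      setupMetadataAndParquetSplit(jobConf); // footer is read here, at most once per reader
    }

    @Override
    protected ParquetMetadata getParquetMetadata(Path path, JobConf conf) throws IOException {
      // A subclass could consult a footer cache here so that multiple splits
      // over the same file reuse a single footer read (the point of HIVE-25827).
      return ParquetFileReader.readFooter(conf, path);
    }
  }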
Issue Time Tracking
-------------------
Worklog Id: (was: 783379)
Time Spent: 1h 10m (was: 1h)
> Parquet file footer is read multiple times, when multiple splits are created in same file
> ------------------------------------------------------------------------------------------
>
> Key: HIVE-25827
> URL: https://issues.apache.org/jira/browse/HIVE-25827
> Project: Hive
> Issue Type: Improvement
> Reporter: Rajesh Balamohan
> Assignee: Ádám Szita
> Priority: Major
> Labels: performance, pull-request-available
> Attachments: image-2021-12-21-03-19-38-577.png
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> With large files, it is possible that multiple splits are created in the same
> file. With the current codebase, "ParquetRecordReaderBase" ends up reading the
> file footer for each split.
> This can be optimized so that the footer information is not read multiple
> times for the same file.
>
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java#L160]
>
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java#L91]
>
>
> !image-2021-12-21-03-19-38-577.png|width=1363,height=1256!
>
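
On the optimization described above: the patch keeps the parsed footer on the reader instance, while a cache keyed by file path would additionally let separate readers over the same file share one footer read. A rough sketch of that idea, illustrative only and not taken from the Hive code; a real implementation would need bounded size and eviction:

  import java.io.IOException;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.hadoop.ParquetFileReader;
  import org.apache.parquet.hadoop.metadata.ParquetMetadata;

  final class FooterCache {
    // One parsed footer per file path; unbounded here for brevity.
    private static final Map<String, ParquetMetadata> FOOTERS = new ConcurrentHashMap<>();

    static ParquetMetadata getFooter(Configuration conf, Path file) throws IOException {
      ParquetMetadata footer = FOOTERS.get(file.toString());
      if (footer == null) {
        footer = ParquetFileReader.readFooter(conf, file); // single footer read per file
        FOOTERS.putIfAbsent(file.toString(), footer);      // later splits reuse the cached copy
      }
      return footer;
    }
  }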
--
This message was sent by Atlassian Jira
(v8.20.7#820007)