[ 
https://issues.apache.org/jira/browse/HIVE-25827?focusedWorklogId=783379&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-783379
 ]

ASF GitHub Bot logged work on HIVE-25827:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jun/22 13:12
            Start Date: 21/Jun/22 13:12
    Worklog Time Spent: 10m 
      Work Description: szlta commented on code in PR #3368:
URL: https://github.com/apache/hive/pull/3368#discussion_r902601766


##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java:
##########
@@ -48,115 +49,133 @@
 import java.util.List;
 import java.util.Map;
 
-public class ParquetRecordReaderBase {
+public abstract class ParquetRecordReaderBase {
   public static final Logger LOG = 
LoggerFactory.getLogger(ParquetRecordReaderBase.class);
 
-  protected Path file;
+  protected final FileSplit fileSplit;
+  protected Path filePath;
+  protected ParquetInputSplit parquetInputSplit;
+  protected ParquetMetadata parquetMetadata;
   protected ProjectionPusher projectionPusher;
   protected boolean skipTimestampConversion = false;
   protected Boolean skipProlepticConversion;
   protected Boolean legacyConversionEnabled;
   protected SerDeStats serDeStats;
-  protected JobConf jobConf;
+  protected final JobConf jobConf;
 
   protected int schemaSize;
-  protected List<BlockMetaData> filtedBlocks;
+  protected List<BlockMetaData> filteredBlocks;
   protected ParquetFileReader reader;
 
+  protected ParquetRecordReaderBase(JobConf conf, InputSplit oldSplit) throws 
IOException {
+    serDeStats = new SerDeStats();
+    projectionPusher = new ProjectionPusher();
+
+    if (!(oldSplit instanceof FileSplit)) {
+      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+    }
+    this.fileSplit = (FileSplit) oldSplit;
+    this.jobConf = projectionPusher.pushProjectionsAndFilters(conf, 
fileSplit.getPath().getParent());
+    this.filePath = fileSplit.getPath();
+  }
+
+  protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException 
{
+    // In the case of stat tasks a dummy split is created with -1 length but 
real path...
+    if (fileSplit.getLength() != 0) {
+      parquetMetadata = getParquetMetadata(filePath, conf);
+      parquetInputSplit = getSplit(conf);
+    }
+    // having null as parquetInputSplit seems to be a valid case based on this 
file's history
+  }
+
   /**
    * gets a ParquetInputSplit corresponding to a split given by Hive
    *
-   * @param oldSplit The split given by Hive
    * @param conf The JobConf of the Hive job
    * @return a ParquetInputSplit corresponding to the oldSplit
    * @throws IOException if the config cannot be enhanced or if the footer 
cannot be read from the file
    */
   @SuppressWarnings("deprecation")
   protected ParquetInputSplit getSplit(
-    final org.apache.hadoop.mapred.InputSplit oldSplit,
     final JobConf conf
   ) throws IOException {
-    if (oldSplit.getLength() == 0) {
-      return null;
-    }
+
     ParquetInputSplit split;
-    if (oldSplit instanceof FileSplit) {
-      final Path finalPath = ((FileSplit) oldSplit).getPath();
-      jobConf = projectionPusher.pushProjectionsAndFilters(conf, 
finalPath.getParent());
-
-      // TODO enable MetadataFilter by using readFooter(Configuration 
configuration, Path file,
-      // MetadataFilter filter) API
-      final ParquetMetadata parquetMetadata = 
ParquetFileReader.readFooter(jobConf, finalPath);
-      final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
-      final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
-      final ReadSupport.ReadContext
-        readContext = new DataWritableReadSupport().init(new 
InitContext(jobConf,
-        null, fileMetaData.getSchema()));
-
-      // Compute stats
-      for (BlockMetaData bmd : blocks) {
-        serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
-        serDeStats.setRawDataSize(serDeStats.getRawDataSize() + 
bmd.getTotalByteSize());
-      }
+    final Path finalPath = fileSplit.getPath();
+
+    // TODO enable MetadataFilter by using readFooter(Configuration 
configuration, Path file,
+    // MetadataFilter filter) API
+    final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+    final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+    final ReadSupport.ReadContext
+      readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+      null, fileMetaData.getSchema()));
+
+    // Compute stats
+    for (BlockMetaData bmd : blocks) {
+      serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+      serDeStats.setRawDataSize(serDeStats.getRawDataSize() + 
bmd.getTotalByteSize());
+    }
 
-      schemaSize = 
MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-        
.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
-      final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
-      final long splitStart = ((FileSplit) oldSplit).getStart();
-      final long splitLength = ((FileSplit) oldSplit).getLength();
-      for (final BlockMetaData block : blocks) {
-        final long firstDataPage = 
block.getColumns().get(0).getFirstDataPageOffset();
-        if (firstDataPage >= splitStart && firstDataPage < splitStart + 
splitLength) {
-          splitGroup.add(block);
-        }
+    schemaSize = 
MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+      
.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+    final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+    final long splitStart = fileSplit.getStart();
+    final long splitLength = fileSplit.getLength();
+    for (final BlockMetaData block : blocks) {
+      final long firstDataPage = 
block.getColumns().get(0).getFirstDataPageOffset();
+      if (firstDataPage >= splitStart && firstDataPage < splitStart + 
splitLength) {
+        splitGroup.add(block);
       }
-      if (splitGroup.isEmpty()) {
-        LOG.warn("Skipping split, could not find row group in: " + oldSplit);
+    }
+    if (splitGroup.isEmpty()) {
+      LOG.warn("Skipping split, could not find row group in: " + fileSplit);
+      return null;
+    }
+
+    FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+    if (filter != null) {
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, 
fileMetaData.getSchema());
+      if (filteredBlocks.isEmpty()) {
+        LOG.debug("All row groups are dropped due to filter predicates");
         return null;
       }
 
-      FilterCompat.Filter filter = setFilter(jobConf, 
fileMetaData.getSchema());
-      if (filter != null) {
-        filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, 
fileMetaData.getSchema());
-        if (filtedBlocks.isEmpty()) {
-          LOG.debug("All row groups are dropped due to filter predicates");
-          return null;
-        }
-
-        long droppedBlocks = splitGroup.size() - filtedBlocks.size();
-        if (droppedBlocks > 0) {
-          LOG.debug("Dropping " + droppedBlocks + " row groups that do not 
pass filter predicate");
-        }
-      } else {
-        filtedBlocks = splitGroup;
+      long droppedBlocks = splitGroup.size() - filteredBlocks.size();
+      if (droppedBlocks > 0) {
+        LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass 
filter predicate");
       }
+    } else {
+      filteredBlocks = splitGroup;
+    }
 
-      if (HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
-        skipTimestampConversion = 
!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
-      }
-      skipProlepticConversion = DataWritableReadSupport
-          .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
-      if (skipProlepticConversion == null) {
-        skipProlepticConversion = HiveConf.getBoolVar(
-            conf, 
HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
-      }
+    if (HiveConf.getBoolVar(conf, 
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+      skipTimestampConversion = 
!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+    }
+    skipProlepticConversion = DataWritableReadSupport
+        .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
+    if (skipProlepticConversion == null) {
+      skipProlepticConversion = HiveConf.getBoolVar(
+          conf, 
HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
+    }
       legacyConversionEnabled =
           
DataWritableReadSupport.getZoneConversionLegacy(fileMetaData.getKeyValueMetaData(),
 conf);
 
-      split = new ParquetInputSplit(finalPath,
-        splitStart,
-        splitLength,
-        oldSplit.getLocations(),
-        filtedBlocks,
-        readContext.getRequestedSchema().toString(),
-        fileMetaData.getSchema().toString(),
-        fileMetaData.getKeyValueMetaData(),
-        readContext.getReadSupportMetadata());
-      return split;
-    } else {
-      throw new IllegalArgumentException("Unknown split type: " + oldSplit);
-    }
+    split = new ParquetInputSplit(finalPath,
+      splitStart,
+      splitLength,
+      fileSplit.getLocations(), filteredBlocks,

Review Comment:
   fixed





Issue Time Tracking
-------------------

    Worklog Id:     (was: 783379)
    Time Spent: 1h 10m  (was: 1h)

> Parquet file footer is read multiple times, when multiple splits are created 
> in same file
> -----------------------------------------------------------------------------------------
>
>                 Key: HIVE-25827
>                 URL: https://issues.apache.org/jira/browse/HIVE-25827
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Ádám Szita
>            Priority: Major
>              Labels: performance, pull-request-available
>         Attachments: image-2021-12-21-03-19-38-577.png
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> With large files, it is possible that multiple splits are created in the same 
> file. With current codebase, "ParquetRecordReaderBase" ends up reading file 
> footer for each split. 
> It can be optimized not to read footer information multiple times for the 
> same file.
>  
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java#L160]
>  
> [https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java#L91]
>  
>  
> !image-2021-12-21-03-19-38-577.png|width=1363,height=1256!
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to