szlta commented on code in PR #3368:
URL: https://github.com/apache/hive/pull/3368#discussion_r902603055
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java:
##########
@@ -138,32 +130,53 @@ public class VectorizedParquetRecordReader extends
ParquetRecordReaderBase
private ZoneId writerTimezone;
private final BucketIdentifier bucketIdentifier;
- public VectorizedParquetRecordReader(
- org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf) {
+ // LLAP cache integration
+ // TODO: also support fileKey in splits, like OrcSplit does
+ private Object cacheKey = null;
+ private CacheTag cacheTag = null;
+
+ public VectorizedParquetRecordReader(InputSplit oldInputSplit, JobConf conf)
throws IOException {
this(oldInputSplit, conf, null, null, null);
}
public VectorizedParquetRecordReader(
- org.apache.hadoop.mapred.InputSplit oldInputSplit, JobConf conf,
- FileMetadataCache metadataCache, DataCache dataCache, Configuration
cacheConf) {
+ InputSplit oldInputSplit, JobConf conf,
+ FileMetadataCache metadataCache, DataCache dataCache, Configuration
cacheConf)
+ throws IOException {
+ super(conf, oldInputSplit);
try {
this.metadataCache = metadataCache;
this.cache = dataCache;
this.cacheConf = cacheConf;
- serDeStats = new SerDeStats();
- projectionPusher = new ProjectionPusher();
+
+ if (metadataCache != null) {
+ cacheKey = HdfsUtils.getFileId(filePath.getFileSystem(conf), filePath,
Review Comment:
Yes, there are a couple of places, we can address this in a separate
refactor which is a fileformat independent change.
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java:
##########
@@ -48,115 +49,133 @@
import java.util.List;
import java.util.Map;
-public class ParquetRecordReaderBase {
+public abstract class ParquetRecordReaderBase {
public static final Logger LOG =
LoggerFactory.getLogger(ParquetRecordReaderBase.class);
- protected Path file;
+ protected final FileSplit fileSplit;
+ protected Path filePath;
+ protected ParquetInputSplit parquetInputSplit;
+ protected ParquetMetadata parquetMetadata;
protected ProjectionPusher projectionPusher;
protected boolean skipTimestampConversion = false;
protected Boolean skipProlepticConversion;
protected Boolean legacyConversionEnabled;
protected SerDeStats serDeStats;
- protected JobConf jobConf;
+ protected final JobConf jobConf;
protected int schemaSize;
- protected List<BlockMetaData> filtedBlocks;
+ protected List<BlockMetaData> filteredBlocks;
protected ParquetFileReader reader;
+ protected ParquetRecordReaderBase(JobConf conf, InputSplit oldSplit) throws
IOException {
+ serDeStats = new SerDeStats();
+ projectionPusher = new ProjectionPusher();
+
+ if (!(oldSplit instanceof FileSplit)) {
+ throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+ }
+ this.fileSplit = (FileSplit) oldSplit;
+ this.jobConf = projectionPusher.pushProjectionsAndFilters(conf,
fileSplit.getPath().getParent());
+ this.filePath = fileSplit.getPath();
+ }
+
+ protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException
{
+ // In the case of stat tasks a dummy split is created with -1 length but
real path...
+ if (fileSplit.getLength() != 0) {
+ parquetMetadata = getParquetMetadata(filePath, conf);
+ parquetInputSplit = getSplit(conf);
+ }
+ // having null as parquetInputSplit seems to be a valid case based on this
file's history
+ }
+
/**
* gets a ParquetInputSplit corresponding to a split given by Hive
*
- * @param oldSplit The split given by Hive
* @param conf The JobConf of the Hive job
* @return a ParquetInputSplit corresponding to the oldSplit
* @throws IOException if the config cannot be enhanced or if the footer
cannot be read from the file
*/
@SuppressWarnings("deprecation")
protected ParquetInputSplit getSplit(
- final org.apache.hadoop.mapred.InputSplit oldSplit,
final JobConf conf
) throws IOException {
- if (oldSplit.getLength() == 0) {
- return null;
- }
+
ParquetInputSplit split;
- if (oldSplit instanceof FileSplit) {
- final Path finalPath = ((FileSplit) oldSplit).getPath();
- jobConf = projectionPusher.pushProjectionsAndFilters(conf,
finalPath.getParent());
-
- // TODO enable MetadataFilter by using readFooter(Configuration
configuration, Path file,
- // MetadataFilter filter) API
- final ParquetMetadata parquetMetadata =
ParquetFileReader.readFooter(jobConf, finalPath);
- final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
- final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
- final ReadSupport.ReadContext
- readContext = new DataWritableReadSupport().init(new
InitContext(jobConf,
- null, fileMetaData.getSchema()));
-
- // Compute stats
- for (BlockMetaData bmd : blocks) {
- serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
- serDeStats.setRawDataSize(serDeStats.getRawDataSize() +
bmd.getTotalByteSize());
- }
+ final Path finalPath = fileSplit.getPath();
Review Comment:
Discussed offline.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]