pvary commented on code in PR #3368:
URL: https://github.com/apache/hive/pull/3368#discussion_r897941551
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/ParquetRecordReaderBase.java:
##########
@@ -48,115 +49,133 @@
import java.util.List;
import java.util.Map;
-public class ParquetRecordReaderBase {
+public abstract class ParquetRecordReaderBase {
public static final Logger LOG =
LoggerFactory.getLogger(ParquetRecordReaderBase.class);
- protected Path file;
+ protected final FileSplit fileSplit;
+ protected Path filePath;
+ protected ParquetInputSplit parquetInputSplit;
+ protected ParquetMetadata parquetMetadata;
protected ProjectionPusher projectionPusher;
protected boolean skipTimestampConversion = false;
protected Boolean skipProlepticConversion;
protected Boolean legacyConversionEnabled;
protected SerDeStats serDeStats;
- protected JobConf jobConf;
+ protected final JobConf jobConf;
protected int schemaSize;
- protected List<BlockMetaData> filtedBlocks;
+ protected List<BlockMetaData> filteredBlocks;
protected ParquetFileReader reader;
+ protected ParquetRecordReaderBase(JobConf conf, InputSplit oldSplit) throws
IOException {
+ serDeStats = new SerDeStats();
+ projectionPusher = new ProjectionPusher();
+
+ if (!(oldSplit instanceof FileSplit)) {
+ throw new IllegalArgumentException("Unknown split type: " + oldSplit);
+ }
+ this.fileSplit = (FileSplit) oldSplit;
+ this.jobConf = projectionPusher.pushProjectionsAndFilters(conf,
fileSplit.getPath().getParent());
+ this.filePath = fileSplit.getPath();
+ }
+
+ protected void setupMetadataAndParquetSplit(JobConf conf) throws IOException
{
+ // In the case of stat tasks a dummy split is created with -1 length but
real path...
+ if (fileSplit.getLength() != 0) {
+ parquetMetadata = getParquetMetadata(filePath, conf);
+ parquetInputSplit = getSplit(conf);
+ }
+ // having null as parquetInputSplit seems to be a valid case based on this
file's history
+ }
+
/**
* gets a ParquetInputSplit corresponding to a split given by Hive
*
- * @param oldSplit The split given by Hive
* @param conf The JobConf of the Hive job
* @return a ParquetInputSplit corresponding to the oldSplit
* @throws IOException if the config cannot be enhanced or if the footer
cannot be read from the file
*/
@SuppressWarnings("deprecation")
protected ParquetInputSplit getSplit(
- final org.apache.hadoop.mapred.InputSplit oldSplit,
final JobConf conf
) throws IOException {
- if (oldSplit.getLength() == 0) {
- return null;
- }
+
ParquetInputSplit split;
- if (oldSplit instanceof FileSplit) {
- final Path finalPath = ((FileSplit) oldSplit).getPath();
- jobConf = projectionPusher.pushProjectionsAndFilters(conf,
finalPath.getParent());
-
- // TODO enable MetadataFilter by using readFooter(Configuration
configuration, Path file,
- // MetadataFilter filter) API
- final ParquetMetadata parquetMetadata =
ParquetFileReader.readFooter(jobConf, finalPath);
- final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
- final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
-
- final ReadSupport.ReadContext
- readContext = new DataWritableReadSupport().init(new
InitContext(jobConf,
- null, fileMetaData.getSchema()));
-
- // Compute stats
- for (BlockMetaData bmd : blocks) {
- serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
- serDeStats.setRawDataSize(serDeStats.getRawDataSize() +
bmd.getTotalByteSize());
- }
+ final Path finalPath = fileSplit.getPath();
+
+ // TODO enable MetadataFilter by using readFooter(Configuration
configuration, Path file,
+ // MetadataFilter filter) API
+ final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+ final FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
+
+ final ReadSupport.ReadContext
+ readContext = new DataWritableReadSupport().init(new InitContext(jobConf,
+ null, fileMetaData.getSchema()));
+
+ // Compute stats
+ for (BlockMetaData bmd : blocks) {
+ serDeStats.setRowCount(serDeStats.getRowCount() + bmd.getRowCount());
+ serDeStats.setRawDataSize(serDeStats.getRawDataSize() +
bmd.getTotalByteSize());
+ }
- schemaSize =
MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
-
.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
- final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
- final long splitStart = ((FileSplit) oldSplit).getStart();
- final long splitLength = ((FileSplit) oldSplit).getLength();
- for (final BlockMetaData block : blocks) {
- final long firstDataPage =
block.getColumns().get(0).getFirstDataPageOffset();
- if (firstDataPage >= splitStart && firstDataPage < splitStart +
splitLength) {
- splitGroup.add(block);
- }
+ schemaSize =
MessageTypeParser.parseMessageType(readContext.getReadSupportMetadata()
+
.get(DataWritableReadSupport.HIVE_TABLE_AS_PARQUET_SCHEMA)).getFieldCount();
+ final List<BlockMetaData> splitGroup = new ArrayList<BlockMetaData>();
+ final long splitStart = fileSplit.getStart();
+ final long splitLength = fileSplit.getLength();
+ for (final BlockMetaData block : blocks) {
+ final long firstDataPage =
block.getColumns().get(0).getFirstDataPageOffset();
+ if (firstDataPage >= splitStart && firstDataPage < splitStart +
splitLength) {
+ splitGroup.add(block);
}
- if (splitGroup.isEmpty()) {
- LOG.warn("Skipping split, could not find row group in: " + oldSplit);
+ }
+ if (splitGroup.isEmpty()) {
+ LOG.warn("Skipping split, could not find row group in: " + fileSplit);
+ return null;
+ }
+
+ FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
+ if (filter != null) {
+ filteredBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup,
fileMetaData.getSchema());
+ if (filteredBlocks.isEmpty()) {
+ LOG.debug("All row groups are dropped due to filter predicates");
return null;
}
- FilterCompat.Filter filter = setFilter(jobConf,
fileMetaData.getSchema());
- if (filter != null) {
- filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup,
fileMetaData.getSchema());
- if (filtedBlocks.isEmpty()) {
- LOG.debug("All row groups are dropped due to filter predicates");
- return null;
- }
-
- long droppedBlocks = splitGroup.size() - filtedBlocks.size();
- if (droppedBlocks > 0) {
- LOG.debug("Dropping " + droppedBlocks + " row groups that do not
pass filter predicate");
- }
- } else {
- filtedBlocks = splitGroup;
+ long droppedBlocks = splitGroup.size() - filteredBlocks.size();
+ if (droppedBlocks > 0) {
+ LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass
filter predicate");
}
+ } else {
+ filteredBlocks = splitGroup;
+ }
- if (HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
- skipTimestampConversion =
!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
- }
- skipProlepticConversion = DataWritableReadSupport
- .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
- if (skipProlepticConversion == null) {
- skipProlepticConversion = HiveConf.getBoolVar(
- conf,
HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
- }
+ if (HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) {
+ skipTimestampConversion =
!Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr");
+ }
+ skipProlepticConversion = DataWritableReadSupport
+ .getWriterDateProleptic(fileMetaData.getKeyValueMetaData());
+ if (skipProlepticConversion == null) {
+ skipProlepticConversion = HiveConf.getBoolVar(
+ conf,
HiveConf.ConfVars.HIVE_PARQUET_DATE_PROLEPTIC_GREGORIAN_DEFAULT);
+ }
legacyConversionEnabled =
DataWritableReadSupport.getZoneConversionLegacy(fileMetaData.getKeyValueMetaData(),
conf);
- split = new ParquetInputSplit(finalPath,
- splitStart,
- splitLength,
- oldSplit.getLocations(),
- filtedBlocks,
- readContext.getRequestedSchema().toString(),
- fileMetaData.getSchema().toString(),
- fileMetaData.getKeyValueMetaData(),
- readContext.getReadSupportMetadata());
- return split;
- } else {
- throw new IllegalArgumentException("Unknown split type: " + oldSplit);
- }
+ split = new ParquetInputSplit(finalPath,
+ splitStart,
+ splitLength,
+ fileSplit.getLocations(), filteredBlocks,
Review Comment:
nit: maybe this is a mistake, and you did not want to remove the `\n` here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]