shardulm94 commented on a change in pull request #1566:
URL: https://github.com/apache/iceberg/pull/1566#discussion_r505691518
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
##########
@@ -38,9 +43,71 @@
class ParquetFilters {
+ private static final Set<Operation> SUPPORTED_OPS = ImmutableSet.of(
+ Operation.IS_NULL,
+ Operation.NOT_NULL,
+ Operation.EQ,
+ Operation.NOT_EQ,
+ Operation.GT,
+ Operation.GT_EQ,
+ Operation.LT,
+ Operation.LT_EQ);
+
+ private static final Set<Type.TypeID> SUPPORTED_TYPES = ImmutableSet.of(
Review comment:
Any reason why other types like `TIMESTAMP`, `STRING`, and other binary-backed
columns are not supported? It seems like `ParquetFilters` is able to convert
expressions for these types too.
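For context, Parquet's `FilterApi` can already express predicates over binary-backed columns, so something like a string equality seems representable. A minimal illustrative sketch (the column name and literal are invented, not from this PR):

```java
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.io.api.Binary;

// Hypothetical example: an equality predicate on a string column built with
// Parquet's FilterApi; the column name "data" is made up for illustration.
FilterPredicate pred = FilterApi.eq(
    FilterApi.binaryColumn("data"),
    Binary.fromString("some-value"));

// Wrapped the same way a converted Iceberg expression would be before being
// handed to ParquetReadOptions.Builder#withRecordFilter.
FilterCompat.Filter recordFilter = FilterCompat.get(pred);
```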
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
##########
@@ -582,6 +582,13 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
optionsBuilder = ParquetReadOptions.builder();
}
+ if (filter != null &&
+ schema.getAliases() != null &&
Review comment:
What is the rationale behind the `schema.getAliases() != null` check?
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
##########
@@ -582,6 +582,13 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
optionsBuilder = ParquetReadOptions.builder();
}
+ if (filter != null &&
+ schema.getAliases() != null &&
+ ParquetFilters.isSupportedFilter(filter, schema, caseSensitive)) {
+ optionsBuilder.useRecordFilter(filterRecords);
+ optionsBuilder.withRecordFilter(ParquetFilters.convert(schema, filter, caseSensitive));
Review comment:
I think the first argument to `ParquetFilters.convert` should be the
file schema and not the table schema. Since Iceberg allows column renaming, a
column in the table schema may be named differently in the file. If you look at
https://github.com/apache/iceberg/blob/a238a90eb987bfbf5d14b5cab8d109e53a75e861/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java#L120
it will use field ids to get the corresponding column name from the provided
schema (file). You can also see this pattern at
https://github.com/apache/iceberg/blob/a238a90eb987bfbf5d14b5cab8d109e53a75e861/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java#L628
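Roughly what I have in mind, assuming the conversion happens somewhere the Parquet footer schema is available (the `parquetReader` variable and the three-argument `convert(...)` from this PR are assumptions here):

```java
// Sketch only: resolve the filter against the *file* schema so that renamed
// columns are matched by field id rather than by their current table name.
MessageType fileType = parquetReader.getFileMetaData().getSchema();
Schema fileSchema = ParquetSchemaUtil.convert(fileType);
FilterCompat.Filter recordFilter =
    ParquetFilters.convert(fileSchema, filter, caseSensitive);
```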
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
##########
@@ -38,9 +43,71 @@
class ParquetFilters {
+ private static final Set<Operation> SUPPORTED_OPS = ImmutableSet.of(
+ Operation.IS_NULL,
+ Operation.NOT_NULL,
+ Operation.EQ,
+ Operation.NOT_EQ,
+ Operation.GT,
+ Operation.GT_EQ,
+ Operation.LT,
+ Operation.LT_EQ);
+
+ private static final Set<Type.TypeID> SUPPORTED_TYPES = ImmutableSet.of(
+ Type.TypeID.BOOLEAN,
+ Type.TypeID.INTEGER,
+ Type.TypeID.LONG,
+ Type.TypeID.FLOAT,
+ Type.TypeID.DOUBLE,
+ Type.TypeID.DATE,
+ Type.TypeID.TIME
+ );
+
private ParquetFilters() {
}
+ public static boolean isSupportedFilter(Expression expr, Schema schema, boolean caseSensitive) {
Review comment:
Rather than introducing more code to check whether the filter is supported,
can we just eagerly try to convert the filter and use the exception as a signal
that the filter is unsupported?
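Something along these lines is what I had in mind (a sketch only; the exception type used as the "unsupported" signal is an assumption about what `ParquetFilters` throws):

```java
// Sketch: eagerly attempt the conversion and fall back when it fails, instead
// of pre-checking supported operations and types up front.
FilterCompat.Filter recordFilter = null;
try {
  recordFilter = ParquetFilters.convert(schema, filter, caseSensitive);
} catch (UnsupportedOperationException e) {
  // the expression uses an operation or type that cannot be translated to a
  // Parquet FilterPredicate; skip record-level filtering for this read
}

if (recordFilter != null) {
  optionsBuilder.useRecordFilter(filterRecords);
  optionsBuilder.withRecordFilter(recordFilter);
}
```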
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
##########
@@ -130,13 +139,23 @@ private void advance() {
PageReadStore pages;
try {
- pages = reader.readNextRowGroup();
+ // Because of the issue of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup()
+ if (hasRecordFilter) {
+ pages = reader.readNextFilteredRowGroup();
Review comment:
Iceberg is already evaluating filters for row groups itself and skipping them
(just a few lines before this); how does this change improve upon what Iceberg
already does? (I am not too familiar with Parquet, so I may be totally missing
the point.)
##########
File path: parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
##########
@@ -130,13 +139,23 @@ private void advance() {
PageReadStore pages;
try {
- pages = reader.readNextRowGroup();
+ // Because of the issue of PARQUET-1901, we cannot blindly call readNextFilteredRowGroup()
+ if (hasRecordFilter) {
+ pages = reader.readNextFilteredRowGroup();
+ } else {
+ pages = reader.readNextRowGroup();
+ }
} catch (IOException e) {
throw new RuntimeIOException(e);
}
+ long blockRowCount = blocks.get(nextRowGroup).getRowCount();
+ Preconditions.checkState(blockRowCount >= pages.getRowCount(),
+ "Number of values in the block, %s, does not great or equal
number of values after filtering, %s",
+ blockRowCount, pages.getRowCount());
long rowPosition = rowGroupsStartRowPos[nextRowGroup];
Review comment:
I am not sure if the `nextRowGroup` counter and consequently
`rowPosition` will be accurate in case of skipped row groups. Can you add a
test to `TestSparkParquetReadMetadataColumns` which would invoke page skipping?