JingsongLi commented on code in PR #280:
URL: https://github.com/apache/flink-table-store/pull/280#discussion_r959231586
##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreRead.java:
##########
@@ -87,34 +108,78 @@ public KeyValueFileStoreRead withValueProjection(int[][] projectedFields) {
     @Override
     public FileStoreRead<KeyValue> withFilter(Predicate predicate) {
-        this.filters = splitAnd(predicate);
+        allFilters = new ArrayList<>();
+        List<String> primaryKeys = tableSchema.trimmedPrimaryKeys();
+        Set<String> nonPrimaryKeys =
+                tableSchema.fieldNames().stream()
+                        .filter(name -> !primaryKeys.contains(name))
+                        .collect(Collectors.toSet());
+        for (Predicate sub : splitAnd(predicate)) {
+            allFilters.add(sub);
+            if (!containsFields(sub, nonPrimaryKeys)) {
+                if (keyFilters == null) {
+                    keyFilters = new ArrayList<>();
+                }
+                // TODO Actually, the index is wrong, but it is OK.
+                // The orc filter just use name instead of index.
+                keyFilters.add(sub);
+            }
+        }
         return this;
     }

     @Override
     public RecordReader<KeyValue> createReader(Split split) throws IOException {
         if (split.isIncremental()) {
-            DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), true, filters);
+            // incremental mode cannot push down value filters, because the update for the same key
+            // may occur in the next split
+            DataFileReader dataFileReader = createDataFileReader(split, true, false);
             // Return the raw file contents without merging
             List<ConcatRecordReader.ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
             for (DataFileMeta file : split.files()) {
-                suppliers.add(
-                        () -> dataFileReader.read(changelogFile(file).orElse(file.fileName())));
+                if (acceptFilter(false).test(file)) {
+                    suppliers.add(
+                            () -> dataFileReader.read(changelogFile(file).orElse(file.fileName())));
+                }
             }
             return ConcatRecordReader.create(suppliers);
         } else {
             // in this case merge tree should merge records with same key
             // Do not project key in MergeTreeReader.
-            DataFileReader dataFileReader =
-                    dataFileReaderFactory.create(split.partition(), split.bucket(), false, filters);
-            MergeTreeReader reader =
-                    new MergeTreeReader(
-                            new IntervalPartition(split.files(), keyComparator).partition(),
-                            true,
-                            dataFileReader,
-                            keyComparator,
-                            mergeFunction.copy());
+            List<List<SortedRun>> sections =
+                    new IntervalPartition(split.files(), keyComparator).partition();
+            DataFileReader dataFileReaderWithAllFilters = createDataFileReader(split, false, true);
+            DataFileReader dataFileReaderWithKeyFilters = createDataFileReader(split, false, false);
+            MergeFunction mergeFunc = mergeFunction.copy();
+            List<ConcatRecordReader.ReaderSupplier<KeyValue>> readers = new ArrayList<>();
+            for (List<SortedRun> section : sections) {
+                // if key ranges do not have overlap, value filter can be pushed down as well
+                boolean acceptAll = section.size() == 1;
+                List<SortedRun> hitSection = new ArrayList<>();
+                for (SortedRun run : section) {
+                    List<DataFileMeta> hitFiles = new ArrayList<>();
+                    for (DataFileMeta file : run.files()) {
Review Comment:
Maybe using `Stream.filter` would be easier to read.
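A rough sketch of that suggestion (the loop body is cut off in the quoted hunk, so it is assumed here that it simply keeps files accepted by `acceptFilter(acceptAll)`):

```java
// Hypothetical rewrite of the inner per-run loop with Stream.filter;
// the filtering condition is assumed, since the original body is not
// visible in the quoted diff.
List<DataFileMeta> hitFiles =
        run.files().stream()
                .filter(file -> acceptFilter(acceptAll).test(file))
                .collect(Collectors.toList());
```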