This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 049a5ae27eb22ef31fc1e8353726873af5f8f2f4
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Mon Dec 30 10:21:57 2024 +0800

    [parquet] Support using file index result to filter row ranges (#4780)
---
 .../org/apache/paimon/utils/RoaringBitmap32.java   |   8 +
 .../paimon/table/AppendOnlyFileStoreTableTest.java |  93 ++++++
 .../apache/parquet/hadoop/ParquetFileReader.java   |   3 +-
 .../filter2/columnindex/ColumnIndexFilter.java     | 258 ++++++++++++++
 .../internal/filter2/columnindex/RowRanges.java    | 371 +++++++++++++++++++++
 5 files changed, 732 insertions(+), 1 deletion(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 6496b7003e..d3b07a2362 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -72,6 +72,10 @@ public class RoaringBitmap32 {
         return roaringBitmap.rangeCardinality(start, end);
     }
 
+    public int first() {
+        return roaringBitmap.first();
+    }
+
     public int last() {
         return roaringBitmap.last();
     }
@@ -138,6 +142,10 @@ public class RoaringBitmap32 {
         return roaringBitmap32;
     }
 
+    public static RoaringBitmap32 bitmapOfRange(long min, long max) {
+        return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max));
+    }
+
     public static RoaringBitmap32 and(final RoaringBitmap32 x1, final 
RoaringBitmap32 x2) {
         return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap, 
x2.roaringBitmap));
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 471b60d3cf..dd85bf8bcf 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -58,6 +58,7 @@ import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import org.apache.parquet.hadoop.ParquetOutputFormat;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -70,6 +71,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -79,8 +81,11 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
 import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
 import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
 import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
 import static 
org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
@@ -722,6 +727,94 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
                 });
     }
 
+    /**
+     * Writes five commits of random {@code price} values into a parquet table that has a BSI file
+     * index on {@code price}, then checks that equality and range filters return exactly the rows
+     * recorded in {@code expectedMap}.
+     */
+    @Test
+    public void testBitmapIndexResultFilterParquetRowRanges() throws Exception 
{
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.INT())
+                        .field("event", DataTypes.STRING())
+                        .field("price", DataTypes.INT())
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+                            options.set(WRITE_ONLY, true);
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + 
BitSliceIndexBitmapFileIndexFactory.BSI_INDEX
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "price");
+                            options.set(
+                                    
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+                            
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+                        });
+
+        int bound = 3000;
+        Random random = new Random();
+        // price value -> number of rows written with that value
+        Map<Integer, Integer> expectedMap = new HashMap<>();
+        for (int i = 0; i < 5; i++) {
+            StreamTableWrite write = table.newWrite(commitUser);
+            StreamTableCommit commit = table.newCommit(commitUser);
+            for (int j = 0; j < 10000; j++) {
+                int next = random.nextInt(bound);
+                expectedMap.compute(next, (key, value) -> value == null ? 1 : 
value + 1);
+                write.write(GenericRow.of(1, BinaryString.fromString("A"), 
next));
+            }
+            commit.commit(i, write.prepareCommit(true, i));
+            write.close();
+            commit.close();
+        }
+
+        // test eq
+        for (int i = 0; i < 10; i++) {
+            int key = random.nextInt(bound);
+            Predicate predicate = new PredicateBuilder(rowType).equal(2, key);
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    
table.newRead().withFilter(predicate).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(
+                    row -> {
+                        cnt.incrementAndGet();
+                        assertThat(row.getInt(2)).isEqualTo(key);
+                    });
+            assertThat(cnt.get()).isEqualTo(expectedMap.getOrDefault(key, 0));
+            reader.close();
+        }
+
+        //  test between
+        for (int i = 0; i < 10; i++) {
+            int max = random.nextInt(bound);
+            // Random#nextInt(int) rejects a non-positive bound, so draw min from [0, max]
+            // inclusive; nextInt(max) would throw whenever max happens to be 0.
+            int min = random.nextInt(max + 1);
+            Predicate predicate =
+                    PredicateBuilder.and(
+                            new PredicateBuilder(rowType).greaterOrEqual(2, 
min),
+                            new PredicateBuilder(rowType).lessOrEqual(2, max));
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    
table.newRead().withFilter(predicate).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(
+                    row -> {
+                        cnt.addAndGet(1);
+                        assertThat(row.getInt(2)).isGreaterThanOrEqualTo(min);
+                        assertThat(row.getInt(2)).isLessThanOrEqualTo(max);
+                    });
+            Optional<Integer> reduce =
+                    expectedMap.entrySet().stream()
+                            .filter(x -> x.getKey() >= min && x.getKey() <= 
max)
+                            .map(Map.Entry::getValue)
+                            .reduce(Integer::sum);
+            assertThat(cnt.get()).isEqualTo(reduce.orElse(0));
+            reader.close();
+        }
+    }
+
     @Test
     public void testWithShardAppendTable() throws Exception {
         FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 
-1));
diff --git 
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java 
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e3fc118ad6..e9f757126a 100644
--- 
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -760,7 +760,8 @@ public class ParquetFileReader implements Closeable {
                             options.getRecordFilter(),
                             getColumnIndexStore(blockIndex),
                             paths.keySet(),
-                            blocks.get(blockIndex).getRowCount());
+                            blocks.get(blockIndex).getRowCount(),
+                            fileIndexResult);
             blockRowRanges.set(blockIndex, rowRanges);
         }
         return rowRanges;
diff --git 
a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
new file mode 100644
index 0000000000..b2c9365bd6
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.internal.filter2.columnindex;
+
+import org.apache.paimon.fileindex.FileIndexResult;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
+import 
org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import 
org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore.MissingOffsetIndexException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.PrimitiveIterator;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Filter implementation based on column indexes. No filtering will be applied 
for columns where no
+ * column index is available. Offset index is required for all the columns in 
the projection,
+ * therefore a {@link MissingOffsetIndexException} will be thrown from any 
{@code visit} methods if
+ * any of the required offset indexes is missing.
+ *
+ * <p>Note: The class was copied over to support using {@link FileIndexResult} 
to filter {@link
+ * RowRanges}.
+ */
+public class ColumnIndexFilter implements Visitor<RowRanges> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ColumnIndexFilter.class);
+    private final ColumnIndexStore columnIndexStore;
+    private final Set<ColumnPath> columns;
+    private final long rowCount;
+    @Nullable private final FileIndexResult fileIndexResult;
+    private RowRanges allRows;
+
+    /**
+     * Calculates the row ranges containing the indexes of the rows that might 
match the specified
+     * filter.
+     *
+     * @param filter to be used for filtering the rows
+     * @param columnIndexStore the store for providing column/offset indexes
+     * @param paths the paths of the columns used in the actual projection; a 
column not being part
+     *     of the projection will be handled as containing {@code null} values 
only even if the
+     *     column has values written in the file
+     * @param fileIndexResult the file index result; it is used to filter the row 
ranges
+     * @param rowCount the total number of rows in the row-group
+     * @return the ranges of the possible matching row indexes; the returned 
ranges will contain all
+     *     the rows if any of the required offset indexes is missing
+     */
+    public static RowRanges calculateRowRanges(
+            FilterCompat.Filter filter,
+            ColumnIndexStore columnIndexStore,
+            Set<ColumnPath> paths,
+            long rowCount,
+            @Nullable FileIndexResult fileIndexResult) {
+        return filter.accept(
+                new FilterCompat.Visitor<RowRanges>() {
+                    @Override
+                    public RowRanges visit(FilterPredicateCompat 
filterPredicateCompat) {
+                        try {
+                            return filterPredicateCompat
+                                    .getFilterPredicate()
+                                    .accept(
+                                            new ColumnIndexFilter(
+                                                    columnIndexStore,
+                                                    paths,
+                                                    rowCount,
+                                                    fileIndexResult));
+                        } catch (MissingOffsetIndexException e) {
+                            LOGGER.info(e.getMessage());
+                            return RowRanges.createSingle(rowCount);
+                        }
+                    }
+
+                    @Override
+                    public RowRanges visit(UnboundRecordFilterCompat 
unboundRecordFilterCompat) {
+                        return RowRanges.createSingle(rowCount);
+                    }
+
+                    @Override
+                    public RowRanges visit(NoOpFilter noOpFilter) {
+                        return RowRanges.createSingle(rowCount);
+                    }
+                });
+    }
+
+    private ColumnIndexFilter(
+            ColumnIndexStore columnIndexStore,
+            Set<ColumnPath> paths,
+            long rowCount,
+            @Nullable FileIndexResult fileIndexResult) {
+        this.columnIndexStore = columnIndexStore;
+        this.columns = paths;
+        this.rowCount = rowCount;
+        this.fileIndexResult = fileIndexResult;
+    }
+
+    private RowRanges allRows() {
+        if (allRows == null) {
+            allRows = RowRanges.createSingle(rowCount);
+        }
+        return allRows;
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Eq<T> eq) {
+        return applyPredicate(
+                eq.getColumn(),
+                ci -> ci.visit(eq),
+                eq.getValue() == null ? allRows() : RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(NotEq<T> notEq) {
+        return applyPredicate(
+                notEq.getColumn(),
+                ci -> ci.visit(notEq),
+                notEq.getValue() == null ? RowRanges.EMPTY : allRows());
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Lt<T> lt) {
+        return applyPredicate(lt.getColumn(), ci -> ci.visit(lt), 
RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(LtEq<T> ltEq) {
+        return applyPredicate(ltEq.getColumn(), ci -> ci.visit(ltEq), 
RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Gt<T> gt) {
+        return applyPredicate(gt.getColumn(), ci -> ci.visit(gt), 
RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(GtEq<T> gtEq) {
+        return applyPredicate(gtEq.getColumn(), ci -> ci.visit(gtEq), 
RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Operators.In<T> in) {
+        boolean isNull = in.getValues().contains(null);
+        return applyPredicate(
+                in.getColumn(), ci -> ci.visit(in), isNull ? allRows() : 
RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>> RowRanges visit(Operators.NotIn<T> notIn) 
{
+        boolean isNull = notIn.getValues().contains(null);
+        return applyPredicate(
+                notIn.getColumn(), ci -> ci.visit(notIn), isNull ? 
RowRanges.EMPTY : allRows());
+    }
+
+    @Override
+    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> 
RowRanges visit(
+            UserDefined<T, U> udp) {
+        return applyPredicate(
+                udp.getColumn(),
+                ci -> ci.visit(udp),
+                udp.getUserDefinedPredicate().acceptsNullValue() ? allRows() : 
RowRanges.EMPTY);
+    }
+
+    @Override
+    public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> 
RowRanges visit(
+            LogicalNotUserDefined<T, U> udp) {
+        return applyPredicate(
+                udp.getUserDefined().getColumn(),
+                ci -> ci.visit(udp),
+                
udp.getUserDefined().getUserDefinedPredicate().acceptsNullValue()
+                        ? RowRanges.EMPTY
+                        : allRows());
+    }
+
+    private RowRanges applyPredicate(
+            Column<?> column,
+            Function<ColumnIndex, PrimitiveIterator.OfInt> func,
+            RowRanges rangesForMissingColumns) {
+        ColumnPath columnPath = column.getColumnPath();
+        if (!columns.contains(columnPath)) {
+            return rangesForMissingColumns;
+        }
+
+        OffsetIndex oi = columnIndexStore.getOffsetIndex(columnPath);
+        ColumnIndex ci = columnIndexStore.getColumnIndex(columnPath);
+        if (ci == null) {
+            LOGGER.info(
+                    "No column index for column {} is available; Unable to 
filter on this column",
+                    columnPath);
+            return allRows();
+        }
+
+        return RowRanges.create(rowCount, func.apply(ci), oi, fileIndexResult);
+    }
+
+    @Override
+    public RowRanges visit(And and) {
+        RowRanges leftResult = and.getLeft().accept(this);
+        if (leftResult.getRanges().size() == 0) {
+            return leftResult;
+        }
+
+        return RowRanges.intersection(leftResult, and.getRight().accept(this));
+    }
+
+    @Override
+    public RowRanges visit(Or or) {
+        RowRanges leftResult = or.getLeft().accept(this);
+        if (leftResult.getRanges().size() == 1 && leftResult.rowCount() == 
rowCount) {
+            return leftResult;
+        }
+
+        return RowRanges.union(leftResult, or.getRight().accept(this));
+    }
+
+    @Override
+    public RowRanges visit(Not not) {
+        throw new IllegalArgumentException(
+                "Predicates containing a NOT must be run through 
LogicalInverseRewriter. " + not);
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
new file mode 100644
index 0000000000..6963814831
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.internal.filter2.columnindex;
+
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PrimitiveIterator;
+import java.util.Set;
+
+/**
+ * Class representing row ranges in a row-group. These row ranges are 
calculated as a result of the
+ * column index based filtering. It is used to iterate over the matching row 
indexes to be read from a
+ * row-group, to retrieve the count of the matching rows, or to check whether 
a row index range overlaps.
+ *
+ * <p>Note: The class was copied over to support using {@link FileIndexResult} 
to filter {@link
+ * RowRanges}. Added a new method {@link RowRanges#create(long, 
PrimitiveIterator.OfInt,
+ * OffsetIndex, FileIndexResult)}
+ *
+ * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, 
long, FileIndexResult)
+ */
+public class RowRanges {
+
+    /** Make it public because some upper-layer applications need to access 
it. */
+    public static class Range {
+
+        // Returns the union of the two ranges or null if there are elements 
between them.
+        private static Range union(Range left, Range right) {
+            if (left.from <= right.from) {
+                if (left.to + 1 >= right.from) {
+                    return new Range(left.from, Math.max(left.to, right.to));
+                }
+            } else if (right.to + 1 >= left.from) {
+                return new Range(right.from, Math.max(left.to, right.to));
+            }
+            return null;
+        }
+
+        // Returns the intersection of the two ranges or null if they do not 
overlap.
+        private static Range intersection(Range left, Range right) {
+            if (left.from <= right.from) {
+                if (left.to >= right.from) {
+                    return new Range(right.from, Math.min(left.to, right.to));
+                }
+            } else if (right.to >= left.from) {
+                return new Range(left.from, Math.min(left.to, right.to));
+            }
+            return null;
+        }
+
+        public final long from;
+        public final long to;
+
+        // Creates a range of [from, to] (from and to are inclusive; empty 
ranges are not valid)
+        Range(long from, long to) {
+            assert from <= to;
+            this.from = from;
+            this.to = to;
+        }
+
+        long count() {
+            return to - from + 1;
+        }
+
+        boolean isBefore(Range other) {
+            return to < other.from;
+        }
+
+        boolean isAfter(Range other) {
+            return from > other.to;
+        }
+
+        @Override
+        public String toString() {
+            return "[" + from + ", " + to + ']';
+        }
+    }
+
+    public static final RowRanges EMPTY = new 
RowRanges(Collections.emptyList());
+
+    private final List<Range> ranges;
+
+    private RowRanges() {
+        this(new ArrayList<>());
+    }
+
+    private RowRanges(Range range) {
+        this(Collections.singletonList(range));
+    }
+
+    private RowRanges(List<Range> ranges) {
+        this.ranges = ranges;
+    }
+
+    /**
+     * Creates an immutable RowRanges object with the single range [0, 
rowCount - 1].
+     *
+     * @param rowCount a single row count
+     * @return an immutable RowRanges
+     */
+    public static RowRanges createSingle(long rowCount) {
+        return new RowRanges(new Range(0L, rowCount - 1L));
+    }
+
+    /**
+     * Creates a mutable RowRanges object with the following ranges:
+     *
+     * <pre>
+     * [firstRowIndex[0], lastRowIndex[0]],
+     * [firstRowIndex[1], lastRowIndex[1]],
+     * ...,
+     * [firstRowIndex[n], lastRowIndex[n]]
+     * </pre>
+     *
+     * <p>(See OffsetIndex.getFirstRowIndex and OffsetIndex.getLastRowIndex 
for details.)
+     *
+     * <p>The union of the ranges is calculated so the result always 
contains disjunct
+     * ranges. See union for details.
+     *
+     * @param rowCount row count
+     * @param pageIndexes pageIndexes
+     * @param offsetIndex offsetIndex
+     * @return a mutable RowRanges
+     */
+    public static RowRanges create(
+            long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex 
offsetIndex) {
+        RowRanges ranges = new RowRanges();
+        while (pageIndexes.hasNext()) {
+            int pageIndex = pageIndexes.nextInt();
+            ranges.add(
+                    new Range(
+                            offsetIndex.getFirstRowIndex(pageIndex),
+                            offsetIndex.getLastRowIndex(pageIndex, rowCount)));
+        }
+        return ranges;
+    }
+
+    /** Support using {@link FileIndexResult} to filter the row ranges. */
+    public static RowRanges create(
+            long rowCount,
+            PrimitiveIterator.OfInt pageIndexes,
+            OffsetIndex offsetIndex,
+            @Nullable FileIndexResult fileIndexResult) {
+        RowRanges ranges = new RowRanges();
+        while (pageIndexes.hasNext()) {
+            int pageIndex = pageIndexes.nextInt();
+            long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex);
+            long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, 
rowCount);
+
+            // using file index result to filter or narrow the row ranges
+            if (fileIndexResult instanceof BitmapIndexResult) {
+                RoaringBitmap32 bitmap = ((BitmapIndexResult) 
fileIndexResult).get();
+                RoaringBitmap32 range =
+                        RoaringBitmap32.bitmapOfRange(firstRowIndex, 
lastRowIndex + 1);
+                RoaringBitmap32 result = RoaringBitmap32.and(bitmap, range);
+                if (result.isEmpty()) {
+                    continue;
+                }
+                firstRowIndex = result.first();
+                lastRowIndex = result.last();
+            }
+
+            ranges.add(new Range(firstRowIndex, lastRowIndex));
+        }
+        return ranges;
+    }
+
+    /**
+     * Calculates the union of the two specified RowRanges objects. The union 
of two ranges is
+     * calculated if there are no elements between them. Otherwise, the two 
disjunct ranges are
+     * stored separately.
+     *
+     * <pre>
+     * For example:
+     * [113, 241] ∪ [221, 340] = [113, 340]
+     * [113, 230] ∪ [231, 340] = [113, 340]
+     * while
+     * [113, 230] ∪ [232, 340] = [113, 230], [232, 340]
+     * </pre>
+     *
+     * <p>The result RowRanges object will contain all the row indexes that 
were contained in one of
+     * the specified objects.
+     *
+     * @param left left RowRanges
+     * @param right right RowRanges
+     * @return a mutable RowRanges contains all the row indexes that were 
contained in one of the
+     *     specified objects
+     */
+    public static RowRanges union(RowRanges left, RowRanges right) {
+        RowRanges result = new RowRanges();
+        Iterator<Range> it1 = left.ranges.iterator();
+        Iterator<Range> it2 = right.ranges.iterator();
+        if (it2.hasNext()) {
+            Range range2 = it2.next();
+            while (it1.hasNext()) {
+                Range range1 = it1.next();
+                if (range1.isAfter(range2)) {
+                    result.add(range2);
+                    range2 = range1;
+                    Iterator<Range> tmp = it1;
+                    it1 = it2;
+                    it2 = tmp;
+                } else {
+                    result.add(range1);
+                }
+            }
+            result.add(range2);
+        } else {
+            it2 = it1;
+        }
+        while (it2.hasNext()) {
+            result.add(it2.next());
+        }
+
+        return result;
+    }
+
+    /**
+     * Calculates the intersection of the two specified RowRanges objects. Two 
ranges intersect if
+     * they have common elements; otherwise the result is empty.
+     *
+     * <pre>
+     * For example:
+     * [113, 241] ∩ [221, 340] = [221, 241]
+     * while
+     * [113, 230] ∩ [231, 340] = &lt;EMPTY&gt;
+     * </pre>
+     *
+     * @param left left RowRanges
+     * @param right right RowRanges
+     * @return a mutable RowRanges contains all the row indexes that were 
contained in both of the
+     *     specified objects
+     */
+    public static RowRanges intersection(RowRanges left, RowRanges right) {
+        RowRanges result = new RowRanges();
+
+        int rightIndex = 0;
+        for (Range l : left.ranges) {
+            for (int i = rightIndex, n = right.ranges.size(); i < n; ++i) {
+                Range r = right.ranges.get(i);
+                if (l.isBefore(r)) {
+                    break;
+                } else if (l.isAfter(r)) {
+                    rightIndex = i + 1;
+                    continue;
+                }
+                result.add(Range.intersection(l, r));
+            }
+        }
+
+        return result;
+    }
+
+    /*
+     * Adds a range to the end of the list of ranges. It maintains the 
disjunct ascending order(*) of the ranges by
+     * trying to union the specified range to the last ranges in the list. The 
specified range shall be larger(*) than
+     * the last one or might be overlapped with some of the last ones.
+     * (*) [a, b] < [c, d] if b < c
+     */
+    private void add(Range range) {
+        Range rangeToAdd = range;
+        for (int i = ranges.size() - 1; i >= 0; --i) {
+            Range last = ranges.get(i);
+            assert !last.isAfter(range);
+            Range u = Range.union(last, rangeToAdd);
+            if (u == null) {
+                break;
+            }
+            rangeToAdd = u;
+            ranges.remove(i);
+        }
+        ranges.add(rangeToAdd);
+    }
+
+    /** @return the number of rows in the ranges */
+    public long rowCount() {
+        long cnt = 0;
+        for (Range range : ranges) {
+            cnt += range.count();
+        }
+        return cnt;
+    }
+
+    /** @return the ascending iterator of the row indexes contained in the 
ranges */
+    public PrimitiveIterator.OfLong iterator() {
+        return new PrimitiveIterator.OfLong() {
+            private int currentRangeIndex = -1;
+            private Range currentRange;
+            private long next = findNext();
+
+            private long findNext() {
+                if (currentRange == null || next + 1 > currentRange.to) {
+                    if (currentRangeIndex + 1 < ranges.size()) {
+                        currentRange = ranges.get(++currentRangeIndex);
+                        next = currentRange.from;
+                    } else {
+                        return -1;
+                    }
+                } else {
+                    ++next;
+                }
+                return next;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return next >= 0;
+            }
+
+            @Override
+            public long nextLong() {
+                long ret = next;
+                if (ret < 0) {
+                    throw new NoSuchElementException();
+                }
+                next = findNext();
+                return ret;
+            }
+        };
+    }
+
+    /**
+     * @param from the first row of the range to be checked for connection
+     * @param to the last row of the range to be checked for connection
+     * @return {@code true} if the specified range is overlapping (have common 
elements) with one of
+     *     the ranges
+     */
+    public boolean isOverlapping(long from, long to) {
+        return Collections.binarySearch(
+                        ranges,
+                        new Range(from, to),
+                        (r1, r2) -> r1.isBefore(r2) ? -1 : r1.isAfter(r2) ? 1 
: 0)
+                >= 0;
+    }
+
+    public List<Range> getRanges() {
+        return ranges;
+    }
+
+    @Override
+    public String toString() {
+        return ranges.toString();
+    }
+}

Reply via email to