This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c8c3d9af [core] Introduce PartitionPredicate to improve performance for multiple partitions (#1982)
4c8c3d9af is described below

commit 4c8c3d9afb748ac8d9e7b5bbd361189b2578bb10
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 13 17:52:59 2023 +0800

    [core] Introduce PartitionPredicate to improve performance for multiple partitions (#1982)
---
 .../paimon/operation/AbstractFileStoreScan.java    |  35 +++--
 .../paimon/partition/PartitionPredicate.java       | 133 +++++++++++++++++++
 .../paimon/partition/PartitionPredicateTest.java   | 142 +++++++++++++++++++++
 3 files changed, 291 insertions(+), 19 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 2ea41bd6b..f92456cf8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -27,8 +27,8 @@ import org.apache.paimon.manifest.ManifestEntrySerializer;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
@@ -38,7 +38,6 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ParallellyExecuteUtils;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -58,7 +57,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkState;
 public abstract class AbstractFileStoreScan implements FileStoreScan {
 
     private final FieldStatsArraySerializer partitionStatsConverter;
-    private final RowDataToObjectArrayConverter partitionConverter;
+    private final RowType partitionType;
     private final SnapshotManager snapshotManager;
     private final ManifestFile.Factory manifestFileFactory;
     private final ManifestList manifestList;
@@ -69,7 +68,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private final SchemaManager schemaManager;
     protected final ScanBucketFilter bucketKeyFilter;
 
-    private Predicate partitionFilter;
+    private PartitionPredicate partitionFilter;
     private Snapshot specifiedSnapshot = null;
     private Filter<Integer> bucketFilter = null;
     private List<ManifestFileMeta> specifiedManifests = null;
@@ -90,7 +89,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             boolean checkNumOfBuckets,
             Integer scanManifestParallelism) {
         this.partitionStatsConverter = new 
FieldStatsArraySerializer(partitionType);
-        this.partitionConverter = new 
RowDataToObjectArrayConverter(partitionType);
+        this.partitionType = partitionType;
         this.bucketKeyFilter = bucketKeyFilter;
         this.snapshotManager = snapshotManager;
         this.schemaManager = schemaManager;
@@ -104,22 +103,22 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public FileStoreScan withPartitionFilter(Predicate predicate) {
-        this.partitionFilter = predicate;
+        if (partitionType.getFieldCount() > 0 && predicate != null) {
+            this.partitionFilter = 
PartitionPredicate.fromPredicate(partitionType, predicate);
+        } else {
+            this.partitionFilter = null;
+        }
         return this;
     }
 
     @Override
     public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
-        List<Predicate> predicates =
-                partitions.stream()
-                        .filter(p -> p.getFieldCount() > 0)
-                        .map(partitionConverter::createEqualPredicate)
-                        .collect(Collectors.toList());
-        if (predicates.isEmpty()) {
-            return this;
+        if (partitionType.getFieldCount() > 0 && !partitions.isEmpty()) {
+            this.partitionFilter = 
PartitionPredicate.fromMultiple(partitionType, partitions);
         } else {
-            return withPartitionFilter(PredicateBuilder.or(predicates));
+            this.partitionFilter = null;
         }
+        return this;
     }
 
     @Override
@@ -251,10 +250,10 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
             if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
                 String partInfo =
-                        partitionConverter.getArity() > 0
+                        partitionType.getFieldCount() > 0
                                 ? "partition "
                                         + 
FileStorePathFactory.getPartitionComputer(
-                                                        
partitionConverter.rowType(),
+                                                        partitionType,
                                                         
FileStorePathFactory.PARTITION_DEFAULT_NAME
                                                                 
.defaultValue())
                                                 
.generatePartValues(file.partition())
@@ -355,9 +354,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         Function<InternalRow, Integer> totalBucketGetter =
                 ManifestEntrySerializer.totalBucketGetter();
         return row -> {
-            if ((partitionFilter != null
-                    && !partitionFilter.test(
-                            
partitionConverter.convert(partitionGetter.apply(row))))) {
+            if ((partitionFilter != null && 
!partitionFilter.test(partitionGetter.apply(row)))) {
                 return false;
             }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
new file mode 100644
index 000000000..687eac1fc
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A special predicate to filter partitions only, just like {@link Predicate}.
+ *
+ * <p>Two implementations are available through the static factories: {@code fromPredicate} wraps
+ * an arbitrary {@link Predicate}, and {@code fromMultiple} matches against an explicit set of
+ * partition rows.
+ */
+public interface PartitionPredicate {
+
+    /** Returns whether the given partition row is accepted by this predicate. */
+    boolean test(BinaryRow part);
+
+    /**
+     * Statistics-based variant of the test.
+     *
+     * @param rowCount row count forwarded to the underlying {@link Predicate} statistics test
+     * @param fieldStats per-field statistics; presumably one entry per partition field, in
+     *     partition field order (TODO confirm against callers)
+     * @return {@code false} when the implementation can rule the data out from the statistics
+     */
+    boolean test(long rowCount, FieldStats[] fieldStats);
+
+    /**
+     * Creates a {@link PartitionPredicate} that evaluates {@code predicate} on partition rows
+     * converted to object arrays by a {@link RowDataToObjectArrayConverter} built from {@code
+     * partitionType}.
+     */
+    static PartitionPredicate fromPredicate(RowType partitionType, Predicate 
predicate) {
+        return new DefaultPartitionPredicate(
+                new RowDataToObjectArrayConverter(partitionType), predicate);
+    }
+
+    /**
+     * Creates a {@link PartitionPredicate} whose row test accepts exactly the given partition
+     * rows. The list is copied into a {@link HashSet} before being handed to the implementation.
+     */
+    static PartitionPredicate fromMultiple(RowType partitionType, 
List<BinaryRow> partitions) {
+        return new MultiplePartitionPredicate(
+                new RowDataToObjectArrayConverter(partitionType), new 
HashSet<>(partitions));
+    }
+
+    /**
+     * A {@link PartitionPredicate} using {@link Predicate}. Both tests delegate to the wrapped
+     * predicate; the row test first converts the {@link BinaryRow} into an object array.
+     */
+    class DefaultPartitionPredicate implements PartitionPredicate {
+
+        private final RowDataToObjectArrayConverter converter;
+        private final Predicate predicate;
+
+        private DefaultPartitionPredicate(
+                RowDataToObjectArrayConverter converter, Predicate predicate) {
+            this.converter = converter;
+            this.predicate = predicate;
+        }
+
+        @Override
+        public boolean test(BinaryRow part) {
+            return predicate.test(converter.convert(part));
+        }
+
+        @Override
+        public boolean test(long rowCount, FieldStats[] fieldStats) {
+            return predicate.test(rowCount, fieldStats);
+        }
+    }
+
+    /**
+     * A {@link PartitionPredicate} optimizing for multiple partitions. Its 
FieldStats filtering
+     * effect may not be as good as {@link DefaultPartitionPredicate}.
+     *
+     * <p>The row test is a set lookup. For the statistics test, the constructor precomputes one
+     * "greater or equal to collected minimum" and one "less or equal to collected maximum"
+     * predicate per partition field.
+     */
+    class MultiplePartitionPredicate implements PartitionPredicate {
+
+        private final Set<BinaryRow> partitions;
+
+        // min[i] / max[i]: range predicates for partition field i, built in the constructor.
+        private final Predicate[] min;
+        private final Predicate[] max;
+
+        private MultiplePartitionPredicate(
+                RowDataToObjectArrayConverter converter, Set<BinaryRow> 
partitions) {
+            this.partitions = partitions;
+            RowType partitionType = converter.rowType();
+            int fieldNum = partitionType.getFieldCount();
+            // One serializer and one statistics collector per partition field.
+            @SuppressWarnings("unchecked")
+            Serializer<Object>[] serializers = new Serializer[fieldNum];
+            FullFieldStatsCollector[] collectors = new 
FullFieldStatsCollector[fieldNum];
+            min = new Predicate[fieldNum];
+            max = new Predicate[fieldNum];
+            for (int i = 0; i < fieldNum; i++) {
+                serializers[i] = 
InternalSerializers.create(partitionType.getTypeAt(i));
+                collectors[i] = new FullFieldStatsCollector();
+            }
+            // Feed every partition's field values into the matching per-field collector.
+            for (BinaryRow part : partitions) {
+                Object[] fields = converter.convert(part);
+                for (int i = 0; i < fields.length; i++) {
+                    collectors[i].collect(fields[i], serializers[i]);
+                }
+            }
+            // Turn each field's collected min/max into "field >= min" and "field <= max".
+            // NOTE(review): how null partition values (or an empty partition set) affect
+            // minValue()/maxValue() is not visible here - confirm the resulting predicates.
+            PredicateBuilder builder = new PredicateBuilder(partitionType);
+            for (int i = 0; i < collectors.length; i++) {
+                FieldStats stats = collectors[i].result();
+                min[i] = builder.greaterOrEqual(i, stats.minValue());
+                max[i] = builder.lessOrEqual(i, stats.maxValue());
+            }
+        }
+
+        @Override
+        public boolean test(BinaryRow part) {
+            return partitions.contains(part);
+        }
+
+        @Override
+        public boolean test(long rowCount, FieldStats[] fieldStats) {
+            // No statistics supplied: nothing can be ruled out.
+            if (fieldStats.length == 0) {
+                return true;
+            }
+
+            // NOTE(review): this accepts as soon as ONE field passes both its min and its max
+            // predicate; it does not require every field to pass. That is consistent with the
+            // class-level remark about weaker filtering - confirm it is intended.
+            // Assumes fieldStats.length does not exceed the partition field count, since min
+            // and max are sized by that count - TODO confirm.
+            for (int i = 0; i < fieldStats.length; i++) {
+                if (min[i].test(rowCount, fieldStats) && max[i].test(rowCount, 
fieldStats)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
new file mode 100644
index 000000000..71c46fb38
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.predicate.PredicateBuilder.and;
+import static org.apache.paimon.predicate.PredicateBuilder.or;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link PartitionPredicate}.
+ *
+ * <p>Compares the predicate-backed implementation ({@code fromPredicate}) with the
+ * partition-set-backed one ({@code fromMultiple}) on both the row test and the statistics test.
+ */
+public class PartitionPredicateTest {
+
+    /** With an empty partition type, the empty row and empty statistics are both accepted. */
+    @Test
+    public void testNoPartition() {
+        PartitionPredicate predicate =
+                PartitionPredicate.fromMultiple(RowType.of(), 
Collections.singletonList(EMPTY_ROW));
+
+        assertThat(predicate.test(EMPTY_ROW)).isTrue();
+        assertThat(predicate.test(1, new FieldStats[] {})).isTrue();
+    }
+
+    /**
+     * Uses two INT partition fields and the partitions (3, 5) and (4, 6), expressed once as an
+     * OR-of-ANDs {@link Predicate} (p1) and once as an explicit partition list (p2).
+     */
+    @Test
+    public void testPartition() {
+        RowType type = DataTypes.ROW(DataTypes.INT(), DataTypes.INT());
+        PredicateBuilder builder = new PredicateBuilder(type);
+        Predicate predicate =
+                or(
+                        and(builder.equal(0, 3), builder.equal(1, 5)),
+                        and(builder.equal(0, 4), builder.equal(1, 6)));
+
+        PartitionPredicate p1 = PartitionPredicate.fromPredicate(type, 
predicate);
+        PartitionPredicate p2 =
+                PartitionPredicate.fromMultiple(
+                        type, Arrays.asList(createPart(3, 5), createPart(4, 
6)));
+
+        // Row test: p1 and p2 must agree; only the two listed partitions are accepted.
+        assertThat(vailidate(p1, p2, createPart(3, 4))).isFalse();
+        assertThat(vailidate(p1, p2, createPart(3, 5))).isTrue();
+        assertThat(vailidate(p1, p2, createPart(4, 6))).isTrue();
+        assertThat(vailidate(p1, p2, createPart(4, 5))).isFalse();
+
+        // Statistics test. The FieldStats arguments are presumably (min, max, nullCount) -
+        // confirm against FieldStats.
+        // Field 0 stats 4..8, field 1 stats 10..12: p1 rejects.
+        assertThat(
+                        vailidate(
+                                p1,
+                                new FieldStats[] {
+                                    new FieldStats(4, 8, 0L), new 
FieldStats(10, 12, 0L)
+                                }))
+                .isFalse();
+        // Same statistics: p2 still accepts, i.e. its statistics filtering is weaker than p1's.
+        assertThat(
+                        vailidate(
+                                p2,
+                                new FieldStats[] {
+                                    new FieldStats(4, 8, 0L), new 
FieldStats(10, 12, 0L)
+                                }))
+                .isTrue();
+        // Field 0 stats 6..8, field 1 stats 10..12: p2 rejects.
+        assertThat(
+                        vailidate(
+                                p2,
+                                new FieldStats[] {
+                                    new FieldStats(6, 8, 0L), new 
FieldStats(10, 12, 0L)
+                                }))
+                .isFalse();
+
+        // Field 0 stats 4..8, field 1 stats 5..12: both p1 and p2 accept.
+        assertThat(
+                        vailidate(
+                                p1,
+                                new FieldStats[] {
+                                    new FieldStats(4, 8, 0L), new 
FieldStats(5, 12, 0L)
+                                }))
+                .isTrue();
+        assertThat(
+                        vailidate(
+                                p2,
+                                new FieldStats[] {
+                                    new FieldStats(4, 8, 0L), new 
FieldStats(5, 12, 0L)
+                                }))
+                .isTrue();
+
+        // Field 0 stats 1..2, field 1 stats 2..3: both p1 and p2 reject.
+        assertThat(
+                        vailidate(
+                                p1,
+                                new FieldStats[] {
+                                    new FieldStats(1, 2, 0L), new 
FieldStats(2, 3, 0L)
+                                }))
+                .isFalse();
+        assertThat(
+                        vailidate(
+                                p2,
+                                new FieldStats[] {
+                                    new FieldStats(1, 2, 0L), new 
FieldStats(2, 3, 0L)
+                                }))
+                .isFalse();
+    }
+
+    /**
+     * Runs the row test on {@code predicate1}, asserts that {@code predicate2} gives the same
+     * answer, and returns that answer.
+     *
+     * <p>NOTE(review): the helper name is spelled "vailidate" (probably meant "validate"); it is
+     * left unchanged here.
+     */
+    private boolean vailidate(
+            PartitionPredicate predicate1, PartitionPredicate predicate2, 
BinaryRow part) {
+        boolean ret = predicate1.test(part);
+        assertThat(predicate2.test(part)).isEqualTo(ret);
+        return ret;
+    }
+
+    /** Runs the statistics test on {@code predicate} with a fixed row count of 3. */
+    private boolean vailidate(PartitionPredicate predicate, FieldStats[] 
fieldStats) {
+        return predicate.test(3, fieldStats);
+    }
+
+    /** Builds a two-field {@link BinaryRow} holding the ints {@code i} and {@code j}. */
+    private static BinaryRow createPart(int i, int j) {
+        BinaryRow row = new BinaryRow(2);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeInt(0, i);
+        writer.writeInt(1, j);
+        writer.complete();
+        return row;
+    }
+}

Reply via email to