This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4c8c3d9af [core] Introduce PartitionPredicate to improve performance
for multiple partitions (#1982)
4c8c3d9af is described below
commit 4c8c3d9afb748ac8d9e7b5bbd361189b2578bb10
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 13 17:52:59 2023 +0800
[core] Introduce PartitionPredicate to improve performance for multiple
partitions (#1982)
---
.../paimon/operation/AbstractFileStoreScan.java | 35 +++--
.../paimon/partition/PartitionPredicate.java | 133 +++++++++++++++++++
.../paimon/partition/PartitionPredicateTest.java | 142 +++++++++++++++++++++
3 files changed, 291 insertions(+), 19 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 2ea41bd6b..f92456cf8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -27,8 +27,8 @@ import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.FieldStatsArraySerializer;
@@ -38,7 +38,6 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParallellyExecuteUtils;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
@@ -58,7 +57,7 @@ import static
org.apache.paimon.utils.Preconditions.checkState;
public abstract class AbstractFileStoreScan implements FileStoreScan {
private final FieldStatsArraySerializer partitionStatsConverter;
- private final RowDataToObjectArrayConverter partitionConverter;
+ private final RowType partitionType;
private final SnapshotManager snapshotManager;
private final ManifestFile.Factory manifestFileFactory;
private final ManifestList manifestList;
@@ -69,7 +68,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private final SchemaManager schemaManager;
protected final ScanBucketFilter bucketKeyFilter;
- private Predicate partitionFilter;
+ private PartitionPredicate partitionFilter;
private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private List<ManifestFileMeta> specifiedManifests = null;
@@ -90,7 +89,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
boolean checkNumOfBuckets,
Integer scanManifestParallelism) {
this.partitionStatsConverter = new
FieldStatsArraySerializer(partitionType);
- this.partitionConverter = new
RowDataToObjectArrayConverter(partitionType);
+ this.partitionType = partitionType;
this.bucketKeyFilter = bucketKeyFilter;
this.snapshotManager = snapshotManager;
this.schemaManager = schemaManager;
@@ -104,22 +103,22 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public FileStoreScan withPartitionFilter(Predicate predicate) {
- this.partitionFilter = predicate;
+ if (partitionType.getFieldCount() > 0 && predicate != null) {
+ this.partitionFilter =
PartitionPredicate.fromPredicate(partitionType, predicate);
+ } else {
+ this.partitionFilter = null;
+ }
return this;
}
@Override
public FileStoreScan withPartitionFilter(List<BinaryRow> partitions) {
- List<Predicate> predicates =
- partitions.stream()
- .filter(p -> p.getFieldCount() > 0)
- .map(partitionConverter::createEqualPredicate)
- .collect(Collectors.toList());
- if (predicates.isEmpty()) {
- return this;
+ if (partitionType.getFieldCount() > 0 && !partitions.isEmpty()) {
+ this.partitionFilter =
PartitionPredicate.fromMultiple(partitionType, partitions);
} else {
- return withPartitionFilter(PredicateBuilder.or(predicates));
+ this.partitionFilter = null;
}
+ return this;
}
@Override
@@ -251,10 +250,10 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
String partInfo =
- partitionConverter.getArity() > 0
+ partitionType.getFieldCount() > 0
? "partition "
+
FileStorePathFactory.getPartitionComputer(
-
partitionConverter.rowType(),
+ partitionType,
FileStorePathFactory.PARTITION_DEFAULT_NAME
.defaultValue())
.generatePartValues(file.partition())
@@ -355,9 +354,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
Function<InternalRow, Integer> totalBucketGetter =
ManifestEntrySerializer.totalBucketGetter();
return row -> {
- if ((partitionFilter != null
- && !partitionFilter.test(
-
partitionConverter.convert(partitionGetter.apply(row))))) {
+ if ((partitionFilter != null &&
!partitionFilter.test(partitionGetter.apply(row)))) {
return false;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
new file mode 100644
index 000000000..687eac1fc
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.statistics.FullFieldStatsCollector;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A predicate that is evaluated against partitions only, in the spirit of {@link Predicate}.
+ *
+ * <p>Two checks are offered: an exact check against one concrete partition row, and a coarse
+ * check against per-field statistics that summarize a group of partitions.
+ */
+public interface PartitionPredicate {
+
+    /**
+     * Tests a single partition.
+     *
+     * @param part the partition row to check
+     * @return {@code true} if the partition is accepted
+     */
+    boolean test(BinaryRow part);
+
+    /**
+     * Tests the statistics of a group of partitions.
+     *
+     * @param rowCount number of rows the statistics were collected from
+     * @param fieldStats statistics of the partition fields (presumably one entry per partition
+     *     field, in field order; confirm against callers)
+     * @return {@code false} only if the statistics rule out every accepted partition
+     */
+    boolean test(long rowCount, FieldStats[] fieldStats);
+
+    /**
+     * Wraps an ordinary {@link Predicate} over the partition fields.
+     *
+     * @param partitionType row type of the partition
+     * @param predicate predicate expressed on the fields of {@code partitionType}
+     */
+    static PartitionPredicate fromPredicate(RowType partitionType, Predicate predicate) {
+        return new DefaultPartitionPredicate(
+                new RowDataToObjectArrayConverter(partitionType), predicate);
+    }
+
+    /**
+     * Creates a predicate that accepts exactly the given partitions.
+     *
+     * @param partitionType row type of the partition
+     * @param partitions the accepted partitions; they are copied into a {@link HashSet}
+     */
+    static PartitionPredicate fromMultiple(RowType partitionType, List<BinaryRow> partitions) {
+        return new MultiplePartitionPredicate(
+                new RowDataToObjectArrayConverter(partitionType), new HashSet<>(partitions));
+    }
+
+    /** A {@link PartitionPredicate} that delegates to a {@link Predicate}. */
+    class DefaultPartitionPredicate implements PartitionPredicate {
+
+        private final RowDataToObjectArrayConverter converter;
+        private final Predicate predicate;
+
+        private DefaultPartitionPredicate(
+                RowDataToObjectArrayConverter converter, Predicate predicate) {
+            this.converter = converter;
+            this.predicate = predicate;
+        }
+
+        @Override
+        public boolean test(BinaryRow part) {
+            // The wrapped predicate works on object arrays, so the row is converted first.
+            return predicate.test(converter.convert(part));
+        }
+
+        @Override
+        public boolean test(long rowCount, FieldStats[] fieldStats) {
+            return predicate.test(rowCount, fieldStats);
+        }
+    }
+
+    /**
+     * A {@link PartitionPredicate} optimized for a set of concrete partitions. The exact check is
+     * a set lookup. The statistics check only compares each field against the [min, max] range of
+     * the values seen for that field, so its filtering effect may not be as good as {@link
+     * DefaultPartitionPredicate}.
+     */
+    class MultiplePartitionPredicate implements PartitionPredicate {
+
+        private final Set<BinaryRow> partitions;
+
+        // Per-field range predicates. An entry is left null when no usable range exists for
+        // that field (a null partition value was seen, or no non-null value was collected).
+        private final Predicate[] min;
+        private final Predicate[] max;
+
+        private MultiplePartitionPredicate(
+                RowDataToObjectArrayConverter converter, Set<BinaryRow> partitions) {
+            this.partitions = partitions;
+            RowType partitionType = converter.rowType();
+            int fieldNum = partitionType.getFieldCount();
+            @SuppressWarnings("unchecked")
+            Serializer<Object>[] serializers = new Serializer[fieldNum];
+            FullFieldStatsCollector[] collectors = new FullFieldStatsCollector[fieldNum];
+            boolean[] sawNull = new boolean[fieldNum];
+            min = new Predicate[fieldNum];
+            max = new Predicate[fieldNum];
+            for (int i = 0; i < fieldNum; i++) {
+                serializers[i] = InternalSerializers.create(partitionType.getTypeAt(i));
+                collectors[i] = new FullFieldStatsCollector();
+            }
+            for (BinaryRow part : partitions) {
+                Object[] fields = converter.convert(part);
+                for (int i = 0; i < fields.length; i++) {
+                    if (fields[i] == null) {
+                        // NOTE(review): assumes the converter yields null for a null partition
+                        // field. A null value cannot be described by a [min, max] range, so the
+                        // field is excluded from statistics-based pruning.
+                        sawNull[i] = true;
+                        continue;
+                    }
+                    collectors[i].collect(fields[i], serializers[i]);
+                }
+            }
+            PredicateBuilder builder = new PredicateBuilder(partitionType);
+            for (int i = 0; i < fieldNum; i++) {
+                FieldStats stats = collectors[i].result();
+                Object lower = stats.minValue();
+                Object upper = stats.maxValue();
+                if (sawNull[i] || lower == null || upper == null) {
+                    // Never build a range predicate around a null bound, and never let a range
+                    // hide the null partitions of this field.
+                    continue;
+                }
+                min[i] = builder.greaterOrEqual(i, lower);
+                max[i] = builder.lessOrEqual(i, upper);
+            }
+        }
+
+        @Override
+        public boolean test(BinaryRow part) {
+            return partitions.contains(part);
+        }
+
+        @Override
+        public boolean test(long rowCount, FieldStats[] fieldStats) {
+            if (fieldStats.length == 0) {
+                return true;
+            }
+
+            for (int i = 0; i < fieldStats.length; i++) {
+                if (min[i] == null || max[i] == null) {
+                    // No usable range for this field: stay conservative and do not prune.
+                    return true;
+                }
+                if (min[i].test(rowCount, fieldStats) && max[i].test(rowCount, fieldStats)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
new file mode 100644
index 000000000..71c46fb38
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionPredicateTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.partition;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.format.FieldStats;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.predicate.PredicateBuilder.and;
+import static org.apache.paimon.predicate.PredicateBuilder.or;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link PartitionPredicate}. */
+public class PartitionPredicateTest {
+
+    @Test
+    public void testNoPartition() {
+        PartitionPredicate noPartition =
+                PartitionPredicate.fromMultiple(
+                        RowType.of(), Collections.singletonList(EMPTY_ROW));
+
+        assertThat(noPartition.test(EMPTY_ROW)).isTrue();
+        assertThat(noPartition.test(1, new FieldStats[] {})).isTrue();
+    }
+
+    @Test
+    public void testPartition() {
+        RowType type = DataTypes.ROW(DataTypes.INT(), DataTypes.INT());
+        PredicateBuilder builder = new PredicateBuilder(type);
+        Predicate firstPart = and(builder.equal(0, 3), builder.equal(1, 5));
+        Predicate secondPart = and(builder.equal(0, 4), builder.equal(1, 6));
+
+        // The same two partitions, expressed once as a predicate and once as a partition list.
+        PartitionPredicate p1 = PartitionPredicate.fromPredicate(type, or(firstPart, secondPart));
+        PartitionPredicate p2 =
+                PartitionPredicate.fromMultiple(
+                        type, Arrays.asList(createPart(3, 5), createPart(4, 6)));
+
+        // Exact partition checks: both flavours must agree.
+        assertThat(acceptedByBoth(p1, p2, createPart(3, 4))).isFalse();
+        assertThat(acceptedByBoth(p1, p2, createPart(3, 5))).isTrue();
+        assertThat(acceptedByBoth(p1, p2, createPart(4, 6))).isTrue();
+        assertThat(acceptedByBoth(p1, p2, createPart(4, 5))).isFalse();
+
+        // Statistics checks: the multiple-partition flavour is allowed to be coarser.
+        assertThat(acceptsStats(p1, stats(4, 8, 10, 12))).isFalse();
+        assertThat(acceptsStats(p2, stats(4, 8, 10, 12))).isTrue();
+        assertThat(acceptsStats(p2, stats(6, 8, 10, 12))).isFalse();
+
+        assertThat(acceptsStats(p1, stats(4, 8, 5, 12))).isTrue();
+        assertThat(acceptsStats(p2, stats(4, 8, 5, 12))).isTrue();
+
+        assertThat(acceptsStats(p1, stats(1, 2, 2, 3))).isFalse();
+        assertThat(acceptsStats(p2, stats(1, 2, 2, 3))).isFalse();
+    }
+
+    /** Tests {@code part} with the first predicate and asserts that the second one agrees. */
+    private boolean acceptedByBoth(
+            PartitionPredicate predicate1, PartitionPredicate predicate2, BinaryRow part) {
+        boolean accepted = predicate1.test(part);
+        assertThat(predicate2.test(part)).isEqualTo(accepted);
+        return accepted;
+    }
+
+    /** Runs the statistics check with a fixed row count of 3. */
+    private boolean acceptsStats(PartitionPredicate predicate, FieldStats[] fieldStats) {
+        return predicate.test(3, fieldStats);
+    }
+
+    /** Builds statistics for the two INT partition fields, each with a null count of 0. */
+    private static FieldStats[] stats(int min0, int max0, int min1, int max1) {
+        return new FieldStats[] {new FieldStats(min0, max0, 0L), new FieldStats(min1, max1, 0L)};
+    }
+
+    /** Creates a two-field INT partition row holding {@code (i, j)}. */
+    private static BinaryRow createPart(int i, int j) {
+        BinaryRow part = new BinaryRow(2);
+        BinaryRowWriter writer = new BinaryRowWriter(part);
+        writer.writeInt(0, i);
+        writer.writeInt(1, j);
+        writer.complete();
+        return part;
+    }
+}