This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 52e9d19a60 [core] Make PartitionPredicate Public
52e9d19a60 is described below

commit 52e9d19a603a6854e3f1d66f298c420e706e2f38
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jul 14 14:30:56 2025 +0800

    [core] Make PartitionPredicate Public
---
 .../main/java/org/apache/paimon/types/RowType.java |  2 +-
 .../paimon/partition/PartitionPredicate.java       | 25 ++++++++++++++++++++--
 .../paimon/table/source/ReadBuilderImpl.java       |  4 ++--
 .../apache/paimon/flink/action/CompactAction.java  |  8 +++----
 .../apache/paimon/flink/action/RescaleAction.java  |  4 ++--
 .../paimon/flink/action/SortCompactAction.java     |  2 +-
 6 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index e886c97ca6..a7f867aeb3 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -323,7 +323,7 @@ public final class RowType extends DataType {
                 .copy(isNullable());
     }
 
-    public int[] projectNames(List<String> names) {
+    public int[] projectIndexes(List<String> names) {
         List<String> fieldNames = fields.stream().map(DataField::name).collect(Collectors.toList());
         return names.stream().mapToInt(fieldNames::indexOf).toArray();
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index 4a9b7d646a..f3d9e9030c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -48,11 +48,26 @@ import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecTo
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
-/** A special predicate to filter partition only, just like {@link Predicate}. */
+/**
+ * A special predicate to filter partition only, just like {@link Predicate}.
+ *
+ * @since 1.3.0
+ */
 public interface PartitionPredicate extends Serializable {
 
-    boolean test(BinaryRow part);
+    /**
+     * Test based on the specific partition.
+     *
+     * @return return true when hit, false when not hit.
+     */
+    boolean test(BinaryRow partition);
 
+    /**
+     * Test based on the statistical information to determine whether a hit is possible.
+     *
+     * @return return true is likely to hit (there may also be false positives), return false is
+     *     absolutely not possible to hit.
+     */
     boolean test(
             long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts);
 
@@ -69,11 +84,13 @@ public interface PartitionPredicate extends Serializable {
         return new DefaultPartitionPredicate(predicate);
     }
 
+    /** Create {@link PartitionPredicate} from multiple partitions. */
     @Nullable
     static PartitionPredicate fromMultiple(RowType partitionType, List<BinaryRow> partitions) {
         return fromMultiple(partitionType, new HashSet<>(partitions));
     }
 
+    /** Create {@link PartitionPredicate} from multiple partitions. */
     @Nullable
     static PartitionPredicate fromMultiple(RowType partitionType, Set<BinaryRow> partitions) {
         if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) {
@@ -123,6 +140,8 @@ public interface PartitionPredicate extends Serializable {
     /** A {@link PartitionPredicate} using {@link Predicate}. */
     class DefaultPartitionPredicate implements PartitionPredicate {
 
+        private static final long serialVersionUID = 1L;
+
         private final Predicate predicate;
 
         private DefaultPartitionPredicate(Predicate predicate) {
@@ -150,6 +169,8 @@ public interface PartitionPredicate extends Serializable {
      */
     class MultiplePartitionPredicate implements PartitionPredicate {
 
+        private static final long serialVersionUID = 1L;
+
         private final Set<BinaryRow> partitions;
         private final int fieldNum;
         private final Predicate[] min;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index f12c4fe4be..63be2e8765 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -103,8 +103,8 @@ public class ReadBuilderImpl implements ReadBuilder {
     }
 
     @Override
-    public ReadBuilder withPartitionFilter(@Nullable PartitionPredicate partitions) {
-        this.partitionFilter = partitions;
+    public ReadBuilder withPartitionFilter(@Nullable PartitionPredicate partitionPredicate) {
+        this.partitionFilter = partitionPredicate;
         return this;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index b0368082aa..5528fb2ee5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -174,7 +174,7 @@ public class CompactAction extends TableActionBase {
                 new CompactorSourceBuilder(identifier.getFullName(), table);
         CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table, fullCompaction);
 
-        sourceBuilder.withPartitionPredicate(getPredicate());
+        sourceBuilder.withPartitionPredicate(getPartitionPredicate());
         DataStreamSource<RowData> source =
                 sourceBuilder
                         .withEnv(env)
@@ -189,13 +189,13 @@ public class CompactAction extends TableActionBase {
             throws Exception {
         AppendTableCompactBuilder builder =
                 new AppendTableCompactBuilder(env, identifier.getFullName(), table);
-        builder.withPartitionPredicate(getPredicate());
+        builder.withPartitionPredicate(getPartitionPredicate());
         builder.withContinuousMode(isStreaming);
         builder.withPartitionIdleTime(partitionIdleTime);
         builder.build();
     }
 
-    protected PartitionPredicate getPredicate() throws Exception {
+    protected PartitionPredicate getPartitionPredicate() throws Exception {
         Preconditions.checkArgument(
                 partitions == null || whereSql == null,
                 "partitions and where cannot be used together.");
@@ -245,7 +245,7 @@ public class CompactAction extends TableActionBase {
                     predicate
                             .visit(
                                     new PredicateProjectionConverter(
-                                            table.rowType().projectNames(table.partitionKeys())))
+                                            table.rowType().projectIndexes(table.partitionKeys())))
                             .orElseThrow(
                                     () ->
                                             new RuntimeException(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index fb3d068ed1..3855a5115d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -95,7 +95,7 @@ public class RescaleAction extends TableActionBase {
                 String.valueOf(snapshot.id()));
         fileStoreTable = fileStoreTable.copy(dynamicOptions);
 
-        PartitionPredicate predicate =
+        PartitionPredicate partitionPredicate =
                 PartitionPredicate.fromMap(
                         fileStoreTable.schema().logicalPartitionType(),
                         partition,
@@ -109,7 +109,7 @@ public class RescaleAction extends TableActionBase {
                                 scanParallelism == null
                                         ? currentBucketNum(snapshot)
                                         : scanParallelism)
-                        .partitionPredicate(predicate)
+                        .partitionPredicate(partitionPredicate)
                         .build();
 
         Map<String, String> bucketOptions = new HashMap<>(fileStoreTable.options());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 535cdfc7a9..69d07d238b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -91,7 +91,7 @@ public class SortCompactAction extends CompactAction {
                                                 identifier.getObjectName())
                                         .asSummaryString());
 
-        sourceBuilder.partitionPredicate(getPredicate());
+        sourceBuilder.partitionPredicate(getPartitionPredicate());
 
         String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
         if (scanParallelism != null) {

Reply via email to