This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c7fb2050 [core] Unify Partition Predicate generation in 
PartitionPredicate (#3427)
7c7fb2050 is described below

commit 7c7fb2050bf29cab56e724a9c10b515057ed1454
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 30 10:13:32 2024 +0800

    [core] Unify Partition Predicate generation in PartitionPredicate (#3427)
---
 .../apache/paimon/predicate/PredicateBuilder.java  | 40 +++---------
 .../paimon/utils/InternalRowPartitionComputer.java | 21 ++++++-
 .../utils/RowDataToObjectArrayConverter.java       | 16 -----
 .../java/org/apache/paimon/AbstractFileStore.java  |  1 +
 .../apache/paimon/manifest/ManifestFileMeta.java   |  4 +-
 .../paimon/operation/FileStoreCommitImpl.java      | 14 ++++-
 .../paimon/partition/PartitionPredicate.java       | 72 ++++++++++++++++++++--
 .../table/source/snapshot/SnapshotReaderImpl.java  | 29 +++------
 .../apache/paimon/utils/FileStorePathFactory.java  |  6 +-
 .../paimon/operation/FileStoreCommitTest.java      |  7 ++-
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  4 +-
 .../paimon/flink/action/SortCompactAction.java     | 11 +++-
 .../paimon/flink/action/TableActionBase.java       |  3 +-
 .../UnawareBucketCompactionTopoBuilder.java        |  8 ++-
 .../flink/lookup/FileStoreLookupFunction.java      | 23 ++++---
 .../flink/sink/partition/PartitionMarkDone.java    | 10 +--
 .../flink/source/CompactorSourceBuilder.java       | 10 ++-
 .../apache/paimon/hive/HiveMetastoreClient.java    |  6 +-
 .../paimon/hive/utils/HiveSplitGenerator.java      | 14 ++++-
 .../paimon/spark/PaimonPartitionManagement.scala   |  4 +-
 .../commands/DeleteFromPaimonTableCommand.scala    |  4 +-
 .../apache/paimon/spark/sources/StreamHelper.scala | 13 ++--
 22 files changed, 196 insertions(+), 124 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 2f50c05eb..acb861b1c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.predicate;
 
 import org.apache.paimon.annotation.Public;
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.Timestamp;
@@ -28,8 +27,6 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
-import org.apache.paimon.utils.TypeUtils;
 
 import javax.annotation.Nullable;
 
@@ -52,6 +49,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singletonList;
+import static 
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
 
 /**
  * A utility class to create {@link Predicate} object for common filter 
conditions.
@@ -360,14 +358,15 @@ public class PredicateBuilder {
     }
 
     @Nullable
-    public static Predicate partition(Map<String, String> map, RowType 
rowType) {
-        // TODO: It is somewhat misleading that an empty map creates a null 
predicate filter
+    public static Predicate partition(
+            Map<String, String> map, RowType rowType, String defaultPartValue) 
{
+        Map<String, Object> internalValues = convertSpecToInternal(map, 
rowType, defaultPartValue);
         List<String> fieldNames = rowType.getFieldNames();
         Predicate predicate = null;
         PredicateBuilder builder = new PredicateBuilder(rowType);
-        for (Map.Entry<String, String> entry : map.entrySet()) {
+        for (Map.Entry<String, Object> entry : internalValues.entrySet()) {
             int idx = fieldNames.indexOf(entry.getKey());
-            Object literal = TypeUtils.castFromString(entry.getValue(), 
rowType.getTypeAt(idx));
+            Object literal = internalValues.get(entry.getKey());
             Predicate predicateTemp =
                     literal == null ? builder.isNull(idx) : builder.equal(idx, 
literal);
             if (predicate == null) {
@@ -379,32 +378,11 @@ public class PredicateBuilder {
         return predicate;
     }
 
-    public static Predicate partitions(List<Map<String, String>> partitions, 
RowType rowType) {
+    public static Predicate partitions(
+            List<Map<String, String>> partitions, RowType rowType, String 
defaultPartValue) {
         return PredicateBuilder.or(
                 partitions.stream()
-                        .map(p -> PredicateBuilder.partition(p, rowType))
+                        .map(p -> PredicateBuilder.partition(p, rowType, 
defaultPartValue))
                         .toArray(Predicate[]::new));
     }
-
-    public static Predicate equalPartition(BinaryRow partition, RowType 
partitionType) {
-        Preconditions.checkArgument(
-                partition.getFieldCount() == partitionType.getFieldCount(),
-                "Partition's field count should be equal to partitionType's 
field count.");
-
-        RowDataToObjectArrayConverter converter = new 
RowDataToObjectArrayConverter(partitionType);
-        Predicate predicate = null;
-        PredicateBuilder builder = new PredicateBuilder(partitionType);
-        Object[] literals = converter.convert(partition);
-        for (int i = 0; i < literals.length; i++) {
-            Predicate predicateTemp =
-                    literals[i] == null ? builder.isNull(i) : builder.equal(i, 
literals[i]);
-            if (predicate == null) {
-                predicate = predicateTemp;
-            } else {
-                predicate = PredicateBuilder.and(predicate, predicateTemp);
-            }
-        }
-
-        return predicate;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
similarity index 75%
rename from 
paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java
rename to 
paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index a5a9c21fe..bdc5197fe 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -24,15 +24,18 @@ import org.apache.paimon.types.RowType;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.TypeUtils.castFromString;
 
 /** PartitionComputer for {@link InternalRow}. */
-public class RowDataPartitionComputer {
+public class InternalRowPartitionComputer {
 
     protected final String defaultPartValue;
     protected final String[] partitionColumns;
     protected final InternalRow.FieldGetter[] partitionFieldGetters;
 
-    public RowDataPartitionComputer(
+    public InternalRowPartitionComputer(
             String defaultPartValue, RowType rowType, String[] 
partitionColumns) {
         this.defaultPartValue = defaultPartValue;
         this.partitionColumns = partitionColumns;
@@ -60,4 +63,18 @@ public class RowDataPartitionComputer {
         }
         return partSpec;
     }
+
+    public static Map<String, Object> convertSpecToInternal(
+            Map<String, String> spec, RowType partType, String 
defaultPartValue) {
+        Map<String, Object> partValues = new LinkedHashMap<>();
+        for (Map.Entry<String, String> entry : spec.entrySet()) {
+            partValues.put(
+                    entry.getKey(),
+                    defaultPartValue.equals(entry.getValue())
+                            ? null
+                            : castFromString(
+                                    entry.getValue(), 
partType.getField(entry.getKey()).type()));
+        }
+        return partValues;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
index 436a51ada..679278018 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
@@ -18,16 +18,11 @@
 
 package org.apache.paimon.utils;
 
-import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.types.RowType;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.stream.IntStream;
 
 /** Convert {@link InternalRow} to object array. */
@@ -68,15 +63,4 @@ public class RowDataToObjectArrayConverter implements 
Serializable {
         }
         return result;
     }
-
-    public Predicate createEqualPredicate(BinaryRow binaryRow) {
-        PredicateBuilder builder = new PredicateBuilder(rowType);
-        List<Predicate> fieldPredicates = new ArrayList<>();
-        Object[] partitionObjects = convert(binaryRow);
-        for (int i = 0; i < getArity(); i++) {
-            Object o = partitionObjects[i];
-            fieldPredicates.add(o == null ? builder.isNull(i) : 
builder.equal(i, o));
-        }
-        return PredicateBuilder.and(fieldPredicates);
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 75c532a5c..a5cba5c87 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -179,6 +179,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 schemaManager,
                 commitUser,
                 partitionType,
+                options.partitionDefaultName(),
                 pathFactory(),
                 snapshotManager(),
                 manifestFileFactory(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 3049e44c7..2dc091d88 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -46,6 +46,7 @@ import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Metadata of a manifest file. */
@@ -405,7 +406,8 @@ public class ManifestFileMeta {
 
             List<Predicate> predicateList =
                     partitions.stream()
-                            .map(rowArrayConverter::createEqualPredicate)
+                            .map(rowArrayConverter::convert)
+                            .map(values -> 
createPartitionPredicate(partitionType, values))
                             .collect(Collectors.toList());
             predicateOpt = Optional.of(PredicateBuilder.or(predicateList));
         } else {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 0001d5b19..e5efbbac6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -75,6 +75,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 
 /**
@@ -105,6 +106,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private final SchemaManager schemaManager;
     private final String commitUser;
     private final RowType partitionType;
+    private final String partitionDefaultName;
     private final FileStorePathFactory pathFactory;
     private final SnapshotManager snapshotManager;
     private final ManifestFile manifestFile;
@@ -133,6 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             SchemaManager schemaManager,
             String commitUser,
             RowType partitionType,
+            String partitionDefaultName,
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
@@ -152,6 +155,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.schemaManager = schemaManager;
         this.commitUser = commitUser;
         this.partitionType = partitionType;
+        this.partitionDefaultName = partitionDefaultName;
         this.pathFactory = pathFactory;
         this.snapshotManager = snapshotManager;
         this.manifestFile = manifestFileFactory.create();
@@ -413,7 +417,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                     .map(ManifestEntry::partition)
                                     .distinct()
                                     // partition filter is built from new 
data's partitions
-                                    .map(p -> 
PredicateBuilder.equalPartition(p, partitionType))
+                                    .map(p -> 
createPartitionPredicate(partitionType, p))
                                     .reduce(PredicateBuilder::or)
                                     .orElseThrow(
                                             () ->
@@ -421,7 +425,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                                             "Failed to get 
dynamic partition filter. This is unexpected."));
                 }
             } else {
-                partitionFilter = PredicateBuilder.partition(partition, 
partitionType);
+                partitionFilter =
+                        createPartitionPredicate(partition, partitionType, 
partitionDefaultName);
                 // sanity check, all changes must be done within the given 
partition
                 if (partitionFilter != null) {
                     for (ManifestEntry entry : appendTableFiles) {
@@ -492,7 +497,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
 
         Predicate partitionFilter =
                 partitions.stream()
-                        .map(partition -> 
PredicateBuilder.partition(partition, partitionType))
+                        .map(
+                                partition ->
+                                        createPartitionPredicate(
+                                                partition, partitionType, 
partitionDefaultName))
                         .reduce(PredicateBuilder::or)
                         .orElseThrow(() -> new RuntimeException("Failed to get 
partition filter."));
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index b8893db6b..c2434d4b3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -28,14 +28,20 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.statistics.FullSimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
 /** A special predicate to filter partition only, just like {@link Predicate}. 
*/
 public interface PartitionPredicate {
 
@@ -121,11 +127,13 @@ public interface PartitionPredicate {
             PredicateBuilder builder = new PredicateBuilder(partitionType);
             for (int i = 0; i < collectors.length; i++) {
                 SimpleColStats stats = collectors[i].result();
-                Object minValue = stats.min();
-                Object maxValue = stats.max();
-
-                min[i] = minValue == null ? builder.isNull(i) : 
builder.greaterOrEqual(i, minValue);
-                max[i] = maxValue == null ? builder.isNull(i) : 
builder.lessOrEqual(i, maxValue);
+                if (stats.nullCount() == partitions.size()) {
+                    min[i] = builder.isNull(i);
+                    max[i] = builder.isNull(i);
+                } else {
+                    min[i] = builder.greaterOrEqual(i, 
checkNotNull(stats.min()));
+                    max[i] = builder.lessOrEqual(i, checkNotNull(stats.max()));
+                }
             }
         }
 
@@ -153,4 +161,58 @@ public interface PartitionPredicate {
             return true;
         }
     }
+
+    static Predicate createPartitionPredicate(RowType rowType, Map<String, 
Object> partition) {
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        List<String> fieldNames = rowType.getFieldNames();
+        Predicate predicate = null;
+        for (Map.Entry<String, Object> entry : partition.entrySet()) {
+            Object literal = entry.getValue();
+            int idx = fieldNames.indexOf(entry.getKey());
+            Predicate predicateTemp =
+                    literal == null ? builder.isNull(idx) : builder.equal(idx, 
literal);
+            if (predicate == null) {
+                predicate = predicateTemp;
+            } else {
+                predicate = PredicateBuilder.and(predicate, predicateTemp);
+            }
+        }
+        return predicate;
+    }
+
+    static Predicate createPartitionPredicate(RowType partitionType, Object[] 
partition) {
+        Preconditions.checkArgument(
+                partition.length == partitionType.getFieldCount(),
+                "Partition's field count should be equal to partitionType's 
field count.");
+
+        Map<String, Object> partitionMap = new HashMap<>(partition.length);
+        for (int i = 0; i < partition.length; i++) {
+            partitionMap.put(partitionType.getFields().get(i).name(), 
partition[i]);
+        }
+
+        return createPartitionPredicate(partitionType, partitionMap);
+    }
+
+    static Predicate createPartitionPredicate(RowType partitionType, BinaryRow 
partition) {
+        Preconditions.checkArgument(
+                partition.getFieldCount() == partitionType.getFieldCount(),
+                "Partition's field count should be equal to partitionType's 
field count.");
+        RowDataToObjectArrayConverter converter = new 
RowDataToObjectArrayConverter(partitionType);
+        return createPartitionPredicate(partitionType, 
converter.convert(partition));
+    }
+
+    @Nullable
+    static Predicate createPartitionPredicate(
+            Map<String, String> spec, RowType rowType, String 
defaultPartValue) {
+        Map<String, Object> internalValues = convertSpecToInternal(spec, 
rowType, defaultPartValue);
+        return createPartitionPredicate(rowType, internalValues);
+    }
+
+    static Predicate createPartitionPredicate(
+            List<Map<String, String>> partitions, RowType rowType, String 
defaultPartValue) {
+        return PredicateBuilder.or(
+                partitions.stream()
+                        .map(p -> createPartitionPredicate(p, rowType, 
defaultPartValue))
+                        .toArray(Predicate[]::new));
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index a6082bdcd..be91096ab 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -41,12 +41,10 @@ import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.TypeUtils;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -58,10 +56,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** Implementation of {@link SnapshotReader}. */
@@ -140,25 +138,12 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
     @Override
     public SnapshotReader withPartitionFilter(Map<String, String> 
partitionSpec) {
         if (partitionSpec != null) {
-            List<String> partitionKeys = tableSchema.partitionKeys();
-            RowType rowType = tableSchema.logicalPartitionType();
-            PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
-            List<Predicate> partitionFilters =
-                    partitionSpec.entrySet().stream()
-                            .map(
-                                    m -> {
-                                        int index = 
partitionKeys.indexOf(m.getKey());
-                                        Object value =
-                                                
TypeUtils.castFromStringInternal(
-                                                        m.getValue(),
-                                                        
rowType.getTypeAt(index),
-                                                        false);
-                                        return value == null
-                                                ? 
predicateBuilder.isNull(index)
-                                                : 
predicateBuilder.equal(index, value);
-                                    })
-                            .collect(Collectors.toList());
-            scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
+            Predicate partitionPredicate =
+                    createPartitionPredicate(
+                            partitionSpec,
+                            tableSchema.logicalPartitionType(),
+                            options.partitionDefaultName());
+            scan.withPartitionFilter(partitionPredicate);
         }
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 0f3ad7fec..7696f9ada 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -39,7 +39,7 @@ public class FileStorePathFactory {
 
     private final Path root;
     private final String uuid;
-    private final RowDataPartitionComputer partitionComputer;
+    private final InternalRowPartitionComputer partitionComputer;
     private final String formatIdentifier;
 
     private final AtomicInteger manifestFileCount;
@@ -68,10 +68,10 @@ public class FileStorePathFactory {
     }
 
     @VisibleForTesting
-    public static RowDataPartitionComputer getPartitionComputer(
+    public static InternalRowPartitionComputer getPartitionComputer(
             RowType partitionType, String defaultPartValue) {
         String[] partitionColumns = partitionType.getFieldNames().toArray(new 
String[0]);
-        return new RowDataPartitionComputer(defaultPartValue, partitionType, 
partitionColumns);
+        return new InternalRowPartitionComputer(defaultPartValue, 
partitionType, partitionColumns);
     }
 
     public Path newManifestFile() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 7953bd6c6..695b023aa 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -79,6 +79,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -658,8 +659,10 @@ public class FileStoreCommitTest {
                 partitions.stream()
                         .map(
                                 partition ->
-                                        PredicateBuilder.partition(
-                                                partition, 
TestKeyValueGenerator.DEFAULT_PART_TYPE))
+                                        createPartitionPredicate(
+                                                partition,
+                                                
TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                                                
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue()))
                         .reduce(PredicateBuilder::or)
                         .get();
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 90effe57f..7896ec923 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -32,8 +32,8 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RowDataPartitionComputer;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.api.TableSchema;
@@ -860,7 +860,7 @@ public class FlinkCatalog extends AbstractCatalog {
             org.apache.paimon.types.RowType partitionRowType =
                     fileStoreTable.schema().logicalPartitionType();
 
-            RowDataPartitionComputer partitionComputer =
+            InternalRowPartitionComputer partitionComputer =
                     FileStorePathFactory.getPartitionComputer(
                             partitionRowType,
                             new 
CoreOptions(table.options()).partitionDefaultName());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index a9d283a61..efa2f386d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -42,6 +42,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+
 /** Compact with sort action. */
 public class SortCompactAction extends CompactAction {
 
@@ -96,7 +98,14 @@ public class SortCompactAction extends CompactAction {
             Predicate partitionPredicate =
                     PredicateBuilder.or(
                             getPartitions().stream()
-                                    .map(p -> PredicateBuilder.partition(p, 
table.rowType()))
+                                    .map(
+                                            p ->
+                                                    createPartitionPredicate(
+                                                            p,
+                                                            table.rowType(),
+                                                            ((FileStoreTable) 
table)
+                                                                    
.coreOptions()
+                                                                    
.partitionDefaultName()))
                                     .toArray(Predicate[]::new));
             sourceBuilder.predicate(partitionPredicate);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 6a9793f44..c525b133b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
 import org.apache.flink.api.dag.Transformation;
@@ -60,7 +59,7 @@ public abstract class TableActionBase extends ActionBase {
     public TableResult batchSink(DataStream<RowData> dataStream) {
         List<Transformation<?>> transformations =
                 Collections.singletonList(
-                        new FlinkSinkBuilder((FileStoreTable) table)
+                        new FlinkSinkBuilder(table)
                                 .forRowData(dataStream)
                                 .build()
                                 .getTransformation());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index 7d986463d..edbca53b2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -24,7 +24,6 @@ import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -38,6 +37,8 @@ import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+
 /**
  * Build for unaware-bucket table flink compaction job.
  *
@@ -96,7 +97,10 @@ public class UnawareBucketCompactionTopoBuilder {
                         isContinuous,
                         scanInterval,
                         specifiedPartitions != null
-                                ? 
PredicateBuilder.partitions(specifiedPartitions, table.rowType())
+                                ? createPartitionPredicate(
+                                        specifiedPartitions,
+                                        table.rowType(),
+                                        
table.coreOptions().partitionDefaultName())
                                 : null);
 
         return BucketUnawareCompactSource.buildSource(env, source, 
isContinuous, tableIdentifier);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 3f81fdf9a..e2efbcbc5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,13 +28,12 @@ import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.OutOfRangeException;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.InternalRowUtils;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
@@ -57,8 +56,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -69,6 +70,7 @@ import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
 import static 
org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
 import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
 import static 
org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
 /** A lookup {@link TableFunction} for file store. */
@@ -252,16 +254,17 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
 
     private Predicate createSpecificPartFilter(BinaryRow partition) {
         RowType rowType = table.rowType();
-        List<String> fieldNames = rowType.getFieldNames();
         List<String> partitionKeys = table.partitionKeys();
-        PredicateBuilder builder = new PredicateBuilder(rowType);
-        List<Predicate> predicates = new ArrayList<>();
-        for (int i = 0; i < partitionKeys.size(); i++) {
-            int index = fieldNames.indexOf(partitionKeys.get(i));
-            Object value = InternalRowUtils.get(partition, i, 
rowType.getTypeAt(index));
-            predicates.add(value == null ? builder.isNull(index) : 
builder.equal(index, value));
+        Object[] partitionSpec =
+                new 
RowDataToObjectArrayConverter(rowType.project(partitionKeys))
+                        .convert(partition);
+        Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
+        for (int i = 0; i < partitionSpec.length; i++) {
+            partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
         }
-        return PredicateBuilder.and(predicates);
+
+        // create partition predicate base on rowType instead of partitionType
+        return createPartitionPredicate(rowType, partitionMap);
     }
 
     private void reopen() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index b4b4aa0bd..e39c9c807 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -26,7 +26,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.utils.RowDataPartitionComputer;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -63,7 +63,7 @@ public class PartitionMarkDone implements Closeable {
                     "mark-done-pending-partitions",
                     new ListSerializer<>(StringSerializer.INSTANCE));
 
-    private final RowDataPartitionComputer partitionComputer;
+    private final InternalRowPartitionComputer partitionComputer;
     private final PartitionMarkDoneTrigger trigger;
     private final List<PartitionMarkDoneAction> actions;
 
@@ -100,8 +100,8 @@ public class PartitionMarkDone implements Closeable {
                 "Table should enable %s",
                 METASTORE_PARTITIONED_TABLE.key());
 
-        RowDataPartitionComputer partitionComputer =
-                new RowDataPartitionComputer(
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
                         coreOptions.partitionDefaultName(),
                         table.schema().logicalPartitionType(),
                         partitionKeys.toArray(new String[0]));
@@ -136,7 +136,7 @@ public class PartitionMarkDone implements Closeable {
     }
 
     public PartitionMarkDone(
-            RowDataPartitionComputer partitionComputer,
+            InternalRowPartitionComputer partitionComputer,
             PartitionMarkDoneTrigger trigger,
             List<PartitionMarkDoneAction> actions) {
         this.partitionComputer = partitionComputer;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index e963c92fc..2b9e52466 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -43,6 +43,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+
 /**
  * Source builder to build a Flink {@link StaticFileStoreSource} or {@link
  * ContinuousFileStoreSource}. This is for dedicated compactor jobs.
@@ -89,7 +91,13 @@ public class CompactorSourceBuilder {
             partitionPredicate =
                     PredicateBuilder.or(
                             specifiedPartitions.stream()
-                                    .map(p -> PredicateBuilder.partition(p, 
table.rowType()))
+                                    .map(
+                                            p ->
+                                                    createPartitionPredicate(
+                                                            p,
+                                                            table.rowType(),
+                                                            table.coreOptions()
+                                                                    
.partitionDefaultName()))
                                     .toArray(Predicate[]::new));
         }
 
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 76b1f3cf2..e2771c0e6 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -23,8 +23,8 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.PartitionPathUtils;
-import org.apache.paimon.utils.RowDataPartitionComputer;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -40,7 +40,7 @@ import java.util.List;
 public class HiveMetastoreClient implements MetastoreClient {
 
     private final Identifier identifier;
-    private final RowDataPartitionComputer partitionComputer;
+    private final InternalRowPartitionComputer partitionComputer;
 
     private final IMetaStoreClient client;
     private final StorageDescriptor sd;
@@ -49,7 +49,7 @@ public class HiveMetastoreClient implements MetastoreClient {
             throws Exception {
         this.identifier = identifier;
         this.partitionComputer =
-                new RowDataPartitionComputer(
+                new InternalRowPartitionComputer(
                         new 
CoreOptions(schema.options()).partitionDefaultName(),
                         schema.logicalPartitionType(),
                         schema.partitionKeys().toArray(new String[0]));
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 6dab4d4af..33cbc19e0 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.hive.utils;
 
 import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
@@ -45,6 +46,7 @@ import static java.util.Collections.singletonMap;
 import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
 import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
 import static org.apache.paimon.hive.utils.HiveUtils.extractTagName;
+import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 
 /** Generator to generate hive input splits. */
 public class HiveSplitGenerator {
@@ -85,7 +87,8 @@ public class HiveSplitGenerator {
                 createPartitionPredicate(
                                 table.schema().logicalRowType(),
                                 table.schema().partitionKeys(),
-                                location)
+                                location,
+                                table.coreOptions().partitionDefaultName())
                         .ifPresent(predicatePerPartition::add);
 
                 scan = table.newScan();
@@ -105,7 +108,10 @@ public class HiveSplitGenerator {
     }
 
     private static Optional<Predicate> createPartitionPredicate(
-            RowType rowType, List<String> partitionKeys, String partitionDir) {
+            RowType rowType,
+            List<String> partitionKeys,
+            String partitionDir,
+            String defaultPartName) {
         Set<String> partitionKeySet = new HashSet<>(partitionKeys);
         LinkedHashMap<String, String> partition = new LinkedHashMap<>();
         for (String s : partitionDir.split("/")) {
@@ -124,7 +130,9 @@ public class HiveSplitGenerator {
         if (partition.isEmpty() || partition.size() != partitionKeys.size()) {
             return Optional.empty();
         } else {
-            return Optional.ofNullable(PredicateBuilder.partition(partition, 
rowType));
+            return Optional.ofNullable(
+                    PartitionPredicate.createPartitionPredicate(
+                            partition, rowType, defaultPartName));
         }
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index bebb1ac4a..a803e1b62 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.operation.FileStoreCommit
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.BatchWriteBuilder
 import org.apache.paimon.types.RowType
-import org.apache.paimon.utils.RowDataPartitionComputer
+import org.apache.paimon.utils.InternalRowPartitionComputer
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -55,7 +55,7 @@ trait PaimonPartitionManagement extends 
SupportsPartitionManagement {
       
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema()))
       .apply(internalRow)
       .asInstanceOf[Row]
-    val rowDataPartitionComputer = new RowDataPartitionComputer(
+    val rowDataPartitionComputer = new InternalRowPartitionComputer(
       CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
       tableRowType,
       partitionKeys.asScala.toArray)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 86d2aee8b..a2d807ed1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -28,7 +28,7 @@ import 
org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
-import org.apache.paimon.utils.RowDataPartitionComputer
+import org.apache.paimon.utils.InternalRowPartitionComputer
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
@@ -82,7 +82,7 @@ case class DeleteFromPaimonTableCommand(
       ) {
         val matchedPartitions =
           
table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
-        val rowDataPartitionComputer = new RowDataPartitionComputer(
+        val rowDataPartitionComputer = new InternalRowPartitionComputer(
           CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
           table.schema().logicalPartitionType(),
           table.partitionKeys.asScala.toArray
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index ea23790ca..dbce04719 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.table.DataTable
 import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
 import org.apache.paimon.table.source.TableScan.Plan
 import org.apache.paimon.table.source.snapshot.StartingContext
-import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils}
+import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
 
 import org.apache.spark.sql.connector.read.streaming.ReadLimit
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -49,11 +49,12 @@ private[spark] trait StreamHelper {
   private lazy val partitionSchema: StructType =
     SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), 
table.partitionKeys()))
 
-  private lazy val partitionComputer: RowDataPartitionComputer = new 
RowDataPartitionComputer(
-    new CoreOptions(table.options).partitionDefaultName,
-    TypeUtils.project(table.rowType(), table.partitionKeys()),
-    table.partitionKeys().asScala.toArray
-  )
+  private lazy val partitionComputer: InternalRowPartitionComputer =
+    new InternalRowPartitionComputer(
+      new CoreOptions(table.options).partitionDefaultName,
+      TypeUtils.project(table.rowType(), table.partitionKeys()),
+      table.partitionKeys().asScala.toArray
+    )
 
   // Used to get the initial offset.
   lazy val streamScanStartingContext: StartingContext = 
streamScan.startingContext()

Reply via email to