This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7c7fb2050 [core] Unify Partition Predicate generation in PartitionPredicate (#3427)
7c7fb2050 is described below
commit 7c7fb2050bf29cab56e724a9c10b515057ed1454
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu May 30 10:13:32 2024 +0800
[core] Unify Partition Predicate generation in PartitionPredicate (#3427)
---
.../apache/paimon/predicate/PredicateBuilder.java | 40 +++---------
.../paimon/utils/InternalRowPartitionComputer.java | 21 ++++++-
.../utils/RowDataToObjectArrayConverter.java | 16 -----
.../java/org/apache/paimon/AbstractFileStore.java | 1 +
.../apache/paimon/manifest/ManifestFileMeta.java | 4 +-
.../paimon/operation/FileStoreCommitImpl.java | 14 ++++-
.../paimon/partition/PartitionPredicate.java | 72 ++++++++++++++++++++--
.../table/source/snapshot/SnapshotReaderImpl.java | 29 +++------
.../apache/paimon/utils/FileStorePathFactory.java | 6 +-
.../paimon/operation/FileStoreCommitTest.java | 7 ++-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 4 +-
.../paimon/flink/action/SortCompactAction.java | 11 +++-
.../paimon/flink/action/TableActionBase.java | 3 +-
.../UnawareBucketCompactionTopoBuilder.java | 8 ++-
.../flink/lookup/FileStoreLookupFunction.java | 23 ++++---
.../flink/sink/partition/PartitionMarkDone.java | 10 +--
.../flink/source/CompactorSourceBuilder.java | 10 ++-
.../apache/paimon/hive/HiveMetastoreClient.java | 6 +-
.../paimon/hive/utils/HiveSplitGenerator.java | 14 ++++-
.../paimon/spark/PaimonPartitionManagement.scala | 4 +-
.../commands/DeleteFromPaimonTableCommand.scala | 4 +-
.../apache/paimon/spark/sources/StreamHelper.scala | 13 ++--
22 files changed, 196 insertions(+), 124 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 2f50c05eb..acb861b1c 100644
---
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -19,7 +19,6 @@
package org.apache.paimon.predicate;
import org.apache.paimon.annotation.Public;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.Timestamp;
@@ -28,8 +27,6 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
-import org.apache.paimon.utils.TypeUtils;
import javax.annotation.Nullable;
@@ -52,6 +49,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.singletonList;
+import static
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
/**
* A utility class to create {@link Predicate} object for common filter
conditions.
@@ -360,14 +358,15 @@ public class PredicateBuilder {
}
@Nullable
- public static Predicate partition(Map<String, String> map, RowType
rowType) {
- // TODO: It is somewhat misleading that an empty map creates a null
predicate filter
+ public static Predicate partition(
+ Map<String, String> map, RowType rowType, String defaultPartValue)
{
+ Map<String, Object> internalValues = convertSpecToInternal(map,
rowType, defaultPartValue);
List<String> fieldNames = rowType.getFieldNames();
Predicate predicate = null;
PredicateBuilder builder = new PredicateBuilder(rowType);
- for (Map.Entry<String, String> entry : map.entrySet()) {
+ for (Map.Entry<String, Object> entry : internalValues.entrySet()) {
int idx = fieldNames.indexOf(entry.getKey());
- Object literal = TypeUtils.castFromString(entry.getValue(),
rowType.getTypeAt(idx));
+ Object literal = internalValues.get(entry.getKey());
Predicate predicateTemp =
literal == null ? builder.isNull(idx) : builder.equal(idx,
literal);
if (predicate == null) {
@@ -379,32 +378,11 @@ public class PredicateBuilder {
return predicate;
}
- public static Predicate partitions(List<Map<String, String>> partitions,
RowType rowType) {
+ public static Predicate partitions(
+ List<Map<String, String>> partitions, RowType rowType, String
defaultPartValue) {
return PredicateBuilder.or(
partitions.stream()
- .map(p -> PredicateBuilder.partition(p, rowType))
+ .map(p -> PredicateBuilder.partition(p, rowType,
defaultPartValue))
.toArray(Predicate[]::new));
}
-
- public static Predicate equalPartition(BinaryRow partition, RowType
partitionType) {
- Preconditions.checkArgument(
- partition.getFieldCount() == partitionType.getFieldCount(),
- "Partition's field count should be equal to partitionType's
field count.");
-
- RowDataToObjectArrayConverter converter = new
RowDataToObjectArrayConverter(partitionType);
- Predicate predicate = null;
- PredicateBuilder builder = new PredicateBuilder(partitionType);
- Object[] literals = converter.convert(partition);
- for (int i = 0; i < literals.length; i++) {
- Predicate predicateTemp =
- literals[i] == null ? builder.isNull(i) : builder.equal(i,
literals[i]);
- if (predicate == null) {
- predicate = predicateTemp;
- } else {
- predicate = PredicateBuilder.and(predicate, predicateTemp);
- }
- }
-
- return predicate;
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
similarity index 75%
rename from
paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java
rename to
paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index a5a9c21fe..bdc5197fe 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -24,15 +24,18 @@ import org.apache.paimon.types.RowType;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.TypeUtils.castFromString;
/** PartitionComputer for {@link InternalRow}. */
-public class RowDataPartitionComputer {
+public class InternalRowPartitionComputer {
protected final String defaultPartValue;
protected final String[] partitionColumns;
protected final InternalRow.FieldGetter[] partitionFieldGetters;
- public RowDataPartitionComputer(
+ public InternalRowPartitionComputer(
String defaultPartValue, RowType rowType, String[]
partitionColumns) {
this.defaultPartValue = defaultPartValue;
this.partitionColumns = partitionColumns;
@@ -60,4 +63,18 @@ public class RowDataPartitionComputer {
}
return partSpec;
}
+
+ public static Map<String, Object> convertSpecToInternal(
+ Map<String, String> spec, RowType partType, String
defaultPartValue) {
+ Map<String, Object> partValues = new LinkedHashMap<>();
+ for (Map.Entry<String, String> entry : spec.entrySet()) {
+ partValues.put(
+ entry.getKey(),
+ defaultPartValue.equals(entry.getValue())
+ ? null
+ : castFromString(
+ entry.getValue(),
partType.getField(entry.getKey()).type()));
+ }
+ return partValues;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
index 436a51ada..679278018 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
@@ -18,16 +18,11 @@
package org.apache.paimon.utils;
-import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
import java.util.stream.IntStream;
/** Convert {@link InternalRow} to object array. */
@@ -68,15 +63,4 @@ public class RowDataToObjectArrayConverter implements
Serializable {
}
return result;
}
-
- public Predicate createEqualPredicate(BinaryRow binaryRow) {
- PredicateBuilder builder = new PredicateBuilder(rowType);
- List<Predicate> fieldPredicates = new ArrayList<>();
- Object[] partitionObjects = convert(binaryRow);
- for (int i = 0; i < getArity(); i++) {
- Object o = partitionObjects[i];
- fieldPredicates.add(o == null ? builder.isNull(i) :
builder.equal(i, o));
- }
- return PredicateBuilder.and(fieldPredicates);
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 75c532a5c..a5cba5c87 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -179,6 +179,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
schemaManager,
commitUser,
partitionType,
+ options.partitionDefaultName(),
pathFactory(),
snapshotManager(),
manifestFileFactory(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 3049e44c7..2dc091d88 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -46,6 +46,7 @@ import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Metadata of a manifest file. */
@@ -405,7 +406,8 @@ public class ManifestFileMeta {
List<Predicate> predicateList =
partitions.stream()
- .map(rowArrayConverter::createEqualPredicate)
+ .map(rowArrayConverter::convert)
+ .map(values ->
createPartitionPredicate(partitionType, values))
.collect(Collectors.toList());
predicateOpt = Optional.of(PredicateBuilder.or(predicateList));
} else {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 0001d5b19..e5efbbac6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -75,6 +75,7 @@ import java.util.stream.Collectors;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
/**
@@ -105,6 +106,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final SchemaManager schemaManager;
private final String commitUser;
private final RowType partitionType;
+ private final String partitionDefaultName;
private final FileStorePathFactory pathFactory;
private final SnapshotManager snapshotManager;
private final ManifestFile manifestFile;
@@ -133,6 +135,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
SchemaManager schemaManager,
String commitUser,
RowType partitionType,
+ String partitionDefaultName,
FileStorePathFactory pathFactory,
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
@@ -152,6 +155,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.schemaManager = schemaManager;
this.commitUser = commitUser;
this.partitionType = partitionType;
+ this.partitionDefaultName = partitionDefaultName;
this.pathFactory = pathFactory;
this.snapshotManager = snapshotManager;
this.manifestFile = manifestFileFactory.create();
@@ -413,7 +417,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
.map(ManifestEntry::partition)
.distinct()
// partition filter is built from new
data's partitions
- .map(p ->
PredicateBuilder.equalPartition(p, partitionType))
+ .map(p ->
createPartitionPredicate(partitionType, p))
.reduce(PredicateBuilder::or)
.orElseThrow(
() ->
@@ -421,7 +425,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
"Failed to get
dynamic partition filter. This is unexpected."));
}
} else {
- partitionFilter = PredicateBuilder.partition(partition,
partitionType);
+ partitionFilter =
+ createPartitionPredicate(partition, partitionType,
partitionDefaultName);
// sanity check, all changes must be done within the given
partition
if (partitionFilter != null) {
for (ManifestEntry entry : appendTableFiles) {
@@ -492,7 +497,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
Predicate partitionFilter =
partitions.stream()
- .map(partition ->
PredicateBuilder.partition(partition, partitionType))
+ .map(
+ partition ->
+ createPartitionPredicate(
+ partition, partitionType,
partitionDefaultName))
.reduce(PredicateBuilder::or)
.orElseThrow(() -> new RuntimeException("Failed to get
partition filter."));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index b8893db6b..c2434d4b3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -28,14 +28,20 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.statistics.FullSimpleColStatsCollector;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import javax.annotation.Nullable;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
/** A special predicate to filter partition only, just like {@link Predicate}.
*/
public interface PartitionPredicate {
@@ -121,11 +127,13 @@ public interface PartitionPredicate {
PredicateBuilder builder = new PredicateBuilder(partitionType);
for (int i = 0; i < collectors.length; i++) {
SimpleColStats stats = collectors[i].result();
- Object minValue = stats.min();
- Object maxValue = stats.max();
-
- min[i] = minValue == null ? builder.isNull(i) :
builder.greaterOrEqual(i, minValue);
- max[i] = maxValue == null ? builder.isNull(i) :
builder.lessOrEqual(i, maxValue);
+ if (stats.nullCount() == partitions.size()) {
+ min[i] = builder.isNull(i);
+ max[i] = builder.isNull(i);
+ } else {
+ min[i] = builder.greaterOrEqual(i,
checkNotNull(stats.min()));
+ max[i] = builder.lessOrEqual(i, checkNotNull(stats.max()));
+ }
}
}
@@ -153,4 +161,58 @@ public interface PartitionPredicate {
return true;
}
}
+
+ static Predicate createPartitionPredicate(RowType rowType, Map<String,
Object> partition) {
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ List<String> fieldNames = rowType.getFieldNames();
+ Predicate predicate = null;
+ for (Map.Entry<String, Object> entry : partition.entrySet()) {
+ Object literal = entry.getValue();
+ int idx = fieldNames.indexOf(entry.getKey());
+ Predicate predicateTemp =
+ literal == null ? builder.isNull(idx) : builder.equal(idx,
literal);
+ if (predicate == null) {
+ predicate = predicateTemp;
+ } else {
+ predicate = PredicateBuilder.and(predicate, predicateTemp);
+ }
+ }
+ return predicate;
+ }
+
+ static Predicate createPartitionPredicate(RowType partitionType, Object[]
partition) {
+ Preconditions.checkArgument(
+ partition.length == partitionType.getFieldCount(),
+ "Partition's field count should be equal to partitionType's
field count.");
+
+ Map<String, Object> partitionMap = new HashMap<>(partition.length);
+ for (int i = 0; i < partition.length; i++) {
+ partitionMap.put(partitionType.getFields().get(i).name(),
partition[i]);
+ }
+
+ return createPartitionPredicate(partitionType, partitionMap);
+ }
+
+ static Predicate createPartitionPredicate(RowType partitionType, BinaryRow
partition) {
+ Preconditions.checkArgument(
+ partition.getFieldCount() == partitionType.getFieldCount(),
+ "Partition's field count should be equal to partitionType's
field count.");
+ RowDataToObjectArrayConverter converter = new
RowDataToObjectArrayConverter(partitionType);
+ return createPartitionPredicate(partitionType,
converter.convert(partition));
+ }
+
+ @Nullable
+ static Predicate createPartitionPredicate(
+ Map<String, String> spec, RowType rowType, String
defaultPartValue) {
+ Map<String, Object> internalValues = convertSpecToInternal(spec,
rowType, defaultPartValue);
+ return createPartitionPredicate(rowType, internalValues);
+ }
+
+ static Predicate createPartitionPredicate(
+ List<Map<String, String>> partitions, RowType rowType, String
defaultPartValue) {
+ return PredicateBuilder.or(
+ partitions.stream()
+ .map(p -> createPartitionPredicate(p, rowType,
defaultPartValue))
+ .toArray(Predicate[]::new));
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index a6082bdcd..be91096ab 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -41,12 +41,10 @@ import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.PlanImpl;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.paimon.utils.TypeUtils;
import java.util.ArrayList;
import java.util.Collections;
@@ -58,10 +56,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
/** Implementation of {@link SnapshotReader}. */
@@ -140,25 +138,12 @@ public class SnapshotReaderImpl implements SnapshotReader
{
@Override
public SnapshotReader withPartitionFilter(Map<String, String>
partitionSpec) {
if (partitionSpec != null) {
- List<String> partitionKeys = tableSchema.partitionKeys();
- RowType rowType = tableSchema.logicalPartitionType();
- PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
- List<Predicate> partitionFilters =
- partitionSpec.entrySet().stream()
- .map(
- m -> {
- int index =
partitionKeys.indexOf(m.getKey());
- Object value =
-
TypeUtils.castFromStringInternal(
- m.getValue(),
-
rowType.getTypeAt(index),
- false);
- return value == null
- ?
predicateBuilder.isNull(index)
- :
predicateBuilder.equal(index, value);
- })
- .collect(Collectors.toList());
- scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
+ Predicate partitionPredicate =
+ createPartitionPredicate(
+ partitionSpec,
+ tableSchema.logicalPartitionType(),
+ options.partitionDefaultName());
+ scan.withPartitionFilter(partitionPredicate);
}
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 0f3ad7fec..7696f9ada 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -39,7 +39,7 @@ public class FileStorePathFactory {
private final Path root;
private final String uuid;
- private final RowDataPartitionComputer partitionComputer;
+ private final InternalRowPartitionComputer partitionComputer;
private final String formatIdentifier;
private final AtomicInteger manifestFileCount;
@@ -68,10 +68,10 @@ public class FileStorePathFactory {
}
@VisibleForTesting
- public static RowDataPartitionComputer getPartitionComputer(
+ public static InternalRowPartitionComputer getPartitionComputer(
RowType partitionType, String defaultPartValue) {
String[] partitionColumns = partitionType.getFieldNames().toArray(new
String[0]);
- return new RowDataPartitionComputer(defaultPartValue, partitionType,
partitionColumns);
+ return new InternalRowPartitionComputer(defaultPartValue,
partitionType, partitionColumns);
}
public Path newManifestFile() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 7953bd6c6..695b023aa 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -79,6 +79,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -658,8 +659,10 @@ public class FileStoreCommitTest {
partitions.stream()
.map(
partition ->
- PredicateBuilder.partition(
- partition,
TestKeyValueGenerator.DEFAULT_PART_TYPE))
+ createPartitionPredicate(
+ partition,
+
TestKeyValueGenerator.DEFAULT_PART_TYPE,
+
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue()))
.reduce(PredicateBuilder::or)
.get();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 90effe57f..7896ec923 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -32,8 +32,8 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.RowDataPartitionComputer;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.api.TableSchema;
@@ -860,7 +860,7 @@ public class FlinkCatalog extends AbstractCatalog {
org.apache.paimon.types.RowType partitionRowType =
fileStoreTable.schema().logicalPartitionType();
- RowDataPartitionComputer partitionComputer =
+ InternalRowPartitionComputer partitionComputer =
FileStorePathFactory.getPartitionComputer(
partitionRowType,
new
CoreOptions(table.options()).partitionDefaultName());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index a9d283a61..efa2f386d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -42,6 +42,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+
/** Compact with sort action. */
public class SortCompactAction extends CompactAction {
@@ -96,7 +98,14 @@ public class SortCompactAction extends CompactAction {
Predicate partitionPredicate =
PredicateBuilder.or(
getPartitions().stream()
- .map(p -> PredicateBuilder.partition(p,
table.rowType()))
+ .map(
+ p ->
+ createPartitionPredicate(
+ p,
+ table.rowType(),
+ ((FileStoreTable)
table)
+
.coreOptions()
+
.partitionDefaultName()))
.toArray(Predicate[]::new));
sourceBuilder.predicate(partitionPredicate);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 6a9793f44..c525b133b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.flink.api.dag.Transformation;
@@ -60,7 +59,7 @@ public abstract class TableActionBase extends ActionBase {
public TableResult batchSink(DataStream<RowData> dataStream) {
List<Transformation<?>> transformations =
Collections.singletonList(
- new FlinkSinkBuilder((FileStoreTable) table)
+ new FlinkSinkBuilder(table)
.forRowData(dataStream)
.build()
.getTransformation());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
index 7d986463d..edbca53b2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -24,7 +24,6 @@ import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -38,6 +37,8 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+
/**
* Build for unaware-bucket table flink compaction job.
*
@@ -96,7 +97,10 @@ public class UnawareBucketCompactionTopoBuilder {
isContinuous,
scanInterval,
specifiedPartitions != null
- ?
PredicateBuilder.partitions(specifiedPartitions, table.rowType())
+ ? createPartitionPredicate(
+ specifiedPartitions,
+ table.rowType(),
+
table.coreOptions().partitionDefaultName())
: null);
return BucketUnawareCompactSource.buildSource(env, source,
isContinuous, tableIdentifier);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 3f81fdf9a..e2efbcbc5 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -28,13 +28,12 @@ import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
-import org.apache.paimon.utils.InternalRowUtils;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -57,8 +56,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -69,6 +70,7 @@ import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE;
import static
org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable;
import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS;
import static
org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
/** A lookup {@link TableFunction} for file store. */
@@ -252,16 +254,17 @@ public class FileStoreLookupFunction implements
Serializable, Closeable {
private Predicate createSpecificPartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
- List<String> fieldNames = rowType.getFieldNames();
List<String> partitionKeys = table.partitionKeys();
- PredicateBuilder builder = new PredicateBuilder(rowType);
- List<Predicate> predicates = new ArrayList<>();
- for (int i = 0; i < partitionKeys.size(); i++) {
- int index = fieldNames.indexOf(partitionKeys.get(i));
- Object value = InternalRowUtils.get(partition, i,
rowType.getTypeAt(index));
- predicates.add(value == null ? builder.isNull(index) :
builder.equal(index, value));
+ Object[] partitionSpec =
+ new
RowDataToObjectArrayConverter(rowType.project(partitionKeys))
+ .convert(partition);
+ Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
+ for (int i = 0; i < partitionSpec.length; i++) {
+ partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
}
- return PredicateBuilder.and(predicates);
+
+ // create partition predicate base on rowType instead of partitionType
+ return createPartitionPredicate(rowType, partitionMap);
}
private void reopen() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index b4b4aa0bd..e39c9c807 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -26,7 +26,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.utils.RowDataPartitionComputer;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -63,7 +63,7 @@ public class PartitionMarkDone implements Closeable {
"mark-done-pending-partitions",
new ListSerializer<>(StringSerializer.INSTANCE));
- private final RowDataPartitionComputer partitionComputer;
+ private final InternalRowPartitionComputer partitionComputer;
private final PartitionMarkDoneTrigger trigger;
private final List<PartitionMarkDoneAction> actions;
@@ -100,8 +100,8 @@ public class PartitionMarkDone implements Closeable {
"Table should enable %s",
METASTORE_PARTITIONED_TABLE.key());
- RowDataPartitionComputer partitionComputer =
- new RowDataPartitionComputer(
+ InternalRowPartitionComputer partitionComputer =
+ new InternalRowPartitionComputer(
coreOptions.partitionDefaultName(),
table.schema().logicalPartitionType(),
partitionKeys.toArray(new String[0]));
@@ -136,7 +136,7 @@ public class PartitionMarkDone implements Closeable {
}
public PartitionMarkDone(
- RowDataPartitionComputer partitionComputer,
+ InternalRowPartitionComputer partitionComputer,
PartitionMarkDoneTrigger trigger,
List<PartitionMarkDoneAction> actions) {
this.partitionComputer = partitionComputer;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index e963c92fc..2b9e52466 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -43,6 +43,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+
/**
* Source builder to build a Flink {@link StaticFileStoreSource} or {@link
* ContinuousFileStoreSource}. This is for dedicated compactor jobs.
@@ -89,7 +91,13 @@ public class CompactorSourceBuilder {
partitionPredicate =
PredicateBuilder.or(
specifiedPartitions.stream()
- .map(p -> PredicateBuilder.partition(p,
table.rowType()))
+ .map(
+ p ->
+ createPartitionPredicate(
+ p,
+ table.rowType(),
+ table.coreOptions()
+
.partitionDefaultName()))
.toArray(Predicate[]::new));
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
index 76b1f3cf2..e2771c0e6 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java
@@ -23,8 +23,8 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
-import org.apache.paimon.utils.RowDataPartitionComputer;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -40,7 +40,7 @@ import java.util.List;
public class HiveMetastoreClient implements MetastoreClient {
private final Identifier identifier;
- private final RowDataPartitionComputer partitionComputer;
+ private final InternalRowPartitionComputer partitionComputer;
private final IMetaStoreClient client;
private final StorageDescriptor sd;
@@ -49,7 +49,7 @@ public class HiveMetastoreClient implements MetastoreClient {
throws Exception {
this.identifier = identifier;
this.partitionComputer =
- new RowDataPartitionComputer(
+ new InternalRowPartitionComputer(
new
CoreOptions(schema.options()).partitionDefaultName(),
schema.logicalPartitionType(),
schema.partitionKeys().toArray(new String[0]));
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 6dab4d4af..33cbc19e0 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.hive.utils;
import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
@@ -45,6 +46,7 @@ import static java.util.Collections.singletonMap;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
import static org.apache.paimon.hive.utils.HiveUtils.createPredicate;
import static org.apache.paimon.hive.utils.HiveUtils.extractTagName;
+import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
/** Generator to generate hive input splits. */
public class HiveSplitGenerator {
@@ -85,7 +87,8 @@ public class HiveSplitGenerator {
createPartitionPredicate(
table.schema().logicalRowType(),
table.schema().partitionKeys(),
- location)
+ location,
+ table.coreOptions().partitionDefaultName())
.ifPresent(predicatePerPartition::add);
scan = table.newScan();
@@ -105,7 +108,10 @@ public class HiveSplitGenerator {
}
private static Optional<Predicate> createPartitionPredicate(
- RowType rowType, List<String> partitionKeys, String partitionDir) {
+ RowType rowType,
+ List<String> partitionKeys,
+ String partitionDir,
+ String defaultPartName) {
Set<String> partitionKeySet = new HashSet<>(partitionKeys);
LinkedHashMap<String, String> partition = new LinkedHashMap<>();
for (String s : partitionDir.split("/")) {
@@ -124,7 +130,9 @@ public class HiveSplitGenerator {
if (partition.isEmpty() || partition.size() != partitionKeys.size()) {
return Optional.empty();
} else {
- return Optional.ofNullable(PredicateBuilder.partition(partition,
rowType));
+ return Optional.ofNullable(
+ PartitionPredicate.createPartitionPredicate(
+ partition, rowType, defaultPartName));
}
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index bebb1ac4a..a803e1b62 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.operation.FileStoreCommit
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.BatchWriteBuilder
import org.apache.paimon.types.RowType
-import org.apache.paimon.utils.RowDataPartitionComputer
+import org.apache.paimon.utils.InternalRowPartitionComputer
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -55,7 +55,7 @@ trait PaimonPartitionManagement extends
SupportsPartitionManagement {
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema()))
.apply(internalRow)
.asInstanceOf[Row]
- val rowDataPartitionComputer = new RowDataPartitionComputer(
+ val rowDataPartitionComputer = new InternalRowPartitionComputer(
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
tableRowType,
partitionKeys.asScala.toArray)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 86d2aee8b..a2d807ed1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -28,7 +28,7 @@ import
org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
import org.apache.paimon.types.RowKind
-import org.apache.paimon.utils.RowDataPartitionComputer
+import org.apache.paimon.utils.InternalRowPartitionComputer
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
@@ -82,7 +82,7 @@ case class DeleteFromPaimonTableCommand(
) {
val matchedPartitions =
table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
- val rowDataPartitionComputer = new RowDataPartitionComputer(
+ val rowDataPartitionComputer = new InternalRowPartitionComputer(
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
table.schema().logicalPartitionType(),
table.partitionKeys.asScala.toArray
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
index ea23790ca..dbce04719 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.table.DataTable
import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan}
import org.apache.paimon.table.source.TableScan.Plan
import org.apache.paimon.table.source.snapshot.StartingContext
-import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils}
+import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
import org.apache.spark.sql.connector.read.streaming.ReadLimit
import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -49,11 +49,12 @@ private[spark] trait StreamHelper {
private lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(),
table.partitionKeys()))
- private lazy val partitionComputer: RowDataPartitionComputer = new
RowDataPartitionComputer(
- new CoreOptions(table.options).partitionDefaultName,
- TypeUtils.project(table.rowType(), table.partitionKeys()),
- table.partitionKeys().asScala.toArray
- )
+ private lazy val partitionComputer: InternalRowPartitionComputer =
+ new InternalRowPartitionComputer(
+ new CoreOptions(table.options).partitionDefaultName,
+ TypeUtils.project(table.rowType(), table.partitionKeys()),
+ table.partitionKeys().asScala.toArray
+ )
// Used to get the initial offset.
lazy val streamScanStartingContext: StartingContext =
streamScan.startingContext()