This is an automated email from the ASF dual-hosted git repository. junhao pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit c16cf29e5edd4d7c13ac4b1bef263e1cc7f102a3 Author: Jingsong Lee <[email protected]> AuthorDate: Thu May 30 10:13:32 2024 +0800 [core] Unify Partition Predicate generation in PartitionPredicate (#3427) --- .../apache/paimon/predicate/PredicateBuilder.java | 40 +++--------- .../paimon/utils/InternalRowPartitionComputer.java | 21 ++++++- .../utils/RowDataToObjectArrayConverter.java | 16 ----- .../java/org/apache/paimon/AbstractFileStore.java | 1 + .../apache/paimon/manifest/ManifestFileMeta.java | 4 +- .../paimon/operation/FileStoreCommitImpl.java | 14 ++++- .../paimon/partition/PartitionPredicate.java | 72 ++++++++++++++++++++-- .../table/source/snapshot/SnapshotReaderImpl.java | 29 +++------ .../apache/paimon/utils/FileStorePathFactory.java | 6 +- .../paimon/operation/FileStoreCommitTest.java | 7 ++- .../java/org/apache/paimon/flink/FlinkCatalog.java | 4 +- .../paimon/flink/action/SortCompactAction.java | 11 +++- .../paimon/flink/action/TableActionBase.java | 3 +- .../UnawareBucketCompactionTopoBuilder.java | 8 ++- .../flink/lookup/FileStoreLookupFunction.java | 23 ++++--- .../flink/source/CompactorSourceBuilder.java | 10 ++- .../apache/paimon/hive/HiveMetastoreClient.java | 6 +- .../paimon/hive/utils/HiveSplitGenerator.java | 14 ++++- .../paimon/spark/PaimonPartitionManagement.scala | 4 +- .../commands/DeleteFromPaimonTableCommand.scala | 4 +- .../apache/paimon/spark/sources/StreamHelper.scala | 13 ++-- 21 files changed, 191 insertions(+), 119 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java index 2f50c05eb..acb861b1c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java +++ b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java @@ -19,7 +19,6 @@ package org.apache.paimon.predicate; import org.apache.paimon.annotation.Public; -import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.Timestamp; @@ -28,8 +27,6 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.RowDataToObjectArrayConverter; -import org.apache.paimon.utils.TypeUtils; import javax.annotation.Nullable; @@ -52,6 +49,7 @@ import java.util.Set; import java.util.stream.Collectors; import static java.util.Collections.singletonList; +import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal; /** * A utility class to create {@link Predicate} object for common filter conditions. @@ -360,14 +358,15 @@ public class PredicateBuilder { } @Nullable - public static Predicate partition(Map<String, String> map, RowType rowType) { - // TODO: It is somewhat misleading that an empty map creates a null predicate filter + public static Predicate partition( + Map<String, String> map, RowType rowType, String defaultPartValue) { + Map<String, Object> internalValues = convertSpecToInternal(map, rowType, defaultPartValue); List<String> fieldNames = rowType.getFieldNames(); Predicate predicate = null; PredicateBuilder builder = new PredicateBuilder(rowType); - for (Map.Entry<String, String> entry : map.entrySet()) { + for (Map.Entry<String, Object> entry : internalValues.entrySet()) { int idx = fieldNames.indexOf(entry.getKey()); - Object literal = TypeUtils.castFromString(entry.getValue(), rowType.getTypeAt(idx)); + Object literal = internalValues.get(entry.getKey()); Predicate predicateTemp = literal == null ? builder.isNull(idx) : builder.equal(idx, literal); if (predicate == null) { @@ -379,32 +378,11 @@ public class PredicateBuilder { return predicate; } - public static Predicate partitions(List<Map<String, String>> partitions, RowType rowType) { + public static Predicate partitions( + List<Map<String, String>> partitions, RowType rowType, String defaultPartValue) { return PredicateBuilder.or( partitions.stream() - .map(p -> PredicateBuilder.partition(p, rowType)) + .map(p -> PredicateBuilder.partition(p, rowType, defaultPartValue)) .toArray(Predicate[]::new)); } - - public static Predicate equalPartition(BinaryRow partition, RowType partitionType) { - Preconditions.checkArgument( - partition.getFieldCount() == partitionType.getFieldCount(), - "Partition's field count should be equal to partitionType's field count."); - - RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType); - Predicate predicate = null; - PredicateBuilder builder = new PredicateBuilder(partitionType); - Object[] literals = converter.convert(partition); - for (int i = 0; i < literals.length; i++) { - Predicate predicateTemp = - literals[i] == null ? builder.isNull(i) : builder.equal(i, literals[i]); - if (predicate == null) { - predicate = predicateTemp; - } else { - predicate = PredicateBuilder.and(predicate, predicateTemp); - } - } - - return predicate; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java similarity index 75% rename from paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java rename to paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java index a5a9c21fe..bdc5197fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/RowDataPartitionComputer.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java @@ -24,15 +24,18 @@ import org.apache.paimon.types.RowType; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; + +import static org.apache.paimon.utils.TypeUtils.castFromString; /** PartitionComputer for {@link InternalRow}. */ -public class RowDataPartitionComputer { +public class InternalRowPartitionComputer { protected final String defaultPartValue; protected final String[] partitionColumns; protected final InternalRow.FieldGetter[] partitionFieldGetters; - public RowDataPartitionComputer( + public InternalRowPartitionComputer( String defaultPartValue, RowType rowType, String[] partitionColumns) { this.defaultPartValue = defaultPartValue; this.partitionColumns = partitionColumns; @@ -60,4 +63,18 @@ public class RowDataPartitionComputer { } return partSpec; } + + public static Map<String, Object> convertSpecToInternal( + Map<String, String> spec, RowType partType, String defaultPartValue) { + Map<String, Object> partValues = new LinkedHashMap<>(); + for (Map.Entry<String, String> entry : spec.entrySet()) { + partValues.put( + entry.getKey(), + defaultPartValue.equals(entry.getValue()) + ? null + : castFromString( + entry.getValue(), partType.getField(entry.getKey()).type())); + } + return partValues; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java index 436a51ada..679278018 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java @@ -18,16 +18,11 @@ package org.apache.paimon.utils; -import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.types.RowType; import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; import java.util.stream.IntStream; /** Convert {@link InternalRow} to object array. */ @@ -68,15 +63,4 @@ public class RowDataToObjectArrayConverter implements Serializable { } return result; } - - public Predicate createEqualPredicate(BinaryRow binaryRow) { - PredicateBuilder builder = new PredicateBuilder(rowType); - List<Predicate> fieldPredicates = new ArrayList<>(); - Object[] partitionObjects = convert(binaryRow); - for (int i = 0; i < getArity(); i++) { - Object o = partitionObjects[i]; - fieldPredicates.add(o == null ? builder.isNull(i) : builder.equal(i, o)); - } - return PredicateBuilder.and(fieldPredicates); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index e893c0525..9ce2e6017 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -184,6 +184,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> { schemaManager, commitUser, partitionType, + options.partitionDefaultName(), pathFactory(), snapshotManager(), manifestFileFactory(), diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java index 105e150a9..53c5bd6f8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java @@ -46,6 +46,7 @@ import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Metadata of a manifest file. */ @@ -405,7 +406,8 @@ public class ManifestFileMeta { List<Predicate> predicateList = partitions.stream() - .map(rowArrayConverter::createEqualPredicate) + .map(rowArrayConverter::convert) + .map(values -> createPartitionPredicate(partitionType, values)) .collect(Collectors.toList()); predicateOpt = Optional.of(PredicateBuilder.or(predicateList)); } else { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index dab336b81..2096b8cbf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -74,6 +74,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; /** @@ -104,6 +105,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final SchemaManager schemaManager; private final String commitUser; private final RowType partitionType; + private final String partitionDefaultName; private final FileStorePathFactory pathFactory; private final SnapshotManager snapshotManager; private final ManifestFile manifestFile; @@ -130,6 +132,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { SchemaManager schemaManager, String commitUser, RowType partitionType, + String partitionDefaultName, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, ManifestFile.Factory manifestFileFactory, @@ -148,6 +151,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { this.schemaManager = schemaManager; this.commitUser = commitUser; this.partitionType = partitionType; + this.partitionDefaultName = partitionDefaultName; this.pathFactory = pathFactory; this.snapshotManager = snapshotManager; this.manifestFile = manifestFileFactory.create(); @@ -408,7 +412,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { .map(ManifestEntry::partition) .distinct() // partition filter is built from new data's partitions - .map(p -> PredicateBuilder.equalPartition(p, partitionType)) + .map(p -> createPartitionPredicate(partitionType, p)) .reduce(PredicateBuilder::or) .orElseThrow( () -> @@ -416,7 +420,8 @@ public class FileStoreCommitImpl implements FileStoreCommit { "Failed to get dynamic partition filter. This is unexpected.")); } } else { - partitionFilter = PredicateBuilder.partition(partition, partitionType); + partitionFilter = + createPartitionPredicate(partition, partitionType, partitionDefaultName); // sanity check, all changes must be done within the given partition if (partitionFilter != null) { for (ManifestEntry entry : appendTableFiles) { @@ -487,7 +492,10 @@ public class FileStoreCommitImpl implements FileStoreCommit { Predicate partitionFilter = partitions.stream() - .map(partition -> PredicateBuilder.partition(partition, partitionType)) + .map( + partition -> + createPartitionPredicate( + partition, partitionType, partitionDefaultName)) .reduce(PredicateBuilder::or) .orElseThrow(() -> new RuntimeException("Failed to get partition filter.")); diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java index 663a4434b..8bd73b7eb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java +++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java @@ -28,14 +28,20 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.statistics.FullFieldStatsCollector; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.RowDataToObjectArrayConverter; import javax.annotation.Nullable; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal; +import static org.apache.paimon.utils.Preconditions.checkNotNull; + /** A special predicate to filter partition only, just like {@link Predicate}. */ public interface PartitionPredicate { @@ -121,11 +127,13 @@ public interface PartitionPredicate { PredicateBuilder builder = new PredicateBuilder(partitionType); for (int i = 0; i < collectors.length; i++) { FieldStats stats = collectors[i].result(); - Object minValue = stats.minValue(); - Object maxValue = stats.maxValue(); - - min[i] = minValue == null ? builder.isNull(i) : builder.greaterOrEqual(i, minValue); - max[i] = maxValue == null ? builder.isNull(i) : builder.lessOrEqual(i, maxValue); + if (stats.nullCount() == partitions.size()) { + min[i] = builder.isNull(i); + max[i] = builder.isNull(i); + } else { + min[i] = builder.greaterOrEqual(i, checkNotNull(stats.minValue())); + max[i] = builder.lessOrEqual(i, checkNotNull(stats.maxValue())); + } } } @@ -153,4 +161,58 @@ public interface PartitionPredicate { return true; } } + + static Predicate createPartitionPredicate(RowType rowType, Map<String, Object> partition) { + PredicateBuilder builder = new PredicateBuilder(rowType); + List<String> fieldNames = rowType.getFieldNames(); + Predicate predicate = null; + for (Map.Entry<String, Object> entry : partition.entrySet()) { + Object literal = entry.getValue(); + int idx = fieldNames.indexOf(entry.getKey()); + Predicate predicateTemp = + literal == null ? builder.isNull(idx) : builder.equal(idx, literal); + if (predicate == null) { + predicate = predicateTemp; + } else { + predicate = PredicateBuilder.and(predicate, predicateTemp); + } + } + return predicate; + } + + static Predicate createPartitionPredicate(RowType partitionType, Object[] partition) { + Preconditions.checkArgument( + partition.length == partitionType.getFieldCount(), + "Partition's field count should be equal to partitionType's field count."); + + Map<String, Object> partitionMap = new HashMap<>(partition.length); + for (int i = 0; i < partition.length; i++) { + partitionMap.put(partitionType.getFields().get(i).name(), partition[i]); + } + + return createPartitionPredicate(partitionType, partitionMap); + } + + static Predicate createPartitionPredicate(RowType partitionType, BinaryRow partition) { + Preconditions.checkArgument( + partition.getFieldCount() == partitionType.getFieldCount(), + "Partition's field count should be equal to partitionType's field count."); + RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType); + return createPartitionPredicate(partitionType, converter.convert(partition)); + } + + @Nullable + static Predicate createPartitionPredicate( + Map<String, String> spec, RowType rowType, String defaultPartValue) { + Map<String, Object> internalValues = convertSpecToInternal(spec, rowType, defaultPartValue); + return createPartitionPredicate(rowType, internalValues); + } + + static Predicate createPartitionPredicate( + List<Map<String, String>> partitions, RowType rowType, String defaultPartValue) { + return PredicateBuilder.or( + partitions.stream() + .map(p -> createPartitionPredicate(p, rowType, defaultPartValue)) + .toArray(Predicate[]::new)); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 3136bce5f..aae2ccfc9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -41,12 +41,10 @@ import org.apache.paimon.table.source.DeletionFile; import org.apache.paimon.table.source.PlanImpl; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.SplitGenerator; -import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; -import org.apache.paimon.utils.TypeUtils; import javax.annotation.Nullable; @@ -60,10 +58,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.BiConsumer; -import java.util.stream.Collectors; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; /** Implementation of {@link SnapshotReader}. */ @@ -142,25 +140,12 @@ public class SnapshotReaderImpl implements SnapshotReader { @Override public SnapshotReader withPartitionFilter(Map<String, String> partitionSpec) { if (partitionSpec != null) { - List<String> partitionKeys = tableSchema.partitionKeys(); - RowType rowType = tableSchema.logicalPartitionType(); - PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); - List<Predicate> partitionFilters = - partitionSpec.entrySet().stream() - .map( - m -> { - int index = partitionKeys.indexOf(m.getKey()); - Object value = - TypeUtils.castFromStringInternal( - m.getValue(), - rowType.getTypeAt(index), - false); - return value == null - ? predicateBuilder.isNull(index) - : predicateBuilder.equal(index, value); - }) - .collect(Collectors.toList()); - scan.withPartitionFilter(PredicateBuilder.and(partitionFilters)); + Predicate partitionPredicate = + createPartitionPredicate( + partitionSpec, + tableSchema.logicalPartitionType(), + options.partitionDefaultName()); + scan.withPartitionFilter(partitionPredicate); } return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java index 0f3ad7fec..7696f9ada 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java @@ -39,7 +39,7 @@ public class FileStorePathFactory { private final Path root; private final String uuid; - private final RowDataPartitionComputer partitionComputer; + private final InternalRowPartitionComputer partitionComputer; private final String formatIdentifier; private final AtomicInteger manifestFileCount; @@ -68,10 +68,10 @@ public class FileStorePathFactory { } @VisibleForTesting - public static RowDataPartitionComputer getPartitionComputer( + public static InternalRowPartitionComputer getPartitionComputer( RowType partitionType, String defaultPartValue) { String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]); - return new RowDataPartitionComputer(defaultPartValue, partitionType, partitionColumns); + return new InternalRowPartitionComputer(defaultPartValue, partitionType, partitionColumns); } public Path newManifestFile() { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 26cb564ca..5b1e55f6f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -73,6 +73,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -652,8 +653,10 @@ public class FileStoreCommitTest { partitions.stream() .map( partition -> - PredicateBuilder.partition( - partition, TestKeyValueGenerator.DEFAULT_PART_TYPE)) + createPartitionPredicate( + partition, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + CoreOptions.PARTITION_DEFAULT_NAME.defaultValue())) .reduce(PredicateBuilder::or) .get(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 90effe57f..7896ec923 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -32,8 +32,8 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Preconditions; -import org.apache.paimon.utils.RowDataPartitionComputer; import org.apache.paimon.utils.StringUtils; import org.apache.flink.table.api.TableSchema; @@ -860,7 +860,7 @@ public class FlinkCatalog extends AbstractCatalog { org.apache.paimon.types.RowType partitionRowType = fileStoreTable.schema().logicalPartitionType(); - RowDataPartitionComputer partitionComputer = + InternalRowPartitionComputer partitionComputer = FileStorePathFactory.getPartitionComputer( partitionRowType, new CoreOptions(table.options()).partitionDefaultName()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java index 0936943ed..03717ab01 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java @@ -42,6 +42,8 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; + /** Compact with sort action. */ public class SortCompactAction extends CompactAction { @@ -96,7 +98,14 @@ public class SortCompactAction extends CompactAction { Predicate partitionPredicate = PredicateBuilder.or( getPartitions().stream() - .map(p -> PredicateBuilder.partition(p, table.rowType())) + .map( + p -> + createPartitionPredicate( + p, + table.rowType(), + ((FileStoreTable) table) + .coreOptions() + .partitionDefaultName())) .toArray(Predicate[]::new)); sourceBuilder.predicate(partitionPredicate); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index 6a9793f44..c525b133b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -21,7 +21,6 @@ package org.apache.paimon.flink.action; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.sink.FlinkSinkBuilder; -import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.flink.api.dag.Transformation; @@ -60,7 +59,7 @@ public abstract class TableActionBase extends ActionBase { public TableResult batchSink(DataStream<RowData> dataStream) { List<Transformation<?>> transformations = Collections.singletonList( - new FlinkSinkBuilder((FileStoreTable) table) + new FlinkSinkBuilder(table) .forRowData(dataStream) .build() .getTransformation()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java index 7d986463d..edbca53b2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java @@ -24,7 +24,6 @@ import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.flink.sink.UnawareBucketCompactionSink; import org.apache.paimon.flink.source.BucketUnawareCompactSource; import org.apache.paimon.options.Options; -import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.streaming.api.datastream.DataStream; @@ -38,6 +37,8 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; + /** * Build for unaware-bucket table flink compaction job. * @@ -96,7 +97,10 @@ public class UnawareBucketCompactionTopoBuilder { isContinuous, scanInterval, specifiedPartitions != null - ? PredicateBuilder.partitions(specifiedPartitions, table.rowType()) + ? createPartitionPredicate( + specifiedPartitions, + table.rowType(), + table.coreOptions().partitionDefaultName()) : null); return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 3f81fdf9a..e2efbcbc5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -28,13 +28,12 @@ import org.apache.paimon.flink.FlinkRowWrapper; import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.OutOfRangeException; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileIOUtils; -import org.apache.paimon.utils.InternalRowUtils; +import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints; @@ -57,8 +56,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -69,6 +70,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_CACHE_MODE; import static org.apache.paimon.flink.query.RemoteTableQuery.isRemoteServiceAvailable; import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CACHE_ROWS; import static org.apache.paimon.lookup.RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping; /** A lookup {@link TableFunction} for file store. */ @@ -252,16 +254,17 @@ public class FileStoreLookupFunction implements Serializable, Closeable { private Predicate createSpecificPartFilter(BinaryRow partition) { RowType rowType = table.rowType(); - List<String> fieldNames = rowType.getFieldNames(); List<String> partitionKeys = table.partitionKeys(); - PredicateBuilder builder = new PredicateBuilder(rowType); - List<Predicate> predicates = new ArrayList<>(); - for (int i = 0; i < partitionKeys.size(); i++) { - int index = fieldNames.indexOf(partitionKeys.get(i)); - Object value = InternalRowUtils.get(partition, i, rowType.getTypeAt(index)); - predicates.add(value == null ? builder.isNull(index) : builder.equal(index, value)); + Object[] partitionSpec = + new RowDataToObjectArrayConverter(rowType.project(partitionKeys)) + .convert(partition); + Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length); + for (int i = 0; i < partitionSpec.length; i++) { + partitionMap.put(partitionKeys.get(i), partitionSpec[i]); } - return PredicateBuilder.and(predicates); + + // create partition predicate base on rowType instead of partitionType + return createPartitionPredicate(rowType, partitionMap); } private void reopen() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java index e963c92fc..2b9e52466 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java @@ -43,6 +43,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; + /** * Source builder to build a Flink {@link StaticFileStoreSource} or {@link * ContinuousFileStoreSource}. This is for dedicated compactor jobs. @@ -89,7 +91,13 @@ public class CompactorSourceBuilder { partitionPredicate = PredicateBuilder.or( specifiedPartitions.stream() - .map(p -> PredicateBuilder.partition(p, table.rowType())) + .map( + p -> + createPartitionPredicate( + p, + table.rowType(), + table.coreOptions() + .partitionDefaultName())) .toArray(Predicate[]::new)); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java index 031b1848a..64e01b253 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveMetastoreClient.java @@ -23,8 +23,8 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.metastore.MetastoreClient; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.PartitionPathUtils; -import org.apache.paimon.utils.RowDataPartitionComputer; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -40,7 +40,7 @@ import java.util.List; public class HiveMetastoreClient implements MetastoreClient { private final Identifier identifier; - private final RowDataPartitionComputer partitionComputer; + private final InternalRowPartitionComputer partitionComputer; private final IMetaStoreClient client; private final StorageDescriptor sd; @@ -49,7 +49,7 @@ public class HiveMetastoreClient implements MetastoreClient { throws Exception { this.identifier = identifier; this.partitionComputer = - new RowDataPartitionComputer( + new InternalRowPartitionComputer( new CoreOptions(schema.options()).partitionDefaultName(), schema.logicalPartitionType(), schema.partitionKeys().toArray(new String[0])); diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java index 6dab4d4af..33cbc19e0 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java @@ -19,6 +19,7 @@ package org.apache.paimon.hive.utils; import org.apache.paimon.hive.mapred.PaimonInputSplit; +import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.table.FileStoreTable; @@ -45,6 +46,7 @@ import static java.util.Collections.singletonMap; import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME; import static org.apache.paimon.hive.utils.HiveUtils.createPredicate; import static org.apache.paimon.hive.utils.HiveUtils.extractTagName; +import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; /** Generator to generate hive input splits. */ public class HiveSplitGenerator { @@ -85,7 +87,8 @@ public class HiveSplitGenerator { createPartitionPredicate( table.schema().logicalRowType(), table.schema().partitionKeys(), - location) + location, + table.coreOptions().partitionDefaultName()) .ifPresent(predicatePerPartition::add); scan = table.newScan(); @@ -105,7 +108,10 @@ public class HiveSplitGenerator { } private static Optional<Predicate> createPartitionPredicate( - RowType rowType, List<String> partitionKeys, String partitionDir) { + RowType rowType, + List<String> partitionKeys, + String partitionDir, + String defaultPartName) { Set<String> partitionKeySet = new HashSet<>(partitionKeys); LinkedHashMap<String, String> partition = new LinkedHashMap<>(); for (String s : partitionDir.split("/")) { @@ -124,7 +130,9 @@ public class HiveSplitGenerator { if (partition.isEmpty() || partition.size() != partitionKeys.size()) { return Optional.empty(); } else { - return Optional.ofNullable(PredicateBuilder.partition(partition, rowType)); + return Optional.ofNullable( + PartitionPredicate.createPartitionPredicate( + partition, rowType, defaultPartName)); } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala index 8c5192e01..3618bce56 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala @@ -23,7 +23,7 @@ import org.apache.paimon.operation.FileStoreCommit import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.BatchWriteBuilder import org.apache.paimon.types.RowType -import org.apache.paimon.utils.{FileStorePathFactory, RowDataPartitionComputer} +import org.apache.paimon.utils.InternalRowPartitionComputer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} @@ -54,7 +54,7 @@ trait PaimonPartitionManagement extends SupportsPartitionManagement { .createToScalaConverter(partitionSchema()) .apply(internalRow) .asInstanceOf[Row] - val rowDataPartitionComputer = new RowDataPartitionComputer( + val rowDataPartitionComputer = new InternalRowPartitionComputer( CoreOptions.PARTITION_DEFAULT_NAME.defaultValue, tableRowType, partitionKeys.asScala.toArray) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala index 7ce78e79c..03df87634 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala @@ -29,7 +29,7 @@ import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL import org.apache.paimon.table.FileStoreTable import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage} import org.apache.paimon.types.RowKind -import org.apache.paimon.utils.RowDataPartitionComputer +import org.apache.paimon.utils.InternalRowPartitionComputer import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.PaimonUtils.createDataset @@ -83,7 +83,7 @@ case class DeleteFromPaimonTableCommand( ) { val matchedPartitions = table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala - val rowDataPartitionComputer = new RowDataPartitionComputer( + val rowDataPartitionComputer = new InternalRowPartitionComputer( CoreOptions.PARTITION_DEFAULT_NAME.defaultValue, table.schema().logicalPartitionType(), table.partitionKeys.asScala.toArray diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala index ea23790ca..dbce04719 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/StreamHelper.scala @@ -25,7 +25,7 @@ import org.apache.paimon.table.DataTable import org.apache.paimon.table.source.{DataSplit, InnerStreamTableScan} import org.apache.paimon.table.source.TableScan.Plan import org.apache.paimon.table.source.snapshot.StartingContext -import org.apache.paimon.utils.{RowDataPartitionComputer, TypeUtils} +import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils} import org.apache.spark.sql.connector.read.streaming.ReadLimit import org.apache.spark.sql.execution.datasources.PartitioningUtils @@ -49,11 +49,12 @@ private[spark] trait StreamHelper { private lazy val partitionSchema: StructType = SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), table.partitionKeys())) - private lazy val partitionComputer: RowDataPartitionComputer = new RowDataPartitionComputer( - new CoreOptions(table.options).partitionDefaultName, - TypeUtils.project(table.rowType(), table.partitionKeys()), - table.partitionKeys().asScala.toArray - ) + private lazy val partitionComputer: InternalRowPartitionComputer = + new InternalRowPartitionComputer( + new CoreOptions(table.options).partitionDefaultName, + TypeUtils.project(table.rowType(), table.partitionKeys()), + table.partitionKeys().asScala.toArray + ) // Used to get the initial offset. lazy val streamScanStartingContext: StartingContext = streamScan.startingContext()
