This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new acb0beaea [core] Optimize manifest-full-compact and parts overwrite by
PartitionPredicate (#3864)
acb0beaea is described below
commit acb0beaead147c8834113bd998ab071a20852977
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Aug 1 10:05:08 2024 +0800
[core] Optimize manifest-full-compact and parts overwrite by
PartitionPredicate (#3864)
---
.../apache/paimon/manifest/ManifestFileMeta.java | 33 +++---------------
.../paimon/operation/FileStoreCommitImpl.java | 40 ++++++++++++----------
.../paimon/partition/PartitionPredicate.java | 18 ++++++++--
.../apache/paimon/operation/FileDeletionTest.java | 27 ++++++++-------
4 files changed, 54 insertions(+), 64 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
index 32ecc23b5..49d606c80 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java
@@ -21,8 +21,7 @@ package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry.Identifier;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.BigIntType;
@@ -30,7 +29,6 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,9 +43,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
-import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Metadata of a manifest file. */
@@ -291,11 +287,9 @@ public class ManifestFileMeta {
int j = 0;
if (partitionType.getFieldCount() > 0) {
Set<BinaryRow> deletePartitions =
computeDeletePartitions(deltaMerged);
- Optional<Predicate> predicateOpt =
- convertPartitionToPredicate(partitionType,
deletePartitions);
-
- if (predicateOpt.isPresent()) {
- Predicate predicate = predicateOpt.get();
+ PartitionPredicate predicate =
+ PartitionPredicate.fromMultiple(partitionType,
deletePartitions);
+ if (predicate != null) {
for (; j < base.size(); j++) {
// TODO: optimize this to binary search.
ManifestFileMeta file = base.get(j);
@@ -404,23 +398,4 @@ public class ManifestFileMeta {
}
return partitions;
}
-
- private static Optional<Predicate> convertPartitionToPredicate(
- RowType partitionType, Set<BinaryRow> partitions) {
- Optional<Predicate> predicateOpt;
- if (!partitions.isEmpty()) {
- RowDataToObjectArrayConverter rowArrayConverter =
- new RowDataToObjectArrayConverter(partitionType);
-
- List<Predicate> predicateList =
- partitions.stream()
- .map(rowArrayConverter::convert)
- .map(values ->
createPartitionPredicate(partitionType, values))
- .collect(Collectors.toList());
- predicateOpt = Optional.of(PredicateBuilder.or(predicateList));
- } else {
- predicateOpt = Optional.empty();
- }
- return predicateOpt;
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 39a66ae8c..4d72efe9f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -39,6 +39,7 @@ import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
@@ -422,27 +423,24 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
try {
boolean skipOverwrite = false;
// partition filter is built from static or dynamic partition
according to properties
- Predicate partitionFilter = null;
+ PartitionPredicate partitionFilter = null;
if (dynamicPartitionOverwrite) {
if (appendTableFiles.isEmpty()) {
// in dynamic mode, if there is no changes to commit, no
data will be deleted
skipOverwrite = true;
} else {
- partitionFilter =
+ Set<BinaryRow> partitions =
appendTableFiles.stream()
.map(ManifestEntry::partition)
- .distinct()
- // partition filter is built from new
data's partitions
- .map(p ->
createPartitionPredicate(partitionType, p))
- .reduce(PredicateBuilder::or)
- .orElseThrow(
- () ->
- new RuntimeException(
- "Failed to get
dynamic partition filter. This is unexpected."));
+ .collect(Collectors.toSet());
+ partitionFilter =
PartitionPredicate.fromMultiple(partitionType, partitions);
}
} else {
- partitionFilter =
+ // the partition spec may cover only some of the partition fields,
+ // so a general predicate must be used here.
+ Predicate partitionPredicate =
createPartitionPredicate(partition, partitionType,
partitionDefaultName);
+ partitionFilter =
+ PartitionPredicate.fromPredicate(partitionType,
partitionPredicate);
// sanity check, all changes must be done within the given
partition
if (partitionFilter != null) {
for (ManifestEntry entry : appendTableFiles) {
@@ -511,7 +509,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
}
- Predicate partitionFilter =
+ // each partition spec may cover only some of the partition fields,
+ // so a general predicate must be used here.
+ Predicate predicate =
partitions.stream()
.map(
partition ->
@@ -519,6 +518,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
partition, partitionType,
partitionDefaultName))
.reduce(PredicateBuilder::or)
.orElseThrow(() -> new RuntimeException("Failed to get
partition filter."));
+ PartitionPredicate partitionFilter =
+ PartitionPredicate.fromPredicate(partitionType, predicate);
tryOverwrite(
partitionFilter,
@@ -722,7 +723,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
private int tryOverwrite(
- Predicate partitionFilter,
+ @Nullable PartitionPredicate partitionFilter,
List<ManifestEntry> changes,
List<IndexManifestEntry> indexFiles,
long identifier,
@@ -784,7 +785,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
}
@VisibleForTesting
- public boolean tryCommitOnce(
+ boolean tryCommitOnce(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
List<IndexManifestEntry> indexFiles,
@@ -804,13 +805,13 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
:
snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshotId);
if (LOG.isDebugEnabled()) {
- LOG.debug("Ready to commit table files to snapshot #" +
newSnapshotId);
+ LOG.debug("Ready to commit table files to snapshot {}",
newSnapshotId);
for (ManifestEntry entry : tableFiles) {
- LOG.debug(" * " + entry.toString());
+ LOG.debug(" * {}", entry);
}
- LOG.debug("Ready to commit changelog to snapshot #" +
newSnapshotId);
+ LOG.debug("Ready to commit changelog to snapshot {}",
newSnapshotId);
for (ManifestEntry entry : changelogFiles) {
- LOG.debug(" * " + entry.toString());
+ LOG.debug(" * {}", entry);
}
}
@@ -1292,7 +1293,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return latestSnapshot -> false;
}
- public static ConflictCheck mustConflictCheck() {
+ @VisibleForTesting
+ static ConflictCheck mustConflictCheck() {
return latestSnapshot -> true;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index b8339fee2..12ea884be 100644
---
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Set;
import static
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** A special predicate to filter partition only, just like {@link Predicate}.
*/
@@ -50,6 +51,10 @@ public interface PartitionPredicate {
boolean test(
long rowCount, InternalRow minValues, InternalRow maxValues,
InternalArray nullCounts);
+ /**
+ * Unlike {@code fromMultiple}, which matches complete partition values, this method
+ * accepts a predicate that filters on only a subset of the partition fields.
+ */
@Nullable
static PartitionPredicate fromPredicate(RowType partitionType, Predicate
predicate) {
if (partitionType.getFieldCount() == 0 || predicate == null) {
@@ -61,12 +66,17 @@ public interface PartitionPredicate {
@Nullable
static PartitionPredicate fromMultiple(RowType partitionType,
List<BinaryRow> partitions) {
+ return fromMultiple(partitionType, new HashSet<>(partitions));
+ }
+
+ @Nullable
+ static PartitionPredicate fromMultiple(RowType partitionType,
Set<BinaryRow> partitions) {
if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) {
return null;
}
return new MultiplePartitionPredicate(
- new RowDataToObjectArrayConverter(partitionType), new
HashSet<>(partitions));
+ new RowDataToObjectArrayConverter(partitionType), partitions);
}
/** A {@link PartitionPredicate} using {@link Predicate}. */
@@ -127,13 +137,15 @@ public interface PartitionPredicate {
PredicateBuilder builder = new PredicateBuilder(partitionType);
for (int i = 0; i < collectors.length; i++) {
SimpleColStats stats = collectors[i].result();
- if (stats.nullCount() == partitions.size()) {
+ Long nullCount = stats.nullCount();
+ checkArgument(nullCount != null, "nullCount cannot be null!");
+ if (nullCount == partitions.size()) {
min[i] = builder.isNull(i);
max[i] = builder.isNull(i);
} else {
min[i] = builder.greaterOrEqual(i,
checkNotNull(stats.min()));
max[i] = builder.lessOrEqual(i, checkNotNull(stats.max()));
- if (stats.nullCount() > 0) {
+ if (nullCount > 0) {
min[i] = PredicateBuilder.or(builder.isNull(i),
min[i]);
max[i] = PredicateBuilder.or(builder.isNull(i),
max[i]);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 17693b073..3fb1d36ac 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -787,19 +787,20 @@ public class FileDeletionTest {
entry.file()))
.collect(Collectors.toList());
// commit
- store.newCommit()
- .tryCommitOnce(
- delete,
- Collections.emptyList(),
- Collections.emptyList(),
- commitIdentifier++,
- null,
- Collections.emptyMap(),
- Snapshot.CommitKind.APPEND,
- store.snapshotManager().latestSnapshot(),
- mustConflictCheck(),
- DEFAULT_MAIN_BRANCH,
- null);
+ try (FileStoreCommitImpl commit = store.newCommit()) {
+ commit.tryCommitOnce(
+ delete,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ commitIdentifier++,
+ null,
+ Collections.emptyMap(),
+ Snapshot.CommitKind.APPEND,
+ store.snapshotManager().latestSnapshot(),
+ mustConflictCheck(),
+ DEFAULT_MAIN_BRANCH,
+ null);
+ }
}
private void createTag(Snapshot snapshot, String tagName, Duration
timeRetained) {