JingsongLi commented on code in PR #1199:
URL: https://github.com/apache/incubator-paimon/pull/1199#discussion_r1208778855
##########
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java:
##########
@@ -185,4 +207,132 @@ private static void mergeCandidates(
newMetas.addAll(merged);
}
}
+
+ public static List<ManifestFileMeta> tryFullCompaction(
+ List<ManifestFileMeta> inputs,
+ List<ManifestFileMeta> newMetas,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ long suggestedMetaSize,
+ long sizeTrigger) {
+ List<ManifestFileMeta> base = new ArrayList<>();
+ int totalManifestSize = 0;
+ int i;
+ for (i = 0; i < inputs.size(); i++) {
+ ManifestFileMeta manifestFileMeta = inputs.get(i);
+ if (manifestFileMeta.numDeletedFiles == 0
+ && manifestFileMeta.fileSize >= suggestedMetaSize) {
+ base.add(manifestFileMeta);
+ totalManifestSize += manifestFileMeta.fileSize;
+ } else {
+ break;
+ }
+ }
+
+ List<ManifestFileMeta> delta = new ArrayList<>();
+ for (; i < inputs.size(); i++) {
+ ManifestFileMeta manifestFileMeta = inputs.get(i);
+ delta.add(manifestFileMeta);
+ totalManifestSize += manifestFileMeta.fileSize;
+ }
+
+ long deltaDeleteFileNum = 0;
+ long totalDeltaFileSize = 0;
+ for (ManifestFileMeta manifestFileMeta : delta) {
+ deltaDeleteFileNum += manifestFileMeta.numDeletedFiles();
+ totalDeltaFileSize += manifestFileMeta.fileSize();
+ totalManifestSize += manifestFileMeta.fileSize();
+ }
+
+ if (totalDeltaFileSize < sizeTrigger) {
+ return inputs;
+ }
+
+ logger.info(
+ "Start Manifest File Full Compaction,pick the number of delete
file: {} ,total manifest file size: {}",
+ deltaDeleteFileNum,
+ totalManifestSize);
+ // do full-compaction
+ Map<ManifestEntry.Identifier, ManifestEntry> deltaMerged = new
LinkedHashMap<>();
+ for (ManifestFileMeta manifest : delta) {
+ ManifestEntry.mergeEntries(manifestFile.read(manifest.fileName),
deltaMerged);
+ }
+ Set<BinaryRow> deletePartitions = computeDeletePartitions(deltaMerged);
+
+ Optional<Predicate> predicateOpt =
+ convertPartitionToPredicate(partitionType, deletePartitions);
+ List<ManifestFileMeta> result = new ArrayList<>();
+
+ if (predicateOpt.isPresent()) {
+ Predicate predicate = predicateOpt.get();
+ FieldStatsArraySerializer fieldStatsArraySerializer =
+ new FieldStatsArraySerializer(partitionType);
+ int j;
+ for (j = 0; j < base.size(); j++) {
+ // todo optimize this to binary search.
+ ManifestFileMeta file = base.get(j);
+ if (predicate.test(
+ file.numAddedFiles + file.numDeletedFiles,
+
file.partitionStats.fields(fieldStatsArraySerializer))) {
+ break;
+ } else {
+ result.add(file);
+ }
+ }
+
+ Map<ManifestEntry.Identifier, ManifestEntry> fullMerged = new
LinkedHashMap<>();
+ for (; j < base.size(); j++) {
+ ManifestFileMeta manifestFileMeta = base.get(j);
+ ManifestEntry.mergeEntries(
+ manifestFile.read(manifestFileMeta.fileName),
fullMerged);
+ }
+ ManifestEntry.mergeEntries(deltaMerged.values(), fullMerged);
+ if (!fullMerged.isEmpty()) {
+ List<ManifestFileMeta> merged =
+ manifestFile.write(new
ArrayList<>(fullMerged.values()));
+ result.addAll(merged);
+ newMetas.addAll(merged);
+ }
+ } else {
+ result.addAll(base);
+ if (!deltaMerged.isEmpty()) {
+ List<ManifestFileMeta> merged =
+ manifestFile.write(new
ArrayList<>(deltaMerged.values()));
+ result.addAll(merged);
+ newMetas.addAll(merged);
+ }
+ }
+
+ return result;
+ }
+
+ private static Set<BinaryRow> computeDeletePartitions(
+ Map<ManifestEntry.Identifier, ManifestEntry> deltaMerged) {
+ Set<BinaryRow> partitions = new HashSet<>();
+ for (ManifestEntry manifestEntry : deltaMerged.values()) {
+ if (manifestEntry.kind() == FileKind.DELETE) {
+ BinaryRow partition = manifestEntry.partition();
+ partitions.add(partition);
+ }
+ }
+ return partitions;
+ }
+
+ private static Optional<Predicate> convertPartitionToPredicate(
+ RowType partitionType, Set<BinaryRow> partitions) {
+ Optional<Predicate> predicateOpt;
+ if (!partitions.isEmpty()) {
+ RowDataToObjectArrayConverter rowDataToObjectArrayConverter =
+ new RowDataToObjectArrayConverter(partitionType);
+
+ List<Predicate> predicateList =
+ partitions.stream()
+
.map(rowDataToObjectArrayConverter::createEqualPredicate)
Review Comment:
What about a table without partition fields? Would this throw an exception?
##########
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java:
##########
@@ -18,21 +18,33 @@
package org.apache.paimon.manifest;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.stats.BinaryTableStats;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** Metadata of a manifest file. */
public class ManifestFileMeta {
+ private static Logger logger =
LoggerFactory.getLogger(ManifestFileMeta.class);
Review Comment:
See the comments above.
##########
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFileMeta.java:
##########
@@ -185,4 +207,132 @@ private static void mergeCandidates(
newMetas.addAll(merged);
}
}
+
+ public static List<ManifestFileMeta> tryFullCompaction(
+ List<ManifestFileMeta> inputs,
+ List<ManifestFileMeta> newMetas,
+ ManifestFile manifestFile,
+ RowType partitionType,
+ long suggestedMetaSize,
+ long sizeTrigger) {
+ List<ManifestFileMeta> base = new ArrayList<>();
+ int totalManifestSize = 0;
+ int i;
+ for (i = 0; i < inputs.size(); i++) {
+ ManifestFileMeta manifestFileMeta = inputs.get(i);
+ if (manifestFileMeta.numDeletedFiles == 0
+ && manifestFileMeta.fileSize >= suggestedMetaSize) {
+ base.add(manifestFileMeta);
+ totalManifestSize += manifestFileMeta.fileSize;
+ } else {
+ break;
+ }
+ }
+
+ List<ManifestFileMeta> delta = new ArrayList<>();
+ for (; i < inputs.size(); i++) {
+ ManifestFileMeta manifestFileMeta = inputs.get(i);
+ delta.add(manifestFileMeta);
+ totalManifestSize += manifestFileMeta.fileSize;
+ }
+
+ long deltaDeleteFileNum = 0;
+ long totalDeltaFileSize = 0;
+ for (ManifestFileMeta manifestFileMeta : delta) {
+ deltaDeleteFileNum += manifestFileMeta.numDeletedFiles();
+ totalDeltaFileSize += manifestFileMeta.fileSize();
+ totalManifestSize += manifestFileMeta.fileSize();
+ }
+
+ if (totalDeltaFileSize < sizeTrigger) {
+ return inputs;
+ }
+
+ logger.info(
+ "Start Manifest File Full Compaction,pick the number of delete
file: {} ,total manifest file size: {}",
+ deltaDeleteFileNum,
+ totalManifestSize);
+ // do full-compaction
+ Map<ManifestEntry.Identifier, ManifestEntry> deltaMerged = new
LinkedHashMap<>();
+ for (ManifestFileMeta manifest : delta) {
+ ManifestEntry.mergeEntries(manifestFile.read(manifest.fileName),
deltaMerged);
+ }
+ Set<BinaryRow> deletePartitions = computeDeletePartitions(deltaMerged);
+
+ Optional<Predicate> predicateOpt =
+ convertPartitionToPredicate(partitionType, deletePartitions);
+ List<ManifestFileMeta> result = new ArrayList<>();
+
+ if (predicateOpt.isPresent()) {
+ Predicate predicate = predicateOpt.get();
+ FieldStatsArraySerializer fieldStatsArraySerializer =
+ new FieldStatsArraySerializer(partitionType);
+ int j;
+ for (j = 0; j < base.size(); j++) {
+ // todo optimize this to binary search.
+ ManifestFileMeta file = base.get(j);
+ if (predicate.test(
+ file.numAddedFiles + file.numDeletedFiles,
+
file.partitionStats.fields(fieldStatsArraySerializer))) {
+ break;
+ } else {
+ result.add(file);
+ }
+ }
+
+ Map<ManifestEntry.Identifier, ManifestEntry> fullMerged = new
LinkedHashMap<>();
+ for (; j < base.size(); j++) {
+ ManifestFileMeta manifestFileMeta = base.get(j);
+ ManifestEntry.mergeEntries(
+ manifestFile.read(manifestFileMeta.fileName),
fullMerged);
+ }
+ ManifestEntry.mergeEntries(deltaMerged.values(), fullMerged);
+ if (!fullMerged.isEmpty()) {
+ List<ManifestFileMeta> merged =
+ manifestFile.write(new
ArrayList<>(fullMerged.values()));
+ result.addAll(merged);
+ newMetas.addAll(merged);
+ }
+ } else {
+ result.addAll(base);
+ if (!deltaMerged.isEmpty()) {
Review Comment:
```
if (!deltaMerged.isEmpty()) {
List<ManifestFileMeta> merged =
manifestFile.write(new
ArrayList<>(deltaMerged.values()));
result.addAll(merged);
newMetas.addAll(merged);
}
```
This logic can be reused below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]