This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b126f9d46 [core] Add compaction when creating Iceberg compatible 
metadata (#4005)
b126f9d46 is described below

commit b126f9d46e6fe7253cbf66e2a597493c13afad37
Author: tsreaper <[email protected]>
AuthorDate: Tue Aug 20 16:59:41 2024 +0800

    [core] Add compaction when creating Iceberg compatible metadata (#4005)
---
 .../iceberg/AbstractIcebergCommitCallback.java     | 87 +++++++++++++++++++++-
 .../paimon/iceberg/IcebergCompatibilityTest.java   | 17 +++--
 2 files changed, 98 insertions(+), 6 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
index 210e56ed9..c1ab2933d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
@@ -40,6 +40,8 @@ import 
org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
@@ -52,6 +54,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.ManifestReadThreadPool;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -66,6 +69,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -77,6 +81,15 @@ public abstract class AbstractIcebergCommitCallback 
implements CommitCallback {
     // see org.apache.iceberg.hadoop.Util
     private static final String VERSION_HINT_FILENAME = "version-hint.text";
 
+    static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
+            ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
+                    .intType()
+                    .defaultValue(10);
+    static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
+            ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
+                    .intType()
+                    .defaultValue(50);
+
     protected final FileStoreTable table;
     private final String commitUser;
     private final IcebergPathFactory pathFactory;
@@ -289,7 +302,9 @@ public abstract class AbstractIcebergCommitCallback 
implements CommitCallback {
             newManifestFileMetas = result.getLeft();
             snapshotSummary = result.getRight();
         }
-        String manifestListFileName = 
manifestList.writeWithoutRolling(newManifestFileMetas);
+        String manifestListFileName =
+                manifestList.writeWithoutRolling(
+                        compactMetadataIfNeeded(newManifestFileMetas, 
snapshotId));
 
         // add new schema if needed
         int schemaId = (int) table.schema().id();
@@ -491,6 +506,10 @@ public abstract class AbstractIcebergCommitCallback 
implements CommitCallback {
                     newManifestFileMetas.addAll(
                             manifestFile.rollingWrite(newEntries.iterator(), 
currentSnapshotId));
                 }
+            } else {
+                // partition of this file meta is not modified in this 
snapshot,
+                // use this file meta again
+                newManifestFileMetas.add(fileMeta);
             }
         }
 
@@ -509,6 +528,72 @@ public abstract class AbstractIcebergCommitCallback 
implements CommitCallback {
         return result;
     }
 
+    private List<IcebergManifestFileMeta> compactMetadataIfNeeded(
+            List<IcebergManifestFileMeta> toCompact, long currentSnapshotId) 
throws IOException {
+        List<IcebergManifestFileMeta> result = new ArrayList<>();
+        long targetSizeInBytes = 
table.coreOptions().manifestTargetSize().getBytes();
+
+        List<IcebergManifestFileMeta> candidates = new ArrayList<>();
+        long totalSizeInBytes = 0;
+        for (IcebergManifestFileMeta meta : toCompact) {
+            if (meta.manifestLength() < targetSizeInBytes * 2 / 3) {
+                candidates.add(meta);
+                totalSizeInBytes += meta.manifestLength();
+            } else {
+                result.add(meta);
+            }
+        }
+
+        Options options = new Options(table.options());
+        if (candidates.size() < options.get(COMPACT_MIN_FILE_NUM)) {
+            return toCompact;
+        }
+        if (candidates.size() < options.get(COMPACT_MAX_FILE_NUM)
+                && totalSizeInBytes < targetSizeInBytes) {
+            return toCompact;
+        }
+
+        Function<IcebergManifestFileMeta, List<IcebergManifestEntry>> 
processor =
+                meta -> {
+                    List<IcebergManifestEntry> entries = new ArrayList<>();
+                    for (IcebergManifestEntry entry :
+                            manifestFile.read(new 
Path(meta.manifestPath()).getName())) {
+                        if (entry.fileSequenceNumber() == currentSnapshotId
+                                || entry.status() == 
IcebergManifestEntry.Status.EXISTING) {
+                            entries.add(entry);
+                        } else {
+                            // rewrite status if this entry is from an older 
snapshot
+                            IcebergManifestEntry.Status newStatus;
+                            if (entry.status() == 
IcebergManifestEntry.Status.ADDED) {
+                                newStatus = 
IcebergManifestEntry.Status.EXISTING;
+                            } else if (entry.status() == 
IcebergManifestEntry.Status.DELETED) {
+                                continue;
+                            } else {
+                                throw new UnsupportedOperationException(
+                                        "Unknown IcebergManifestEntry.Status " 
+ entry.status());
+                            }
+                            entries.add(
+                                    new IcebergManifestEntry(
+                                            newStatus,
+                                            entry.snapshotId(),
+                                            entry.sequenceNumber(),
+                                            entry.fileSequenceNumber(),
+                                            entry.file()));
+                        }
+                    }
+                    if (meta.sequenceNumber() == currentSnapshotId) {
+                        // this file is created for this snapshot, so it is 
not recorded in any
+                        // iceberg metas, we need to clean it
+                        table.fileIO().deleteQuietly(new 
Path(meta.manifestPath()));
+                    }
+                    return entries;
+                };
+        Iterable<IcebergManifestEntry> newEntries =
+                ManifestReadThreadPool.sequentialBatchedExecute(processor, 
candidates, null);
+        result.addAll(manifestFile.rollingWrite(newEntries.iterator(), 
currentSnapshotId));
+        return result;
+    }
+
     @Override
     public void close() throws Exception {}
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 4efeaa855..067fad2b1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -258,8 +259,8 @@ public class IcebergCompatibilityTest {
                         },
                         new String[] {"k1", "k2", "v1", "v2"});
 
-        int numRounds = 5;
-        int numRecords = 500;
+        int numRounds = 20;
+        int numRecords = 1000;
         ThreadLocalRandom random = ThreadLocalRandom.current();
         List<List<TestRecord>> testRecords = new ArrayList<>();
         List<List<String>> expected = new ArrayList<>();
@@ -316,16 +317,18 @@ public class IcebergCompatibilityTest {
                     return b;
                 };
 
-        int numRounds = 2;
-        int numRecords = 3;
+        int numRounds = 20;
+        int numRecords = 500;
         ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean samePartitionEachRound = random.nextBoolean();
+
         List<List<TestRecord>> testRecords = new ArrayList<>();
         List<List<String>> expected = new ArrayList<>();
         Map<String, String> expectedMap = new LinkedHashMap<>();
         for (int r = 0; r < numRounds; r++) {
             List<TestRecord> round = new ArrayList<>();
             for (int i = 0; i < numRecords; i++) {
-                int pt1 = random.nextInt(0, 2);
+                int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) + 
r) % 3;
                 String pt2 = String.valueOf(random.nextInt(10, 12));
                 String k = String.valueOf(random.nextInt(0, 100));
                 int v1 = random.nextInt();
@@ -551,6 +554,10 @@ public class IcebergCompatibilityTest {
         options.set(CoreOptions.BUCKET, numBuckets);
         options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
         options.set(CoreOptions.FILE_FORMAT, "avro");
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
+        options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 4);
+        options.set(AbstractIcebergCommitCallback.COMPACT_MAX_FILE_NUM, 8);
+        options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, 
MemorySize.ofKibiBytes(8));
         Schema schema =
                 new Schema(rowType.getFields(), partitionKeys, primaryKeys, 
options.toMap(), "");
 

Reply via email to