This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b126f9d46 [core] Add compaction when creating Iceberg compatible
metadata (#4005)
b126f9d46 is described below
commit b126f9d46e6fe7253cbf66e2a597493c13afad37
Author: tsreaper <[email protected]>
AuthorDate: Tue Aug 20 16:59:41 2024 +0800
[core] Add compaction when creating Iceberg compatible metadata (#4005)
---
.../iceberg/AbstractIcebergCommitCallback.java | 87 +++++++++++++++++++++-
.../paimon/iceberg/IcebergCompatibilityTest.java | 17 +++--
2 files changed, 98 insertions(+), 6 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
index 210e56ed9..c1ab2933d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
@@ -40,6 +40,8 @@ import
org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
@@ -52,6 +54,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
@@ -66,6 +69,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -77,6 +81,15 @@ public abstract class AbstractIcebergCommitCallback
implements CommitCallback {
// see org.apache.iceberg.hadoop.Util
private static final String VERSION_HINT_FILENAME = "version-hint.text";
+ static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
+ ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
+ .intType()
+ .defaultValue(10);
+ static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
+ ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
+ .intType()
+ .defaultValue(50);
+
protected final FileStoreTable table;
private final String commitUser;
private final IcebergPathFactory pathFactory;
@@ -289,7 +302,9 @@ public abstract class AbstractIcebergCommitCallback
implements CommitCallback {
newManifestFileMetas = result.getLeft();
snapshotSummary = result.getRight();
}
-        String manifestListFileName = manifestList.writeWithoutRolling(newManifestFileMetas);
+        String manifestListFileName =
+                manifestList.writeWithoutRolling(
+                        compactMetadataIfNeeded(newManifestFileMetas, snapshotId));
// add new schema if needed
int schemaId = (int) table.schema().id();
@@ -491,6 +506,10 @@ public abstract class AbstractIcebergCommitCallback
implements CommitCallback {
newManifestFileMetas.addAll(
manifestFile.rollingWrite(newEntries.iterator(),
currentSnapshotId));
}
+ } else {
+ // partition of this file meta is not modified in this
snapshot,
+ // use this file meta again
+ newManifestFileMetas.add(fileMeta);
}
}
@@ -509,6 +528,72 @@ public abstract class AbstractIcebergCommitCallback
implements CommitCallback {
return result;
}
+    private List<IcebergManifestFileMeta> compactMetadataIfNeeded(
+            List<IcebergManifestFileMeta> toCompact, long currentSnapshotId) throws IOException {
+ List<IcebergManifestFileMeta> result = new ArrayList<>();
+        long targetSizeInBytes = table.coreOptions().manifestTargetSize().getBytes();
+
+ List<IcebergManifestFileMeta> candidates = new ArrayList<>();
+ long totalSizeInBytes = 0;
+ for (IcebergManifestFileMeta meta : toCompact) {
+ if (meta.manifestLength() < targetSizeInBytes * 2 / 3) {
+ candidates.add(meta);
+ totalSizeInBytes += meta.manifestLength();
+ } else {
+ result.add(meta);
+ }
+ }
+
+ Options options = new Options(table.options());
+ if (candidates.size() < options.get(COMPACT_MIN_FILE_NUM)) {
+ return toCompact;
+ }
+ if (candidates.size() < options.get(COMPACT_MAX_FILE_NUM)
+ && totalSizeInBytes < targetSizeInBytes) {
+ return toCompact;
+ }
+
+ Function<IcebergManifestFileMeta, List<IcebergManifestEntry>>
processor =
+ meta -> {
+ List<IcebergManifestEntry> entries = new ArrayList<>();
+ for (IcebergManifestEntry entry :
+ manifestFile.read(new
Path(meta.manifestPath()).getName())) {
+ if (entry.fileSequenceNumber() == currentSnapshotId
+ || entry.status() ==
IcebergManifestEntry.Status.EXISTING) {
+ entries.add(entry);
+ } else {
+ // rewrite status if this entry is from an older
snapshot
+ IcebergManifestEntry.Status newStatus;
+ if (entry.status() ==
IcebergManifestEntry.Status.ADDED) {
+ newStatus =
IcebergManifestEntry.Status.EXISTING;
+ } else if (entry.status() ==
IcebergManifestEntry.Status.DELETED) {
+ continue;
+ } else {
+ throw new UnsupportedOperationException(
+ "Unknown IcebergManifestEntry.Status "
+ entry.status());
+ }
+ entries.add(
+ new IcebergManifestEntry(
+ newStatus,
+ entry.snapshotId(),
+ entry.sequenceNumber(),
+ entry.fileSequenceNumber(),
+ entry.file()));
+ }
+ }
+ if (meta.sequenceNumber() == currentSnapshotId) {
+ // this file is created for this snapshot, so it is
not recorded in any
+ // iceberg metas, we need to clean it
+ table.fileIO().deleteQuietly(new
Path(meta.manifestPath()));
+ }
+ return entries;
+ };
+        Iterable<IcebergManifestEntry> newEntries =
+                ManifestReadThreadPool.sequentialBatchedExecute(processor, candidates, null);
+        result.addAll(manifestFile.rollingWrite(newEntries.iterator(), currentSnapshotId));
+ return result;
+ }
+
@Override
public void close() throws Exception {}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 4efeaa855..067fad2b1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -29,6 +29,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -258,8 +259,8 @@ public class IcebergCompatibilityTest {
},
new String[] {"k1", "k2", "v1", "v2"});
- int numRounds = 5;
- int numRecords = 500;
+ int numRounds = 20;
+ int numRecords = 1000;
ThreadLocalRandom random = ThreadLocalRandom.current();
List<List<TestRecord>> testRecords = new ArrayList<>();
List<List<String>> expected = new ArrayList<>();
@@ -316,16 +317,18 @@ public class IcebergCompatibilityTest {
return b;
};
- int numRounds = 2;
- int numRecords = 3;
+ int numRounds = 20;
+ int numRecords = 500;
ThreadLocalRandom random = ThreadLocalRandom.current();
+ boolean samePartitionEachRound = random.nextBoolean();
+
List<List<TestRecord>> testRecords = new ArrayList<>();
List<List<String>> expected = new ArrayList<>();
Map<String, String> expectedMap = new LinkedHashMap<>();
for (int r = 0; r < numRounds; r++) {
List<TestRecord> round = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
- int pt1 = random.nextInt(0, 2);
+                int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) + r) % 3;
String pt2 = String.valueOf(random.nextInt(10, 12));
String k = String.valueOf(random.nextInt(0, 100));
int v1 = random.nextInt();
@@ -551,6 +554,10 @@ public class IcebergCompatibilityTest {
options.set(CoreOptions.BUCKET, numBuckets);
options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
options.set(CoreOptions.FILE_FORMAT, "avro");
+ options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
+        options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 4);
+        options.set(AbstractIcebergCommitCallback.COMPACT_MAX_FILE_NUM, 8);
+        options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
Schema schema =
new Schema(rowType.getFields(), partitionKeys, primaryKeys,
options.toMap(), "");