This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f64e66511 [core] Add cache to manifest files (#885)
f64e66511 is described below
commit f64e66511504692445aefbaae2a8f46e4f074be1
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Apr 13 13:53:23 2023 +0800
[core] Add cache to manifest files (#885)
---
docs/content/maintenance/write-performance.md | 6 ++
.../shortcodes/generated/core_configuration.html | 6 ++
.../apache/paimon/data/RandomAccessInputView.java | 5 ++
.../main/java/org/apache/paimon/data/Segments.java | 44 ++++++++++++
.../data/serializer/InternalRowSerializer.java | 4 ++
.../paimon/memory/HeapMemorySegmentPool.java | 0
.../apache/paimon/memory/MemorySegmentPool.java | 0
.../java/org/apache/paimon/AbstractFileStore.java | 16 ++++-
.../main/java/org/apache/paimon/CoreOptions.java | 10 +++
.../org/apache/paimon/manifest/ManifestFile.java | 47 +++++-------
.../org/apache/paimon/manifest/ManifestList.java | 76 ++++++++------------
.../apache/paimon/utils/FileStorePathFactory.java | 28 ++++++++
.../org/apache/paimon/utils/ObjectSerializer.java | 5 ++
.../java/org/apache/paimon/utils/ObjectsCache.java | 84 ++++++++++++++++++++++
.../java/org/apache/paimon/utils/ObjectsFile.java | 81 +++++++++++++++++++++
.../java/org/apache/paimon/utils/PathFactory.java | 29 ++++++++
.../org/apache/paimon/utils/SegmentsCache.java | 58 +++++++++++++++
.../paimon/manifest/ManifestFileMetaTest.java | 3 +-
.../apache/paimon/manifest/ManifestFileTest.java | 3 +-
.../apache/paimon/manifest/ManifestListTest.java | 7 +-
.../paimon/table/FileStoreTableTestBase.java | 40 +++++++++++
.../apache/paimon/flink/ForceCompactionITCase.java | 2 +-
22 files changed, 466 insertions(+), 88 deletions(-)
diff --git a/docs/content/maintenance/write-performance.md
b/docs/content/maintenance/write-performance.md
index c67990d23..1f01d75b6 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -53,6 +53,12 @@ It is recommended that the parallelism of sink should be
less than or equal to t
</tbody>
</table>
+## Write Initialize
+
+During write initialization, the writer of the bucket needs to read all
historical files. If there is a bottleneck
+here (for example, when writing a large number of partitions simultaneously), you
can use `manifest.cache-size` to cache
+the read manifest data to accelerate initialization.
+
## Compaction
### Number of Sorted Runs to Trigger Compaction
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index b55585981..49fb22a92 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -158,6 +158,12 @@
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
+ <tr>
+ <td><h5>manifest.cache-size</h5></td>
+ <td style="word-wrap: break-word;">0 bytes</td>
+ <td>MemorySize</td>
+ <td>Cache size for reading manifest files.</td>
+ </tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">avro</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/RandomAccessInputView.java
b/paimon-common/src/main/java/org/apache/paimon/data/RandomAccessInputView.java
index 4a87058c8..dec01ef1c 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/RandomAccessInputView.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/RandomAccessInputView.java
@@ -24,6 +24,7 @@ import org.apache.paimon.utils.MathUtils;
import java.io.EOFException;
import java.util.ArrayList;
+import java.util.List;
/** A {@link AbstractPagedInputView} to read pages in memory. */
public class RandomAccessInputView extends AbstractPagedInputView implements
SeekableDataInputView {
@@ -86,4 +87,8 @@ public class RandomAccessInputView extends
AbstractPagedInputView implements See
? this.limitInLastSegment
: this.segmentSize;
}
+
+ /** Returns the {@code segments} list held by this view; the list itself is returned, not a copy. */
+ public List<MemorySegment> segments() {
+ return segments;
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/Segments.java
b/paimon-common/src/main/java/org/apache/paimon/data/Segments.java
new file mode 100644
index 000000000..3982b2f36
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/data/Segments.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data;
+
+import org.apache.paimon.memory.MemorySegment;
+
+import java.util.ArrayList;
+
+/**
+ * Holder for a list of {@link MemorySegment}s together with the limit that applies to the last
+ * segment of that list.
+ *
+ * <p>The list handed to the constructor is stored and returned as-is; no copy is made.
+ */
+public class Segments {
+
+    private final int limitInLastSegment;
+
+    private final ArrayList<MemorySegment> segments;
+
+    /**
+     * @param segments the memory segments to hold
+     * @param limitInLastSegment the limit within the last element of {@code segments}
+     */
+    public Segments(ArrayList<MemorySegment> segments, int limitInLastSegment) {
+        this.limitInLastSegment = limitInLastSegment;
+        this.segments = segments;
+    }
+
+    /** Returns the limit within the last segment. */
+    public int limitInLastSegment() {
+        return this.limitInLastSegment;
+    }
+
+    /** Returns the held segment list (the same instance that was passed in). */
+    public ArrayList<MemorySegment> segments() {
+        return this.segments;
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index 145e5811b..b0701be48 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -132,6 +132,10 @@ public class InternalRowSerializer extends
AbstractRowDataSerializer<InternalRow
return types.length;
}
+ /** Returns the {@code types} array of this serializer; the array itself is returned, not a copy. */
+ public DataType[] fieldTypes() {
+ return types;
+ }
+
/** Convert {@link InternalRow} into {@link BinaryRow}. TODO modify it to
code gen. */
@Override
public BinaryRow toBinaryRow(InternalRow row) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/memory/HeapMemorySegmentPool.java
b/paimon-common/src/main/java/org/apache/paimon/memory/HeapMemorySegmentPool.java
similarity index 100%
rename from
paimon-core/src/main/java/org/apache/paimon/memory/HeapMemorySegmentPool.java
rename to
paimon-common/src/main/java/org/apache/paimon/memory/HeapMemorySegmentPool.java
diff --git
a/paimon-core/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
similarity index 100%
rename from
paimon-core/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
rename to
paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentPool.java
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 20fde8b42..5f8676012 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -26,11 +26,15 @@ import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.operation.FileStoreExpireImpl;
import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.Comparator;
@@ -47,6 +51,8 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
protected final CoreOptions options;
protected final RowType partitionType;
+ @Nullable private final SegmentsCache<String> manifestCache;
+
public AbstractFileStore(
FileIO fileIO,
SchemaManager schemaManager,
@@ -58,6 +64,11 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
this.schemaId = schemaId;
this.options = options;
this.partitionType = partitionType;
+ MemorySize manifestCacheSize = options.manifestCacheSize();
+ this.manifestCache =
+ manifestCacheSize.getBytes() == 0
+ ? null
+ : new SegmentsCache<>(options.pageSize(),
manifestCacheSize);
}
public FileStorePathFactory pathFactory() {
@@ -81,13 +92,14 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
partitionType,
options.manifestFormat(),
pathFactory(),
- options.manifestTargetSize().getBytes());
+ options.manifestTargetSize().getBytes(),
+ manifestCache);
}
@VisibleForTesting
public ManifestList.Factory manifestListFactory() {
return new ManifestList.Factory(
- fileIO, partitionType, options.manifestFormat(),
pathFactory());
+ fileIO, options.manifestFormat(), pathFactory(),
manifestCache);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 40ac103e3..aac6fc36f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -133,6 +133,12 @@ public class CoreOptions implements Serializable {
"To avoid frequent manifest merges, this parameter
specifies the minimum number "
+ "of ManifestFileMeta to merge.");
+ /** Option {@code manifest.cache-size}: memory budget for caching read manifest files; defaults to 0. */
+ public static final ConfigOption<MemorySize> MANIFEST_CACHE_SIZE =
+ key("manifest.cache-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(0))
+ .withDescription("Cache size for reading manifest files.");
+
public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
key("partition.default-name")
.stringType()
@@ -644,6 +650,10 @@ public class CoreOptions implements Serializable {
return options.get(MANIFEST_TARGET_FILE_SIZE);
}
+ /** Returns the configured value of {@link #MANIFEST_CACHE_SIZE}. */
+ public MemorySize manifestCacheSize() {
+ return options.get(MANIFEST_CACHE_SIZE);
+ }
+
public String partitionDefaultName() {
return options.get(PARTITION_DEFAULT_NAME);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index 3f136da45..e5ea62794 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -31,9 +31,13 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.stats.FieldStatsArraySerializer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.VersionedObjectSerializer;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.List;
@@ -41,15 +45,11 @@ import java.util.List;
* This file includes several {@link ManifestEntry}s, representing the
additional changes since last
* snapshot.
*/
-public class ManifestFile {
+public class ManifestFile extends ObjectsFile<ManifestEntry> {
- private final FileIO fileIO;
private final SchemaManager schemaManager;
private final RowType partitionType;
- private final ManifestEntrySerializer serializer;
- private final FormatReaderFactory readerFactory;
private final FormatWriterFactory writerFactory;
- private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;
private ManifestFile(
@@ -59,15 +59,13 @@ public class ManifestFile {
ManifestEntrySerializer serializer,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
- FileStorePathFactory pathFactory,
- long suggestedFileSize) {
- this.fileIO = fileIO;
+ PathFactory pathFactory,
+ long suggestedFileSize,
+ @Nullable SegmentsCache<String> cache) {
+ super(fileIO, serializer, readerFactory, pathFactory, cache);
this.schemaManager = schemaManager;
this.partitionType = partitionType;
- this.serializer = serializer;
- this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
- this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
}
@@ -76,15 +74,6 @@ public class ManifestFile {
return suggestedFileSize;
}
- public List<ManifestEntry> read(String fileName) {
- try {
- return FileUtils.readListFromFile(
- fileIO, pathFactory.toManifestFilePath(fileName),
serializer, readerFactory);
- } catch (IOException e) {
- throw new RuntimeException("Failed to read manifest file " +
fileName, e);
- }
- }
-
/**
* Write several {@link ManifestEntry}s into manifest files.
*
@@ -93,7 +82,7 @@ public class ManifestFile {
public List<ManifestFileMeta> write(List<ManifestEntry> entries) {
RollingFileWriter<ManifestEntry, ManifestFileMeta> writer =
new RollingFileWriter<>(
- () -> new ManifestEntryWriter(writerFactory,
pathFactory.newManifestFile()),
+ () -> new ManifestEntryWriter(writerFactory,
pathFactory.newPath()),
suggestedFileSize);
try {
writer.write(entries);
@@ -104,10 +93,6 @@ public class ManifestFile {
return writer.result();
}
- public void delete(String fileName) {
- fileIO.deleteQuietly(pathFactory.toManifestFilePath(fileName));
- }
-
private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry,
ManifestFileMeta> {
private final FieldStatsCollector partitionStatsCollector;
@@ -166,6 +151,7 @@ public class ManifestFile {
private final FileFormat fileFormat;
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;
+ @Nullable private final SegmentsCache<String> cache;
public Factory(
FileIO fileIO,
@@ -173,13 +159,15 @@ public class ManifestFile {
RowType partitionType,
FileFormat fileFormat,
FileStorePathFactory pathFactory,
- long suggestedFileSize) {
+ long suggestedFileSize,
+ @Nullable SegmentsCache<String> cache) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.partitionType = partitionType;
this.fileFormat = fileFormat;
this.pathFactory = pathFactory;
this.suggestedFileSize = suggestedFileSize;
+ this.cache = cache;
}
public ManifestFile create() {
@@ -191,8 +179,9 @@ public class ManifestFile {
new ManifestEntrySerializer(),
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
- pathFactory,
- suggestedFileSize);
+ pathFactory.manifestFileFactory(),
+ suggestedFileSize,
+ cache);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index c95718da0..86529e31c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -27,44 +27,32 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.VersionedObjectSerializer;
-import java.io.IOException;
+import javax.annotation.Nullable;
+
import java.util.List;
/**
* This file includes several {@link ManifestFileMeta}, representing all data
of the whole table at
* the corresponding snapshot.
*/
-public class ManifestList {
+public class ManifestList extends ObjectsFile<ManifestFileMeta> {
- private final FileIO fileIO;
- private final ManifestFileMetaSerializer serializer;
- private final FormatReaderFactory readerFactory;
private final FormatWriterFactory writerFactory;
- private final FileStorePathFactory pathFactory;
private ManifestList(
FileIO fileIO,
ManifestFileMetaSerializer serializer,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
- FileStorePathFactory pathFactory) {
- this.fileIO = fileIO;
- this.serializer = serializer;
- this.readerFactory = readerFactory;
+ PathFactory pathFactory,
+ @Nullable SegmentsCache<String> cache) {
+ super(fileIO, serializer, readerFactory, pathFactory, cache);
this.writerFactory = writerFactory;
- this.pathFactory = pathFactory;
- }
-
- public List<ManifestFileMeta> read(String fileName) {
- try {
- return FileUtils.readListFromFile(
- fileIO, pathFactory.toManifestListPath(fileName),
serializer, readerFactory);
- } catch (IOException e) {
- throw new RuntimeException("Failed to read manifest list " +
fileName, e);
- }
}
/**
@@ -73,9 +61,20 @@ public class ManifestList {
* <p>NOTE: This method is atomic.
*/
public String write(List<ManifestFileMeta> metas) {
- Path path = pathFactory.newManifestList();
+ Path path = pathFactory.newPath();
try {
- return write(metas, path);
+ try (PositionOutputStream out = fileIO.newOutputStream(path,
false)) {
+ FormatWriter writer = writerFactory.create(out);
+ try {
+ for (ManifestFileMeta record : metas) {
+ writer.addElement(serializer.toRow(record));
+ }
+ } finally {
+ writer.flush();
+ writer.finish();
+ }
+ }
+ return path.getName();
} catch (Throwable e) {
fileIO.deleteQuietly(path);
throw new RuntimeException(
@@ -83,43 +82,23 @@ public class ManifestList {
}
}
- private String write(List<ManifestFileMeta> metas, Path path) throws
IOException {
- try (PositionOutputStream out = fileIO.newOutputStream(path, false)) {
- // Initialize the bulk writer to accept the ManifestFileMeta.
- FormatWriter writer = writerFactory.create(out);
- try {
- for (ManifestFileMeta manifest : metas) {
- writer.addElement(serializer.toRow(manifest));
- }
- } finally {
- writer.flush();
- writer.finish();
- }
- }
- return path.getName();
- }
-
- public void delete(String fileName) {
- fileIO.deleteQuietly(pathFactory.toManifestListPath(fileName));
- }
-
/** Creator of {@link ManifestList}. */
public static class Factory {
private final FileIO fileIO;
- private final RowType partitionType;
private final FileFormat fileFormat;
private final FileStorePathFactory pathFactory;
+ @Nullable private final SegmentsCache<String> cache;
public Factory(
FileIO fileIO,
- RowType partitionType,
FileFormat fileFormat,
- FileStorePathFactory pathFactory) {
+ FileStorePathFactory pathFactory,
+ @Nullable SegmentsCache<String> cache) {
this.fileIO = fileIO;
- this.partitionType = partitionType;
this.fileFormat = fileFormat;
this.pathFactory = pathFactory;
+ this.cache = cache;
}
public ManifestList create() {
@@ -129,7 +108,8 @@ public class ManifestList {
new ManifestFileMetaSerializer(),
fileFormat.createReaderFactory(metaType),
fileFormat.createWriterFactory(metaType),
- pathFactory);
+ pathFactory.manifestListFactory(),
+ cache);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index cf4220b8c..304fbdb82 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -141,4 +141,32 @@ public class FileStorePathFactory {
public String uuid() {
return uuid;
}
+
+ /**
+  * Returns a {@link PathFactory} view of this factory for manifest files: new paths come from
+  * {@link #newManifestFile()} and file names are resolved by {@code toManifestFilePath}.
+  */
+ public PathFactory manifestFileFactory() {
+     final FileStorePathFactory outer = FileStorePathFactory.this;
+     return new PathFactory() {
+         @Override
+         public Path toPath(String fileName) {
+             return outer.toManifestFilePath(fileName);
+         }
+
+         @Override
+         public Path newPath() {
+             return outer.newManifestFile();
+         }
+     };
+ }
+
+ /**
+  * Returns a {@link PathFactory} view of this factory for manifest lists: new paths come from
+  * {@link #newManifestList()} and file names are resolved by {@code toManifestListPath}.
+  */
+ public PathFactory manifestListFactory() {
+     final FileStorePathFactory outer = FileStorePathFactory.this;
+     return new PathFactory() {
+         @Override
+         public Path toPath(String fileName) {
+             return outer.toManifestListPath(fileName);
+         }
+
+         @Override
+         public Path newPath() {
+             return outer.newManifestList();
+         }
+     };
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
index 77644ab74..329dee230 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectSerializer.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import java.io.ByteArrayInputStream;
@@ -50,6 +51,10 @@ public abstract class ObjectSerializer<T> implements
Serializable {
return rowSerializer.getArity();
}
+ /** Returns the field types reported by the underlying {@code rowSerializer}. */
+ public DataType[] fieldTypes() {
+ return rowSerializer.fieldTypes();
+ }
+
/**
* Serializes the given record to the given target output view.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
new file mode 100644
index 000000000..06ed61320
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.Segments;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentSource;
+import org.apache.paimon.types.RowType;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Caches records in a {@link SegmentsCache}. Records are stored as rows written with a
+ * {@link RowCompactedSerializer} and are converted back into objects on every {@link #read}.
+ *
+ * @param <K> type of the cache key
+ * @param <V> type of the objects returned by {@link #read}
+ */
+public class ObjectsCache<K, V> {
+
+    private final SegmentsCache<K> cache;
+    private final ObjectSerializer<V> serializer;
+    private final RowCompactedSerializer compactedSerializer;
+    private final Function<K, CloseableIterator<InternalRow>> reader;
+
+    /**
+     * @param cache the segments cache that stores the serialized rows
+     * @param serializer converts cached rows back into objects; also supplies the field types
+     * @param reader opens an iterator over the rows of a key when the key is not cached
+     */
+    public ObjectsCache(
+            SegmentsCache<K> cache,
+            ObjectSerializer<V> serializer,
+            Function<K, CloseableIterator<InternalRow>> reader) {
+        this.cache = cache;
+        this.serializer = serializer;
+        this.reader = reader;
+        this.compactedSerializer =
+                new RowCompactedSerializer(RowType.of(serializer.fieldTypes()));
+    }
+
+    /**
+     * Returns all objects stored under {@code key}, loading them through the reader function if
+     * the cache has no segments for that key.
+     */
+    public List<V> read(K key) throws IOException {
+        Segments cached = cache.getSegments(key, this::readSegments);
+        List<V> result = new ArrayList<>();
+        RandomAccessInputView input =
+                new RandomAccessInputView(
+                        cached.segments(), cache.pageSize(), cached.limitInLastSegment());
+        try {
+            for (; ; ) {
+                result.add(serializer.fromRow(compactedSerializer.deserialize(input)));
+            }
+        } catch (EOFException endOfSegments) {
+            // Running off the end of the segments is how the end of the records is detected.
+            return result;
+        }
+    }
+
+    /** Reads every row of {@code key} and serializes it into freshly allocated heap segments. */
+    private Segments readSegments(K key) {
+        try (CloseableIterator<InternalRow> rows = reader.apply(key)) {
+            ArrayList<MemorySegment> pages = new ArrayList<>();
+            MemorySegmentSource pageSource =
+                    () -> MemorySegment.allocateHeapMemory(cache.pageSize());
+            SimpleCollectingOutputView collector =
+                    new SimpleCollectingOutputView(pages, pageSource, cache.pageSize());
+            for (; rows.hasNext(); ) {
+                compactedSerializer.serialize(rows.next(), collector);
+            }
+            return new Segments(pages, collector.getCurrentPositionInSegment());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
new file mode 100644
index 000000000..10ac5ee2c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.FileIO;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.paimon.utils.FileUtils.createFormatReader;
+
+/**
+ * A file which contains several {@code T}s. This base class provides {@link #read(String)},
+ * optionally served from a cache, and {@link #delete(String)}; its fields are visible to
+ * subclasses.
+ */
+public abstract class ObjectsFile<T> {
+
+    protected final FileIO fileIO;
+    protected final ObjectSerializer<T> serializer;
+    protected final FormatReaderFactory readerFactory;
+    protected final PathFactory pathFactory;
+
+    /** Cache keyed by file name; {@code null} means every read goes to the file. */
+    @Nullable private final ObjectsCache<String, T> cache;
+
+    /**
+     * @param fileIO used to read and delete files
+     * @param serializer converts rows of the file into objects
+     * @param readerFactory creates the format reader for a file
+     * @param pathFactory resolves file names to paths
+     * @param cache segments cache to read through, or {@code null} to disable caching
+     */
+    protected ObjectsFile(
+            FileIO fileIO,
+            ObjectSerializer<T> serializer,
+            FormatReaderFactory readerFactory,
+            PathFactory pathFactory,
+            @Nullable SegmentsCache<String> cache) {
+        this.fileIO = fileIO;
+        this.serializer = serializer;
+        this.readerFactory = readerFactory;
+        this.pathFactory = pathFactory;
+        this.cache =
+                cache == null ? null : new ObjectsCache<>(cache, serializer, this::createIterator);
+    }
+
+    /**
+     * Reads all objects of the file with the given name, through the cache when one is configured.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if the read fails
+     */
+    public List<T> read(String fileName) {
+        try {
+            if (cache != null) {
+                return cache.read(fileName);
+            }
+
+            return FileUtils.readListFromFile(
+                    fileIO, pathFactory.toPath(fileName), serializer, readerFactory);
+        } catch (IOException e) {
+            // This base class serves several kinds of files, so the message must not name one.
+            throw new RuntimeException("Failed to read file " + fileName, e);
+        }
+    }
+
+    /** Opens a row iterator over the named file; used as the loader of the cache. */
+    private CloseableIterator<InternalRow> createIterator(String fileName) {
+        try {
+            return createFormatReader(fileIO, readerFactory, pathFactory.toPath(fileName))
+                    .toCloseableIterator();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Deletes the named file via {@code FileIO#deleteQuietly}. Only the file is touched here; this
+     * method does not interact with the cache.
+     */
+    public void delete(String fileName) {
+        fileIO.deleteQuietly(pathFactory.toPath(fileName));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/PathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/PathFactory.java
new file mode 100644
index 000000000..9d1cf87bb
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/PathFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.Path;
+
+/** Path factory to create a path. */
+public interface PathFactory {
+
+ /** Returns the path for a new file. */
+ Path newPath();
+
+ /** Returns the path that corresponds to the given file name. */
+ Path toPath(String fileName);
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
new file mode 100644
index 000000000..e42b262ab
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.Segments;
+import org.apache.paimon.options.MemorySize;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.function.Function;
+
+/** Cache {@link Segments}. */
+public class SegmentsCache<T> {
+
+ private static final int OBJECT_MEMORY_SIZE = 1000;
+
+ private final int pageSize;
+ private final Cache<T, Segments> cache;
+
+ public SegmentsCache(int pageSize, MemorySize maxMemorySize) {
+ this.pageSize = pageSize;
+ this.cache =
+ Caffeine.newBuilder()
+ .weigher(this::weigh)
+ .maximumWeight(maxMemorySize.getBytes())
+ .executor(MoreExecutors.directExecutor())
+ .build();
+ }
+
+ public int pageSize() {
+ return pageSize;
+ }
+
+ public Segments getSegments(T key, Function<T, Segments> viewFunction) {
+ return cache.get(key, viewFunction);
+ }
+
+ private int weigh(T cacheKey, Segments segments) {
+ return OBJECT_MEMORY_SIZE + segments.segments().size() * pageSize;
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
index 1a8f10e28..6615e9074 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTest.java
@@ -151,7 +151,8 @@ public class ManifestFileMetaTest {
PARTITION_TYPE,
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString()),
- Long.MAX_VALUE)
+ Long.MAX_VALUE,
+ null)
.create();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index d4773395b..5be026b7c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -107,7 +107,8 @@ public class ManifestFileTest {
DEFAULT_PART_TYPE,
avro,
pathFactory,
- suggestedFileSize)
+ suggestedFileSize,
+ null)
.create();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index bbceeff3a..824d3e219 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -95,11 +95,6 @@ public class ManifestListTest {
TestKeyValueGenerator.DEFAULT_PART_TYPE,
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString());
- return new ManifestList.Factory(
- FileIOFinder.find(path),
- TestKeyValueGenerator.DEFAULT_PART_TYPE,
- avro,
- pathFactory)
- .create();
+ return new ManifestList.Factory(FileIOFinder.find(path), avro,
pathFactory, null).create();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index b5ea597bb..bf99f93f7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -33,6 +33,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
@@ -404,6 +405,45 @@ public abstract class FileStoreTableTestBase {
write.close();
}
+ @Test
+ public void testManifestCache() throws Exception { // write/commit/read round trip with MANIFEST_CACHE_SIZE set to 1 MiB
+ FileStoreTable table =
+ createFileStoreTable(
+ conf ->
+ conf.set(
+ CoreOptions.MANIFEST_CACHE_SIZE,
+ MemorySize.ofMebiBytes(1)));
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // one commit per written row, intended to produce many manifest files
+ List<String> expected = new ArrayList<>();
+ int cnt = 50;
+ for (int i = 0; i < cnt; i++) {
+ write.write(rowData(i, i, (long) i));
+ commit.commit(i, write.prepareCommit(false, i));
+ expected.add(
+
String.format("%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i, i));
+ }
+ write.close();
+
+ // create a new write (reloading manifests), then write a second batch committed once after the loop
+ write = table.newWrite(commitUser);
+ for (int i = 0; i < cnt; i++) {
+ write.write(rowData(i, i + 1, (long) i + 1));
+ expected.add(
+ String.format(
+
"%s|%s|%s|binary|varbinary|mapKey:mapVal|multiset", i, i + 1, i + 1));
+ }
+ commit.commit(cnt, write.prepareCommit(false, cnt));
+
+ // the scan must return exactly the rows of both rounds, in any order
+ List<String> result =
+ getResult(table.newRead(), table.newScan().plan().splits(),
BATCH_ROW_TO_STRING);
+ assertThat(result.size()).isEqualTo(expected.size());
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
@Test
public void testWriteWithoutCompactionAndExpiration() throws Exception {
FileStoreTable table =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
index 1306dd386..50ab2d0aa 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ForceCompactionITCase.java
@@ -234,7 +234,7 @@ public class ForceCompactionITCase extends
CatalogITCaseBase {
CoreOptions.FILE_FORMAT.defaultValue().toString());
List<ManifestFileMeta> manifestFileMetas =
- new ManifestList.Factory(LocalFileIO.create(), partType, avro,
pathFactory)
+ new ManifestList.Factory(LocalFileIO.create(), avro,
pathFactory, null)
.create()
.read(snapshot.deltaManifestList());
assertThat(manifestFileMetas.get(0).numDeletedFiles()).isGreaterThanOrEqualTo(1);