This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 675374fb9 [core] Introduce cache.manifest-max-memory to CachingCatalog (#3856)
675374fb9 is described below
commit 675374fb9116b6d428b4b856f87bd6c683181edf
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 2 19:12:47 2024 +0800
[core] Introduce cache.manifest-max-memory to CachingCatalog (#3856)
---
.../generated/catalog_configuration.html | 18 +++++
.../org/apache/paimon/codegen/CompileUtils.java | 5 +-
.../org/apache/paimon/io/cache/CacheManager.java | 11 +--
.../org/apache/paimon/options/CatalogOptions.java | 18 +++++
.../apache/paimon/reader/RecordReaderIterator.java | 4 +-
.../java/org/apache/paimon/AbstractFileStore.java | 25 ++++---
.../src/main/java/org/apache/paimon/FileStore.java | 4 ++
.../org/apache/paimon/catalog/CachingCatalog.java | 72 +++++++++++++++++--
.../org/apache/paimon/catalog/CatalogFactory.java | 22 +-----
.../iceberg/manifest/IcebergManifestFile.java | 1 +
.../iceberg/manifest/IcebergManifestList.java | 1 +
.../org/apache/paimon/index/IndexFileHandler.java | 20 ------
.../org/apache/paimon/lookup/RocksDBState.java | 4 +-
.../apache/paimon/manifest/IndexManifestFile.java | 18 +++--
.../org/apache/paimon/manifest/ManifestFile.java | 19 +++--
.../org/apache/paimon/manifest/ManifestList.java | 19 +++--
.../paimon/manifest/SimpleFileEntrySerializer.java | 3 +-
.../org/apache/paimon/mergetree/LookupFile.java | 3 +-
.../apache/paimon/privilege/PrivilegedCatalog.java | 14 ++++
.../paimon/privilege/PrivilegedFileStore.java | 9 ++-
.../paimon/table/AbstractFileStoreTable.java | 6 ++
.../paimon/table/DelegatedFileStoreTable.java | 6 ++
.../org/apache/paimon/table/FileStoreTable.java | 4 ++
.../java/org/apache/paimon/utils/ObjectsCache.java | 63 +++++++++++------
.../java/org/apache/paimon/utils/ObjectsFile.java | 63 ++++++++++++-----
.../org/apache/paimon/utils/SegmentsCache.java | 46 ++++++++++--
.../apache/paimon/catalog/CachingCatalogTest.java | 81 ++++++++++++++++++++++
.../paimon/catalog/TestableCachingCatalog.java | 3 +-
.../manifest/IndexManifestFileHandlerTest.java | 6 +-
.../org/apache/paimon/utils/ObjectsCacheTest.java | 4 +-
30 files changed, 439 insertions(+), 133 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 4508d898d..c7449eeb6 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -44,6 +44,24 @@ under the License.
<td>Duration</td>
<td>Controls the duration for which databases and tables in the
catalog are cached.</td>
</tr>
+ <tr>
+ <td><h5>cache.manifest.max-memory</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+ <td>Controls the maximum memory to cache manifest content.</td>
+ </tr>
+ <tr>
+ <td><h5>cache.manifest.small-file-memory</h5></td>
+ <td style="word-wrap: break-word;">128 mb</td>
+ <td>MemorySize</td>
+ <td>Controls the cache memory to cache small manifest files.</td>
+ </tr>
+ <tr>
+ <td><h5>cache.manifest.small-file-threshold</h5></td>
+ <td style="word-wrap: break-word;">1 mb</td>
+ <td>MemorySize</td>
+ <td>Controls the threshold of small manifest file.</td>
+ </tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
b/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
index e13248a92..fcad6bf30 100644
--- a/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/codegen/CompileUtils.java
@@ -20,7 +20,6 @@ package org.apache.paimon.codegen;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import org.codehaus.janino.SimpleCompiler;
import org.slf4j.Logger;
@@ -46,12 +45,12 @@ public final class CompileUtils {
*/
static final Cache<ClassKey, Class<?>> COMPILED_CLASS_CACHE =
Caffeine.newBuilder()
+ .softValues()
// estimated maximum planning/startup time
.expireAfterAccess(Duration.ofMinutes(5))
// estimated cache size
.maximumSize(300)
- .softValues()
- .executor(MoreExecutors.directExecutor())
+ .executor(Runnable::run)
.build();
/**
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 3f09fa5f4..b8f00205e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -25,7 +25,6 @@ import org.apache.paimon.options.MemorySize;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
-import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
@@ -48,13 +47,13 @@ public class CacheManager {
.weigher(this::weigh)
.maximumWeight(maxMemorySize.getBytes())
.removalListener(this::onRemoval)
- .executor(MoreExecutors.directExecutor())
+ .executor(Runnable::run)
.build();
this.fileReadCount = 0;
}
@VisibleForTesting
- public Cache<CacheKey, CacheValue> cache() {
+ public Cache<CacheKey, ?> cache() {
return cache;
}
@@ -81,8 +80,10 @@ public class CacheManager {
}
private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause)
{
- value.isClosed = true;
- value.callback.onRemoval(key);
+ if (value != null) {
+ value.isClosed = true;
+ value.callback.onRemoval(key);
+ }
}
public int fileReadCount() {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 980083e71..d9758d4b0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -105,6 +105,24 @@ public class CatalogOptions {
.withDescription(
"Controls the duration for which databases and
tables in the catalog are cached.");
+ public static final ConfigOption<MemorySize>
CACHE_MANIFEST_SMALL_FILE_MEMORY =
+ key("cache.manifest.small-file-memory")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(128))
+ .withDescription("Controls the cache memory to cache small
manifest files.");
+
+ public static final ConfigOption<MemorySize>
CACHE_MANIFEST_SMALL_FILE_THRESHOLD =
+ key("cache.manifest.small-file-threshold")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(1))
+ .withDescription("Controls the threshold of small manifest
file.");
+
+ public static final ConfigOption<MemorySize> CACHE_MANIFEST_MAX_MEMORY =
+ key("cache.manifest.max-memory")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription("Controls the maximum memory to cache
manifest content.");
+
public static final ConfigOption<String> LINEAGE_META =
key("lineage-meta")
.stringType()
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReaderIterator.java
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReaderIterator.java
index 5a807c6f1..98a485a9c 100644
---
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReaderIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReaderIterator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.reader;
import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.IOUtils;
import java.io.IOException;
@@ -34,7 +35,8 @@ public class RecordReaderIterator<T> implements
CloseableIterator<T> {
this.reader = reader;
try {
this.currentIterator = reader.readBatch();
- } catch (IOException e) {
+ } catch (Exception e) {
+ IOUtils.closeQuietly(reader);
throw new RuntimeException(e);
}
this.advanced = false;
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 49d2c47f0..5120db295 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.index.HashIndexFile;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
@@ -74,7 +75,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
protected final RowType partitionType;
private final CatalogEnvironment catalogEnvironment;
- @Nullable private final SegmentsCache<String> writeManifestCache;
+ @Nullable private final SegmentsCache<Path> writeManifestCache;
+ @Nullable private SegmentsCache<Path> readManifestCache;
protected AbstractFileStore(
FileIO fileIO,
@@ -89,11 +91,9 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
this.options = options;
this.partitionType = partitionType;
this.catalogEnvironment = catalogEnvironment;
- MemorySize writeManifestCache = options.writeManifestCache();
this.writeManifestCache =
- writeManifestCache.getBytes() == 0
- ? null
- : new SegmentsCache<>(options.pageSize(),
writeManifestCache);
+ SegmentsCache.create(
+ options.pageSize(), options.writeManifestCache(),
Long.MAX_VALUE);
}
@Override
@@ -124,7 +124,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.manifestCompression(),
pathFactory(),
options.manifestTargetSize().getBytes(),
- forWrite ? writeManifestCache : null);
+ forWrite ? writeManifestCache : readManifestCache);
}
@Override
@@ -138,12 +138,16 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
options.manifestFormat(),
options.manifestCompression(),
pathFactory(),
- forWrite ? writeManifestCache : null);
+ forWrite ? writeManifestCache : readManifestCache);
}
protected IndexManifestFile.Factory indexManifestFileFactory() {
return new IndexManifestFile.Factory(
- fileIO, options.manifestFormat(),
options.manifestCompression(), pathFactory());
+ fileIO,
+ options.manifestFormat(),
+ options.manifestCompression(),
+ pathFactory(),
+ readManifestCache);
}
@Override
@@ -314,4 +318,9 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
public ServiceManager newServiceManager() {
return new ServiceManager(fileIO, options.path());
}
+
+ @Override
+ public void setManifestCache(SegmentsCache<Path> manifestCache) {
+ this.readManifestCache = manifestCache;
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 38b7321fc..715062565 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -18,6 +18,7 @@
package org.apache.paimon;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestFile;
@@ -38,6 +39,7 @@ import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -100,4 +102,6 @@ public interface FileStore<T> {
boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
List<TagCallback> createTagCallbacks();
+
+ void setManifestCache(SegmentsCache<Path> manifestCache);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index b229963f5..077775945 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -18,11 +18,15 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SegmentsCache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
@@ -34,13 +38,20 @@ import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
/** A {@link Catalog} to cache databases and tables and manifests. */
@@ -50,16 +61,35 @@ public class CachingCatalog extends DelegateCatalog {
protected final Cache<String, Map<String, String>> databaseCache;
protected final Cache<Identifier, Table> tableCache;
+ @Nullable protected final SegmentsCache<Path> manifestCache;
public CachingCatalog(Catalog wrapped) {
- this(wrapped, CACHE_EXPIRATION_INTERVAL_MS.defaultValue());
+ this(
+ wrapped,
+ CACHE_EXPIRATION_INTERVAL_MS.defaultValue(),
+ CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue(),
+ CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
}
- public CachingCatalog(Catalog wrapped, Duration expirationInterval) {
- this(wrapped, expirationInterval, Ticker.systemTicker());
+ public CachingCatalog(
+ Catalog wrapped,
+ Duration expirationInterval,
+ MemorySize manifestMaxMemory,
+ long manifestCacheThreshold) {
+ this(
+ wrapped,
+ expirationInterval,
+ manifestMaxMemory,
+ manifestCacheThreshold,
+ Ticker.systemTicker());
}
- public CachingCatalog(Catalog wrapped, Duration expirationInterval, Ticker
ticker) {
+ public CachingCatalog(
+ Catalog wrapped,
+ Duration expirationInterval,
+ MemorySize manifestMaxMemory,
+ long manifestCacheThreshold,
+ Ticker ticker) {
super(wrapped);
if (expirationInterval.isZero() || expirationInterval.isNegative()) {
throw new IllegalArgumentException(
@@ -81,6 +111,27 @@ public class CachingCatalog extends DelegateCatalog {
.expireAfterAccess(expirationInterval)
.ticker(ticker)
.build();
+ this.manifestCache = SegmentsCache.create(manifestMaxMemory,
manifestCacheThreshold);
+ }
+
+ public static Catalog tryToCreate(Catalog catalog, Options options) {
+ if (!options.get(CACHE_ENABLED)) {
+ return catalog;
+ }
+
+ MemorySize manifestMaxMemory =
options.get(CACHE_MANIFEST_SMALL_FILE_MEMORY);
+ long manifestThreshold =
options.get(CACHE_MANIFEST_SMALL_FILE_THRESHOLD).getBytes();
+ Optional<MemorySize> maxMemory =
options.getOptional(CACHE_MANIFEST_MAX_MEMORY);
+ if (maxMemory.isPresent() &&
maxMemory.get().compareTo(manifestMaxMemory) > 0) {
+ // cache all manifest files
+ manifestMaxMemory = maxMemory.get();
+ manifestThreshold = Long.MAX_VALUE;
+ }
+ return new CachingCatalog(
+ catalog,
+ options.get(CACHE_EXPIRATION_INTERVAL_MS),
+ manifestMaxMemory,
+ manifestThreshold);
}
@Override
@@ -151,7 +202,7 @@ public class CachingCatalog extends DelegateCatalog {
Table originTable = tableCache.getIfPresent(originIdentifier);
if (originTable == null) {
originTable = wrapped.getTable(originIdentifier);
- tableCache.put(originIdentifier, originTable);
+ putTableCache(originIdentifier, originTable);
}
table =
SystemTableLoader.load(
@@ -160,15 +211,22 @@ public class CachingCatalog extends DelegateCatalog {
if (table == null) {
throw new TableNotExistException(identifier);
}
- tableCache.put(identifier, table);
+ putTableCache(identifier, table);
return table;
}
table = wrapped.getTable(identifier);
- tableCache.put(identifier, table);
+ putTableCache(identifier, table);
return table;
}
+ private void putTableCache(Identifier identifier, Table table) {
+ if (manifestCache != null && table instanceof FileStoreTable) {
+ ((FileStoreTable) table).setManifestCache(manifestCache);
+ }
+ tableCache.put(identifier, table);
+ }
+
private class TableInvalidatingRemovalListener implements
RemovalListener<Identifier, Table> {
@Override
public void onRemoval(Identifier identifier, Table table, @NonNull
RemovalCause cause) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index 2eff8e902..0739014c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -24,16 +24,12 @@ import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
-import org.apache.paimon.privilege.FileBasedPrivilegeManager;
-import org.apache.paimon.privilege.PrivilegeManager;
import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
-import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
-import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
@@ -73,23 +69,9 @@ public interface CatalogFactory extends Factory {
static Catalog createCatalog(CatalogContext context, ClassLoader
classLoader) {
Catalog catalog = createUnwrappedCatalog(context, classLoader);
-
Options options = context.options();
- if (options.get(CACHE_ENABLED)) {
- catalog = new CachingCatalog(catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS));
- }
-
- PrivilegeManager privilegeManager =
- new FileBasedPrivilegeManager(
- catalog.warehouse(),
- catalog.fileIO(),
- context.options().get(PrivilegedCatalog.USER),
- context.options().get(PrivilegedCatalog.PASSWORD));
- if (privilegeManager.privilegeEnabled()) {
- catalog = new PrivilegedCatalog(catalog, privilegeManager);
- }
-
- return catalog;
+ catalog = CachingCatalog.tryToCreate(catalog, options);
+ return PrivilegedCatalog.tryToCreate(catalog, options);
}
static Catalog createUnwrappedCatalog(CatalogContext context, ClassLoader
classLoader) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index d4c363b4c..97ab8c5d1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -67,6 +67,7 @@ public class IcebergManifestFile extends
ObjectsFile<IcebergManifestEntry> {
super(
fileIO,
new IcebergManifestEntrySerializer(partitionType),
+ IcebergManifestEntry.schema(partitionType),
readerFactory,
writerFactory,
compression,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
index e247b0238..a468a662d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
+++
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestList.java
@@ -39,6 +39,7 @@ public class IcebergManifestList extends
ObjectsFile<IcebergManifestFileMeta> {
super(
fileIO,
new IcebergManifestFileMetaSerializer(),
+ IcebergManifestFileMeta.schema(),
readerFactory,
writerFactory,
compression,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index eed18af17..ee6cbe769 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -197,26 +197,6 @@ public class IndexFileHandler {
return result;
}
- public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow
partition, int bucket) {
- Snapshot snapshot = snapshotManager.latestSnapshot();
- if (snapshot == null) {
- return Collections.emptyList();
- }
- String indexManifest = snapshot.indexManifest();
- if (indexManifest == null) {
- return Collections.emptyList();
- }
- List<IndexManifestEntry> result = new ArrayList<>();
- for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
- if (file.indexFile().indexType().equals(indexType)
- && file.partition().equals(partition)
- && file.bucket() == bucket) {
- result.add(file);
- }
- }
- return result;
- }
-
public Path filePath(IndexFileMeta file) {
return pathFactory.toPath(file.fileName());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
index c0417b8f1..d3fafeeb6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBState.java
@@ -29,7 +29,6 @@ import org.apache.paimon.types.RowType;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
@@ -80,8 +79,9 @@ public abstract class RocksDBState<K, V, CacheV> {
this.writeOptions = new WriteOptions().setDisableWAL(true);
this.cache =
Caffeine.newBuilder()
+ .softValues()
.maximumSize(lruCacheSize)
- .executor(MoreExecutors.directExecutor())
+ .executor(Runnable::run)
.build();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 91a4f171a..9989f5809 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -22,11 +22,13 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.VersionedObjectSerializer;
import javax.annotation.Nullable;
@@ -38,18 +40,21 @@ public class IndexManifestFile extends
ObjectsFile<IndexManifestEntry> {
private IndexManifestFile(
FileIO fileIO,
+ RowType schema,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
- PathFactory pathFactory) {
+ PathFactory pathFactory,
+ @Nullable SegmentsCache<Path> cache) {
super(
fileIO,
new IndexManifestEntrySerializer(),
+ schema,
readerFactory,
writerFactory,
compression,
pathFactory,
- null);
+ cache);
}
/** Write new index files to index manifest. */
@@ -72,26 +77,31 @@ public class IndexManifestFile extends
ObjectsFile<IndexManifestEntry> {
private final FileFormat fileFormat;
private final String compression;
private final FileStorePathFactory pathFactory;
+ @Nullable private final SegmentsCache<Path> cache;
public Factory(
FileIO fileIO,
FileFormat fileFormat,
String compression,
- FileStorePathFactory pathFactory) {
+ FileStorePathFactory pathFactory,
+ @Nullable SegmentsCache<Path> cache) {
this.fileIO = fileIO;
this.fileFormat = fileFormat;
this.compression = compression;
this.pathFactory = pathFactory;
+ this.cache = cache;
}
public IndexManifestFile create() {
RowType schema =
VersionedObjectSerializer.versionType(IndexManifestEntry.schema());
return new IndexManifestFile(
fileIO,
+ schema,
fileFormat.createReaderFactory(schema),
fileFormat.createWriterFactory(schema),
compression,
- pathFactory.indexManifestFileFactory());
+ pathFactory.indexManifestFileFactory(),
+ cache);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
index e2fa86789..4fd3df960 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java
@@ -57,13 +57,22 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
SchemaManager schemaManager,
RowType partitionType,
ManifestEntrySerializer serializer,
+ RowType schema,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory,
long suggestedFileSize,
- @Nullable SegmentsCache<String> cache) {
- super(fileIO, serializer, readerFactory, writerFactory, compression,
pathFactory, cache);
+ @Nullable SegmentsCache<Path> cache) {
+ super(
+ fileIO,
+ serializer,
+ schema,
+ readerFactory,
+ writerFactory,
+ compression,
+ pathFactory,
+ cache);
this.schemaManager = schemaManager;
this.partitionType = partitionType;
this.writerFactory = writerFactory;
@@ -156,7 +165,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
private final String compression;
private final FileStorePathFactory pathFactory;
private final long suggestedFileSize;
- @Nullable private final SegmentsCache<String> cache;
+ @Nullable private final SegmentsCache<Path> cache;
public Factory(
FileIO fileIO,
@@ -166,7 +175,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
String compression,
FileStorePathFactory pathFactory,
long suggestedFileSize,
- @Nullable SegmentsCache<String> cache) {
+ @Nullable SegmentsCache<Path> cache) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.partitionType = partitionType;
@@ -184,6 +193,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
schemaManager,
partitionType,
new ManifestEntrySerializer(),
+ entryType,
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
compression,
@@ -197,6 +207,7 @@ public class ManifestFile extends
ObjectsFile<ManifestEntry> {
return new ObjectsFile<>(
fileIO,
new SimpleFileEntrySerializer(),
+ entryType,
fileFormat.createReaderFactory(entryType),
fileFormat.createWriterFactory(entryType),
compression,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
index e33d4a643..304730f30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestList.java
@@ -22,6 +22,7 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ObjectsFile;
@@ -42,12 +43,21 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
private ManifestList(
FileIO fileIO,
ManifestFileMetaSerializer serializer,
+ RowType schema,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory,
- @Nullable SegmentsCache<String> cache) {
- super(fileIO, serializer, readerFactory, writerFactory, compression,
pathFactory, cache);
+ @Nullable SegmentsCache<Path> cache) {
+ super(
+ fileIO,
+ serializer,
+ schema,
+ readerFactory,
+ writerFactory,
+ compression,
+ pathFactory,
+ cache);
}
/**
@@ -66,14 +76,14 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
private final FileFormat fileFormat;
private final String compression;
private final FileStorePathFactory pathFactory;
- @Nullable private final SegmentsCache<String> cache;
+ @Nullable private final SegmentsCache<Path> cache;
public Factory(
FileIO fileIO,
FileFormat fileFormat,
String compression,
FileStorePathFactory pathFactory,
- @Nullable SegmentsCache<String> cache) {
+ @Nullable SegmentsCache<Path> cache) {
this.fileIO = fileIO;
this.fileFormat = fileFormat;
this.compression = compression;
@@ -86,6 +96,7 @@ public class ManifestList extends
ObjectsFile<ManifestFileMeta> {
return new ManifestList(
fileIO,
new ManifestFileMetaSerializer(),
+ metaType,
fileFormat.createReaderFactory(metaType),
fileFormat.createWriterFactory(metaType),
compression,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
index f23f167f7..c33abc00c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java
@@ -19,6 +19,7 @@
package org.apache.paimon.manifest;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.utils.VersionedObjectSerializer;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -51,7 +52,7 @@ public class SimpleFileEntrySerializer extends
VersionedObjectSerializer<SimpleF
throw new IllegalArgumentException("Unsupported version: " +
version);
}
- InternalRow file = row.getRow(4, 3);
+ InternalRow file = row.getRow(4, DataFileMeta.SCHEMA.getFieldCount());
return new SimpleFileEntry(
FileKind.fromByteValue(row.getByte(0)),
deserializeBinaryRow(row.getBinary(1)),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index b3b6fabc4..0e05d85a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -28,7 +28,6 @@ import org.apache.paimon.utils.FileIOUtils;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
-import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nullable;
@@ -91,7 +90,7 @@ public class LookupFile implements Closeable {
.maximumWeight(maxDiskSize.getKibiBytes())
.weigher(LookupFile::fileWeigh)
.removalListener(LookupFile::removalCallback)
- .executor(MoreExecutors.directExecutor())
+ .executor(Runnable::run)
.build();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index 3e4781086..bda06a08d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
@@ -49,6 +50,19 @@ public class PrivilegedCatalog extends DelegateCatalog {
this.privilegeManager = privilegeManager;
}
+ public static Catalog tryToCreate(Catalog catalog, Options options) {
+ PrivilegeManager privilegeManager =
+ new FileBasedPrivilegeManager(
+ catalog.warehouse(),
+ catalog.fileIO(),
+ options.get(PrivilegedCatalog.USER),
+ options.get(PrivilegedCatalog.PASSWORD));
+ if (privilegeManager.privilegeEnabled()) {
+ catalog = new PrivilegedCatalog(catalog, privilegeManager);
+ }
+ return catalog;
+ }
+
public PrivilegeManager privilegeManager() {
return privilegeManager;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 1870813e9..768e6259e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon.privilege;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.manifest.ManifestFile;
@@ -41,6 +42,7 @@ import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -51,8 +53,6 @@ import java.util.List;
/** {@link FileStore} with privilege checks. */
public class PrivilegedFileStore<T> implements FileStore<T> {
- private static final long serialVersionUID = 1L;
-
private final FileStore<T> wrapped;
private final PrivilegeChecker privilegeChecker;
private final Identifier identifier;
@@ -199,4 +199,9 @@ public class PrivilegedFileStore<T> implements FileStore<T>
{
public List<TagCallback> createTagCallbacks() {
return wrapped.createTagCallbacks();
}
+
+ @Override
+ public void setManifestCache(SegmentsCache<Path> manifestCache) {
+ wrapped.setManifestCache(manifestCache);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index bad718a04..d30bd11ef 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -58,6 +58,7 @@ import
org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanne
import
org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -108,6 +109,11 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
this.catalogEnvironment = catalogEnvironment;
}
+ @Override
+ public void setManifestCache(SegmentsCache<Path> manifestCache) {
+ store().setManifestCache(manifestCache);
+ }
+
@Override
public OptionalLong latestSnapshotId() {
Long snapshot = store().snapshotManager().latestSnapshotId();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 6a8e94240..58d1caaac 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -35,6 +35,7 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -97,6 +98,11 @@ public abstract class DelegatedFileStoreTable implements
FileStoreTable {
return wrapped.fileIO();
}
+ @Override
+ public void setManifestCache(SegmentsCache<Path> manifestCache) {
+ wrapped.setManifestCache(manifestCache);
+ }
+
@Override
public TableSchema schema() {
return wrapped.schema();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 68f053178..61fe816ac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
import org.apache.paimon.FileStore;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
@@ -29,6 +30,7 @@ import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SegmentsCache;
import java.util.List;
import java.util.Map;
@@ -40,6 +42,8 @@ import java.util.Optional;
*/
public interface FileStoreTable extends DataTable {
+ void setManifestCache(SegmentsCache<Path> manifestCache);
+
@Override
default RowType rowType() {
return schema().logicalRowType();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index f80037911..8c490e008 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -26,6 +26,7 @@ import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentSource;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -34,25 +35,30 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.function.BiFunction;
+
+import static org.apache.paimon.utils.ObjectsFile.readFromIterator;
/** Cache records to {@link SegmentsCache} by compacted serializer. */
@ThreadSafe
public class ObjectsCache<K, V> {
private final SegmentsCache<K> cache;
- private final ObjectSerializer<V> serializer;
- private final ThreadLocal<InternalRowSerializer> threadLocalRowSerializer;
- private final BiFunction<K, Long, CloseableIterator<InternalRow>> reader;
+ private final ObjectSerializer<V> projectedSerializer;
+ private final ThreadLocal<InternalRowSerializer> formatSerializer;
+ private final FunctionWithIOException<K, Long> fileSizeFunction;
+ private final BiFunctionWithIOE<K, Long, CloseableIterator<InternalRow>>
reader;
public ObjectsCache(
SegmentsCache<K> cache,
- ObjectSerializer<V> serializer,
- BiFunction<K, Long, CloseableIterator<InternalRow>> reader) {
+ ObjectSerializer<V> projectedSerializer,
+ RowType formatSchema,
+ FunctionWithIOException<K, Long> fileSizeFunction,
+ BiFunctionWithIOE<K, Long, CloseableIterator<InternalRow>> reader)
{
this.cache = cache;
- this.serializer = serializer;
- this.threadLocalRowSerializer =
- ThreadLocal.withInitial(() -> new
InternalRowSerializer(serializer.fieldTypes()));
+ this.projectedSerializer = projectedSerializer;
+ this.formatSerializer =
+ ThreadLocal.withInitial(() -> new
InternalRowSerializer(formatSchema));
+ this.fileSizeFunction = fileSizeFunction;
this.reader = reader;
}
@@ -62,19 +68,37 @@ public class ObjectsCache<K, V> {
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
throws IOException {
- InternalRowSerializer rowSerializer = threadLocalRowSerializer.get();
- Segments segments =
- cache.getSegments(key, k -> readSegments(k, fileSize,
loadFilter, rowSerializer));
+ Segments segments = cache.getIfPresents(key);
+ if (segments != null) {
+ return readFromSegments(segments, readFilter);
+ } else {
+ if (fileSize == null) {
+ fileSize = fileSizeFunction.apply(key);
+ }
+ if (fileSize <= cache.maxElementSize()) {
+ segments = readSegments(key, fileSize, loadFilter);
+ cache.put(key, segments);
+ return readFromSegments(segments, readFilter);
+ } else {
+ return readFromIterator(
+ reader.apply(key, fileSize), projectedSerializer,
readFilter);
+ }
+ }
+ }
+
+ private List<V> readFromSegments(Segments segments, Filter<InternalRow>
readFilter)
+ throws IOException {
+ InternalRowSerializer formatSerializer = this.formatSerializer.get();
List<V> entries = new ArrayList<>();
RandomAccessInputView view =
new RandomAccessInputView(
segments.segments(), cache.pageSize(),
segments.limitInLastSegment());
- BinaryRow binaryRow = new BinaryRow(rowSerializer.getArity());
+ BinaryRow binaryRow = new BinaryRow(formatSerializer.getArity());
while (true) {
try {
- rowSerializer.mapFromPages(binaryRow, view);
+ formatSerializer.mapFromPages(binaryRow, view);
if (readFilter.test(binaryRow)) {
- entries.add(serializer.fromRow(binaryRow));
+ entries.add(projectedSerializer.fromRow(binaryRow));
}
} catch (EOFException e) {
return entries;
@@ -82,11 +106,8 @@ public class ObjectsCache<K, V> {
}
}
- private Segments readSegments(
- K key,
- @Nullable Long fileSize,
- Filter<InternalRow> loadFilter,
- InternalRowSerializer rowSerializer) {
+ private Segments readSegments(K key, @Nullable Long fileSize,
Filter<InternalRow> loadFilter) {
+ InternalRowSerializer formatSerializer = this.formatSerializer.get();
try (CloseableIterator<InternalRow> iterator = reader.apply(key,
fileSize)) {
ArrayList<MemorySegment> segments = new ArrayList<>();
MemorySegmentSource segmentSource =
@@ -96,7 +117,7 @@ public class ObjectsCache<K, V> {
while (iterator.hasNext()) {
InternalRow row = iterator.next();
if (loadFilter.test(row)) {
- rowSerializer.serializeToPages(row, output);
+ formatSerializer.serializeToPages(row, output);
}
}
return new Segments(segments,
output.getCurrentPositionInSegment());
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index a457a868a..41f18f448 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -25,7 +25,7 @@ import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
@@ -36,7 +36,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import static org.apache.paimon.utils.FileUtils.createFormatReader;
+import static org.apache.paimon.utils.FileUtils.checkExists;
/** A file which contains several {@link T}s, provides read and write. */
public class ObjectsFile<T> {
@@ -48,16 +48,17 @@ public class ObjectsFile<T> {
protected final String compression;
protected final PathFactory pathFactory;
- @Nullable private final ObjectsCache<String, T> cache;
+ @Nullable private final ObjectsCache<Path, T> cache;
public ObjectsFile(
FileIO fileIO,
ObjectSerializer<T> serializer,
+ RowType formatType,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
String compression,
PathFactory pathFactory,
- @Nullable SegmentsCache<String> cache) {
+ @Nullable SegmentsCache<Path> cache) {
this.fileIO = fileIO;
this.serializer = serializer;
this.readerFactory = readerFactory;
@@ -65,7 +66,14 @@ public class ObjectsFile<T> {
this.compression = compression;
this.pathFactory = pathFactory;
this.cache =
- cache == null ? null : new ObjectsCache<>(cache, serializer,
this::createIterator);
+ cache == null
+ ? null
+ : new ObjectsCache<>(
+ cache,
+ serializer,
+ formatType,
+ this::fileSize,
+ this::createIterator);
}
public FileIO fileIO() {
@@ -123,18 +131,12 @@ public class ObjectsFile<T> {
Filter<InternalRow> loadFilter,
Filter<InternalRow> readFilter)
throws IOException {
+ Path path = pathFactory.toPath(fileName);
if (cache != null) {
- return cache.read(fileName, fileSize, loadFilter, readFilter);
+ return cache.read(path, fileSize, loadFilter, readFilter);
}
- RecordReader<InternalRow> reader =
- createFormatReader(fileIO, readerFactory,
pathFactory.toPath(fileName), fileSize);
- if (readFilter != Filter.ALWAYS_TRUE) {
- reader = reader.filter(readFilter);
- }
- List<T> result = new ArrayList<>();
- reader.forEachRemaining(row -> result.add(serializer.fromRow(row)));
- return result;
+ return readFromIterator(createIterator(path, fileSize), serializer,
readFilter);
}
public String writeWithoutRolling(Collection<T> records) {
@@ -163,17 +165,40 @@ public class ObjectsFile<T> {
}
}
- private CloseableIterator<InternalRow> createIterator(
- String fileName, @Nullable Long fileSize) {
+ private CloseableIterator<InternalRow> createIterator(Path file, @Nullable
Long fileSize)
+ throws IOException {
+ return FileUtils.createFormatReader(fileIO, readerFactory, file,
fileSize)
+ .toCloseableIterator();
+ }
+
+ private long fileSize(Path file) throws IOException {
try {
- return createFormatReader(fileIO, readerFactory,
pathFactory.toPath(fileName), fileSize)
- .toCloseableIterator();
+ return fileIO.getFileSize(file);
} catch (IOException e) {
- throw new RuntimeException(e);
+ checkExists(fileIO, file);
+ throw e;
}
}
public void delete(String fileName) {
fileIO.deleteQuietly(pathFactory.toPath(fileName));
}
+
+ public static <V> List<V> readFromIterator(
+ CloseableIterator<InternalRow> inputIterator,
+ ObjectSerializer<V> serializer,
+ Filter<InternalRow> readFilter) {
+ try (CloseableIterator<InternalRow> iterator = inputIterator) {
+ List<V> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ if (readFilter.test(row)) {
+ result.add(serializer.fromRow(row));
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
index 87d5d8586..d5c2178c8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SegmentsCache.java
@@ -23,9 +23,10 @@ import org.apache.paimon.options.MemorySize;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
-import java.util.function.Function;
+import javax.annotation.Nullable;
+
+import static org.apache.paimon.CoreOptions.PAGE_SIZE;
/** Cache {@link Segments}. */
public class SegmentsCache<T> {
@@ -34,26 +35,59 @@ public class SegmentsCache<T> {
private final int pageSize;
private final Cache<T, Segments> cache;
+ private final MemorySize maxMemorySize;
+ private final long maxElementSize;
- public SegmentsCache(int pageSize, MemorySize maxMemorySize) {
+ public SegmentsCache(int pageSize, MemorySize maxMemorySize, long
maxElementSize) {
this.pageSize = pageSize;
this.cache =
Caffeine.newBuilder()
+ .softValues()
.weigher(this::weigh)
.maximumWeight(maxMemorySize.getBytes())
- .executor(MoreExecutors.directExecutor())
+ .executor(Runnable::run)
.build();
+ this.maxMemorySize = maxMemorySize;
+ this.maxElementSize = maxElementSize;
}
public int pageSize() {
return pageSize;
}
- public Segments getSegments(T key, Function<T, Segments> viewFunction) {
- return cache.get(key, viewFunction);
+ public MemorySize maxMemorySize() {
+ return maxMemorySize;
+ }
+
+ public long maxElementSize() {
+ return maxElementSize;
+ }
+
+ @Nullable
+ public Segments getIfPresents(T key) {
+ return cache.getIfPresent(key);
+ }
+
+ public void put(T key, Segments segments) {
+ cache.put(key, segments);
}
private int weigh(T cacheKey, Segments segments) {
return OBJECT_MEMORY_SIZE + segments.segments().size() * pageSize;
}
+
+ @Nullable
+ public static <T> SegmentsCache<T> create(MemorySize maxMemorySize, long
maxElementSize) {
+ return create((int) PAGE_SIZE.defaultValue().getBytes(),
maxMemorySize, maxElementSize);
+ }
+
+ @Nullable
+ public static <T> SegmentsCache<T> create(
+ int pageSize, MemorySize maxMemorySize, long maxElementSize) {
+ if (maxMemorySize.getBytes() == 0) {
+ return null;
+ }
+
+ return new SegmentsCache<>(pageSize, maxMemorySize, maxElementSize);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
index 56fc238a8..d1f7eeb8a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -18,9 +18,18 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.FakeTicker;
@@ -31,6 +40,7 @@ import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.FileNotFoundException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,6 +51,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.paimon.data.BinaryString.fromString;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_MAX_MEMORY;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_MEMORY;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_MANIFEST_SMALL_FILE_THRESHOLD;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -285,4 +299,71 @@ class CachingCatalogTest extends CatalogTestBase {
.map(type -> Identifier.fromString(tableIdent.getFullName() +
"$" + type))
.toArray(Identifier[]::new);
}
+
+ @Test
+ public void testManifestCache() throws Exception {
+ innerTestManifestCache(Long.MAX_VALUE);
+ assertThatThrownBy(() -> innerTestManifestCache(10))
+ .hasRootCauseInstanceOf(FileNotFoundException.class);
+ }
+
+ private void innerTestManifestCache(long manifestCacheThreshold) throws
Exception {
+ Catalog catalog =
+ new CachingCatalog(
+ this.catalog,
+ Duration.ofSeconds(10),
+ MemorySize.ofMebiBytes(1),
+ manifestCacheThreshold);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.dropTable(tableIdent, true);
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+
+ // write
+ Table table = catalog.getTable(tableIdent);
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(1, fromString("1"), fromString("1")));
+ write.write(GenericRow.of(2, fromString("2"), fromString("2")));
+ commit.commit(write.prepareCommit());
+ }
+
+ // repeat read
+ for (int i = 0; i < 5; i++) {
+ table = catalog.getTable(tableIdent);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan scan = readBuilder.newScan();
+ TableRead read = readBuilder.newRead();
+ read.createReader(scan.plan()).forEachRemaining(r -> {});
+
+ // delete manifest to validate cache
+ if (i == 0) {
+ Path manifestPath = new Path(table.options().get("path"),
"manifest");
+ assertThat(fileIO.exists(manifestPath)).isTrue();
+ fileIO.deleteDirectoryQuietly(manifestPath);
+ }
+ }
+ }
+
+ @Test
+ public void testManifestCacheOptions() {
+ Options options = new Options();
+
+ CachingCatalog caching = (CachingCatalog)
CachingCatalog.tryToCreate(catalog, options);
+ assertThat(caching.manifestCache.maxMemorySize())
+ .isEqualTo(CACHE_MANIFEST_SMALL_FILE_MEMORY.defaultValue());
+ assertThat(caching.manifestCache.maxElementSize())
+
.isEqualTo(CACHE_MANIFEST_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
+
+ options.set(CACHE_MANIFEST_SMALL_FILE_MEMORY,
MemorySize.ofMebiBytes(100));
+ options.set(CACHE_MANIFEST_SMALL_FILE_THRESHOLD,
MemorySize.ofBytes(100));
+ caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog,
options);
+
assertThat(caching.manifestCache.maxMemorySize()).isEqualTo(MemorySize.ofMebiBytes(100));
+ assertThat(caching.manifestCache.maxElementSize()).isEqualTo(100);
+
+ options.set(CACHE_MANIFEST_MAX_MEMORY, MemorySize.ofMebiBytes(256));
+ caching = (CachingCatalog) CachingCatalog.tryToCreate(catalog,
options);
+
assertThat(caching.manifestCache.maxMemorySize()).isEqualTo(MemorySize.ofMebiBytes(256));
+
assertThat(caching.manifestCache.maxElementSize()).isEqualTo(Long.MAX_VALUE);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
index c393d3aff..159f5edae 100644
---
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -18,6 +18,7 @@
package org.apache.paimon.catalog;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.Table;
import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
@@ -35,7 +36,7 @@ public class TestableCachingCatalog extends CachingCatalog {
private final Duration cacheExpirationInterval;
public TestableCachingCatalog(Catalog catalog, Duration
expirationInterval, Ticker ticker) {
- super(catalog, expirationInterval, ticker);
+ super(catalog, expirationInterval, MemorySize.ZERO, Long.MAX_VALUE,
ticker);
this.cacheExpirationInterval = expirationInterval;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
index 58b259823..78d9ba17c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -47,7 +47,8 @@ public class IndexManifestFileHandlerTest {
fileStore.fileIO(),
fileStore.options().manifestFormat(),
"zstd",
- fileStore.pathFactory())
+ fileStore.pathFactory(),
+ null)
.create();
IndexManifestFileHandler indexManifestFileHandler =
new IndexManifestFileHandler(indexManifestFile,
BucketMode.BUCKET_UNAWARE);
@@ -81,7 +82,8 @@ public class IndexManifestFileHandlerTest {
fileStore.fileIO(),
fileStore.options().manifestFormat(),
"zstd",
- fileStore.pathFactory())
+ fileStore.pathFactory(),
+ null)
.create();
IndexManifestFileHandler indexManifestFileHandler =
new IndexManifestFileHandler(indexManifestFile,
BucketMode.HASH_FIXED);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
index be9d2a48c..8a4f0b061 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -44,8 +44,10 @@ public class ObjectsCacheTest {
Map<String, List<String>> map = new HashMap<>();
ObjectsCache<String, String> cache =
new ObjectsCache<>(
- new SegmentsCache<>(1024, MemorySize.ofKibiBytes(5)),
+ new SegmentsCache<>(1024, MemorySize.ofKibiBytes(5),
Long.MAX_VALUE),
new StringSerializer(),
+ RowType.of(DataTypes.STRING()),
+ k -> 1L,
(k, size) ->
CloseableIterator.adapterForIterator(
map.get(k).stream()