rizaon commented on code in PR #4518: URL: https://github.com/apache/iceberg/pull/4518#discussion_r861314579
########## core/src/main/java/org/apache/iceberg/io/CachingFileIO.java: ########## @@ -0,0 +1,404 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Weigher; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Arrays; +import java.util.Map; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.HadoopConfigurable; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hadoop.SerializableConfiguration; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SerializableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + 
* Implementation of FileIO that adds metadata content caching features. + * <p> + * This FileIO intends to speed up scan planning by caching most of the table metadata contents in memory and reducing + * remote reads. + */ +public class CachingFileIO implements FileIO, HadoopConfigurable { + private static final Logger LOG = LoggerFactory.getLogger(CachingFileIO.class); + private static ContentCache sharedCache; + + private static ContentCache createCache(Map<String, String> properties) { + long durationMs = PropertyUtil.propertyAsLong(properties, + CatalogProperties.CONTENT_CACHE_EXPIRATION_INTERVAL_MS, + CatalogProperties.CONTENT_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT); + long totalBytes = PropertyUtil.propertyAsLong(properties, + CatalogProperties.CONTENT_CACHE_MAX_TOTAL_BYTES, + CatalogProperties.CONTENT_CACHE_MAX_TOTAL_BYTES_DEFAULT); + long contentLength = PropertyUtil.propertyAsLong(properties, + CatalogProperties.CONTENT_CACHE_MAX_CONTENT_LENGTH, + CatalogProperties.CONTENT_CACHE_MAX_CONTENT_LENGTH_DEFAULT); + + if (durationMs >= 0 && totalBytes > 0) { + return new ContentCache(durationMs, totalBytes, contentLength); + } else { + return null; + } + } + + public static synchronized ContentCache getSharedCache(Map<String, String> properties) { + if (sharedCache == null) { + sharedCache = createCache(properties); + } + return sharedCache; + } + + private SerializableSupplier<Configuration> hadoopConf; + private ContentCache fileContentCache; + private Map<String, String> properties; + private FileIO wrappedIO; + + /** + * Constructor used for dynamic FileIO loading. 
+ * <p> + * {@link Configuration Hadoop configuration} must be set through {@link HadoopFileIO#setConf(Configuration)} + */ + public CachingFileIO() { + } + + public CachingFileIO(Configuration hadoopConf) { + this(new SerializableConfiguration(hadoopConf)::get); + } + + public CachingFileIO(SerializableSupplier<Configuration> hadoopConf) { + this.hadoopConf = hadoopConf; + } + + @Override + public void initialize(Map<String, String> newProperties) { + close(); // close and discard any previous FileIO instances + this.properties = newProperties; + initCache(); + initWrappedIO(); + } + + private void initCache() { + boolean isSharedCache = PropertyUtil.propertyAsBoolean(properties, + CatalogProperties.CONTENT_CACHE_SHARED, + CatalogProperties.CONTENT_CACHE_SHARED_DEFAULT); + if (isSharedCache) { + this.fileContentCache = getSharedCache(properties); + LOG.info("CachingFileIO created with shared cache. {} ", this.fileContentCache); + } else { + this.fileContentCache = createCache(properties); + LOG.info("CachingFileIO created. 
{} ", this.fileContentCache); + } + } + + public Configuration conf() { + return hadoopConf.get(); + } + + @Override + public void setConf(Configuration conf) { + this.hadoopConf = new SerializableConfiguration(conf)::get; + } + + @Override + public Configuration getConf() { + return hadoopConf.get(); + } + + @Override + public void serializeConfWith(Function<Configuration, SerializableSupplier<Configuration>> confSerializer) { + this.hadoopConf = confSerializer.apply(hadoopConf.get()); + } + + private void initWrappedIO() { + String wrappedIOImpl = PropertyUtil.propertyAsString(properties, + CatalogProperties.CONTENT_CACHE_WRAPPED_FILE_IO_IMPL, + CatalogProperties.CONTENT_CACHE_WRAPPED_FILE_IO_IMPL_DEFAULT); + this.wrappedIO = CatalogUtil.loadFileIO(wrappedIOImpl, properties, conf()); + } + + @Override + public void close() { + if (wrappedIO != null) { + wrappedIO.close(); + wrappedIO = null; + } + } + + @Override + public InputFile newInputFile(String path) { + InputFile inFile = wrappedIO.newInputFile(path); + if (fileContentCache != null && path.contains("/metadata/")) { + // TODO: Currently we just assume that this is a metadata file if it has a "metadata" dir on its path. But the metadata + // location can be set differently for different tables. A table might have a different metadata path by + // setting the custom "write.metadata.path" TableProperties. + // We might need to extend the FileIO interface. Add a method 'newMetadataInputFile' and let callers use it to + // differentiate metadata files vs other files. 
+ return new CachingInputFile(fileContentCache, inFile); + } + return inFile; + } + + @Override + public OutputFile newOutputFile(String path) { + return wrappedIO.newOutputFile(path); + } + + @Override + public void deleteFile(String path) { + wrappedIO.deleteFile(path); + if (fileContentCache != null) { + fileContentCache.cache.invalidate(path); + } + } + + public ContentCache getCache() { + return fileContentCache; + } + + public long cacheEstimatedSize() { + return fileContentCache == null ? 0 : fileContentCache.cache.estimatedSize(); + } + + public static class ContentCache { + private final long expireAfterAccessMs; + private final long maxTotalBytes; + private final long maxContentLength; + private final Cache<String, byte[]> cache; + + private ContentCache(long expireAfterAccessMs, long maxTotalBytes, long maxContentLength) { + this.expireAfterAccessMs = expireAfterAccessMs; + this.maxTotalBytes = maxTotalBytes; + this.maxContentLength = maxContentLength; + + Caffeine<Object, Object> builder = Caffeine.newBuilder(); + if (expireAfterAccessMs > 0) { + builder = builder.expireAfterAccess(Duration.ofMillis(expireAfterAccessMs)); + } + + this.cache = builder.maximumWeight(maxTotalBytes) + .weigher((Weigher<String, byte[]>) (key, value) -> value.length) + .recordStats() + .build(); + } + + public long expireAfterAccess() { + return expireAfterAccessMs; + } + + public long maxContentLength() { + return maxContentLength; + } + + public long maxTotalBytes() { + return maxTotalBytes; + } + + public Cache<String, byte[]> cache() { + return cache; + } + + public CacheStats stats() { + return cache.stats(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + '{' + + "expireAfterAccessMs=" + expireAfterAccessMs + ", " + + "maxContentLength=" + maxContentLength + ", " + + "maxTotalBytes=" + maxTotalBytes + ", " + + "cacheStats=" + cache.stats() + '}'; + } + } + + private static class CachingInputFile implements InputFile { + private final 
ContentCache contentCache; + private final InputFile wrappedInputFile; + + private CachingInputFile(ContentCache cache, InputFile inFile) { + this.contentCache = cache; + this.wrappedInputFile = inFile; + } + + @Override + public long getLength() { + byte[] buf = contentCache.cache.getIfPresent(location()); + return (buf != null) ? buf.length : wrappedInputFile.getLength(); + } + + @Override + public SeekableInputStream newStream() { + try { + if (getLengthChecked() <= contentCache.maxContentLength()) { + return cachedStream(); Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
