This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch tpc_preview4-external
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpc_preview4-external by this
push:
new 3d2a4d61cd1 Manifest cache for tpch1000 (#59178)
3d2a4d61cd1 is described below
commit 3d2a4d61cd111dcdd31bdd0dc0d652e1af405246
Author: Socrates <[email protected]>
AuthorDate: Fri Dec 19 10:53:50 2025 +0800
Manifest cache for tpch1000 (#59178)
---
be/src/clucene | 1 +
fe/check/checkstyle/suppressions.xml | 3 +
.../main/java/org/apache/doris/common/Config.java | 12 +
.../doris/datasource/ExternalMetaCacheMgr.java | 7 +
.../iceberg/IcebergManifestCacheMgr.java | 35 +
.../doris/datasource/iceberg/IcebergUtils.java | 8 +
.../iceberg/cache/ContentFileEstimater.java | 194 +++++
.../iceberg/cache/IcebergManifestCache.java | 96 +++
.../iceberg/cache/IcebergManifestCacheLoader.java | 89 ++
.../datasource/iceberg/cache/ManifestCacheKey.java | 58 ++
.../iceberg/cache/ManifestCacheValue.java | 65 ++
.../datasource/iceberg/source/IcebergScanNode.java | 146 +++-
.../metastore/AbstractIcebergProperties.java | 62 ++
.../java/org/apache/iceberg/DeleteFileIndex.java | 906 +++++++++++++++++++++
14 files changed, 1681 insertions(+), 1 deletion(-)
diff --git a/be/src/clucene b/be/src/clucene
new file mode 160000
index 00000000000..bb22247973e
--- /dev/null
+++ b/be/src/clucene
@@ -0,0 +1 @@
+Subproject commit bb22247973e55dcac9a3eaafedc57cc6c36d2fc3
diff --git a/fe/check/checkstyle/suppressions.xml b/fe/check/checkstyle/suppressions.xml
index 8f000bb7616..7340c4c5bd5 100644
--- a/fe/check/checkstyle/suppressions.xml
+++ b/fe/check/checkstyle/suppressions.xml
@@ -69,6 +69,9 @@ under the License.
<!-- ignore hudi disk map copied from hudi/common/util/collection/DiskMap.java -->
<suppress files="org[\\/]apache[\\/]hudi[\\/]common[\\/]util[\\/]collection[\\/]DiskMap\.java" checks="[a-zA-Z0-9]*"/>
+ <!-- ignore iceberg delete file index copied from iceberg/DeleteFileIndex.java -->
+ <suppress files="org[\\/]apache[\\/]iceberg[\\/]DeleteFileIndex\.java" checks="[a-zA-Z0-9]*"/>
+
<!-- ignore gensrc/thrift/ExternalTableSchema.thrift -->
<suppress files=".*thrift/schema/external/.*" checks=".*"/>
</suppressions>
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 350e34f8a90..d4426c2d515 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2298,6 +2298,18 @@ public class Config extends ConfigBase {
})
public static long external_cache_refresh_time_minutes = 10; // 10 mins
+ @ConfField(description = {"是否启用 Iceberg Manifest DataFile/DeleteFile 缓存。",
+ "Whether to enable Iceberg manifest DataFile/DeleteFile cache."})
+ public static boolean iceberg_manifest_cache_enable = true;
+
+ @ConfField(description = {"Iceberg Manifest 缓存的容量上限,单位 MB。",
+ "Iceberg manifest cache capacity in MB."})
+ public static long iceberg_manifest_cache_capacity_mb = 1024;
+
+ @ConfField(description = {"Iceberg Manifest 缓存的访问过期时间(秒),0 或负数表示不过期。",
+ "Iceberg manifest cache expire after access in seconds. 0 or negative disables expiration."})
+ public static long iceberg_manifest_cache_ttl_sec = 48 * 60 * 60;
+
/**
* Github workflow test type, for setting some session variables
* only for certain test type. E.g. only settting batch_size to small
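The three knobs above gate the new FE-side manifest cache. A minimal sketch of how they could be set in fe.conf; the values below are purely illustrative (the defaults shipped in this commit are true / 1024 / 172800):

    iceberg_manifest_cache_enable = true
    iceberg_manifest_cache_capacity_mb = 2048
    iceberg_manifest_cache_ttl_sec = 86400
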
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index e777285a07f..798a2170b1e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -29,6 +29,7 @@ import org.apache.doris.datasource.hudi.source.HudiCachedFsViewProcessor;
import org.apache.doris.datasource.hudi.source.HudiCachedMetaClientProcessor;
import org.apache.doris.datasource.hudi.source.HudiMetadataCacheMgr;
import org.apache.doris.datasource.hudi.source.HudiPartitionProcessor;
+import org.apache.doris.datasource.iceberg.IcebergManifestCacheMgr;
import org.apache.doris.datasource.iceberg.IcebergMetadataCache;
import org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
@@ -97,6 +98,7 @@ public class ExternalMetaCacheMgr {
private FileSystemCache fsCache;
// all external table row count cache.
private ExternalRowCountCache rowCountCache;
+ private final IcebergManifestCacheMgr icebergManifestCacheMgr;
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
@@ -128,6 +130,7 @@ public class ExternalMetaCacheMgr {
rowCountCache = new ExternalRowCountCache(rowCountRefreshExecutor);
hudiMetadataCacheMgr = new HudiMetadataCacheMgr(commonRefreshExecutor);
+ icebergManifestCacheMgr = new IcebergManifestCacheMgr();
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
paimonMetadataCacheMgr = new PaimonMetadataCacheMgr(commonRefreshExecutor);
@@ -199,6 +202,10 @@ public class ExternalMetaCacheMgr {
return hudiMetadataCacheMgr;
}
+ public IcebergManifestCacheMgr getIcebergManifestCacheMgr() {
+ return icebergManifestCacheMgr;
+ }
+
public IcebergMetadataCache getIcebergMetadataCache() {
return icebergMetadataCacheMgr.getIcebergMetadataCache();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergManifestCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergManifestCacheMgr.java
new file mode 100644
index 00000000000..ad95e151b98
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergManifestCacheMgr.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg;
+
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
+
+/**
+ * Wrapper manager for Iceberg manifest cache.
+ */
+public class IcebergManifestCacheMgr {
+ private final IcebergManifestCache manifestCache;
+
+ public IcebergManifestCacheMgr() {
+ this.manifestCache = new IcebergManifestCache();
+ }
+
+ public IcebergManifestCache getManifestCache() {
+ return manifestCache;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 9587ca4f816..28ddf2817df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -56,6 +56,7 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.datasource.iceberg.source.IcebergTableQueryInfo;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccUtil;
@@ -1452,4 +1453,11 @@ public class IcebergUtils {
icebergExternalTable.getViewText();
}
+ public static IcebergManifestCache getManifestCache() {
+ return Env.getCurrentEnv()
+ .getExtMetaCacheMgr()
+ .getIcebergManifestCacheMgr()
+ .getManifestCache();
+ }
+
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimater.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimater.java
new file mode 100644
index 00000000000..43f60096e31
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ContentFileEstimater.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.StructLike;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility to estimate the JVM weight of Iceberg {@link ContentFile} objects.
+ */
+public final class ContentFileEstimater {
+ private static final long LIST_BASE_WEIGHT = 48L;
+ private static final long OBJECT_REFERENCE_WEIGHT = 8L;
+ private static final long CONTENT_FILE_BASE_WEIGHT = 256L;
+ private static final long STRING_BASE_WEIGHT = 40L;
+ private static final long CHAR_BYTES = 2L;
+ private static final long BYTE_BUFFER_BASE_WEIGHT = 16L;
+ private static final long MAP_BASE_WEIGHT = 48L;
+ private static final long MAP_ENTRY_OVERHEAD = 24L;
+ private static final long LONG_OBJECT_WEIGHT = 24L;
+ private static final long INT_OBJECT_WEIGHT = 16L;
+ private static final long PARTITION_BASE_WEIGHT = 48L;
+ private static final long PARTITION_VALUE_BASE_WEIGHT = 8L;
+
+ private ContentFileEstimater() {
+ }
+
+ public static long estimate(List<? extends ContentFile<?>> files) {
+ return listReferenceWeight(files) + estimateContentFilesWeight(files);
+ }
+
+ private static long listReferenceWeight(List<?> files) {
+ if (files == null || files.isEmpty()) {
+ return 0L;
+ }
+ return LIST_BASE_WEIGHT + (long) files.size() * OBJECT_REFERENCE_WEIGHT;
+ }
+
+ private static long estimateContentFilesWeight(List<? extends ContentFile<?>> files) {
+ long total = 0L;
+ if (files == null) {
+ return 0L;
+ }
+ for (ContentFile<?> file : files) {
+ total += estimateContentFileWeight(file);
+ }
+ return total;
+ }
+
+ private static long estimateContentFileWeight(ContentFile<?> file) {
+ if (file == null) {
+ return 0L;
+ }
+
+ long weight = CONTENT_FILE_BASE_WEIGHT;
+ weight += charSequenceWeight(file.path());
+ weight += stringWeight(file.manifestLocation());
+ weight += byteBufferWeight(file.keyMetadata());
+ weight += partitionWeight(file.partition());
+
+ weight += numericMapWeight(file.columnSizes());
+ weight += numericMapWeight(file.valueCounts());
+ weight += numericMapWeight(file.nullValueCounts());
+ weight += numericMapWeight(file.nanValueCounts());
+ weight += byteBufferMapWeight(file.lowerBounds());
+ weight += byteBufferMapWeight(file.upperBounds());
+
+ weight += listWeight(file.splitOffsets(), LONG_OBJECT_WEIGHT);
+ weight += listWeight(file.equalityFieldIds(), INT_OBJECT_WEIGHT);
+
+ weight += optionalLongWeight(file.pos());
+ weight += optionalLongWeight(file.dataSequenceNumber());
+ weight += optionalLongWeight(file.fileSequenceNumber());
+ weight += optionalLongWeight(file.firstRowId());
+ weight += optionalIntWeight(file.sortOrderId());
+
+ if (file instanceof DeleteFile) {
+ DeleteFile deleteFile = (DeleteFile) file;
+ weight += stringWeight(deleteFile.referencedDataFile());
+ weight += optionalLongWeight(deleteFile.contentOffset());
+ weight += optionalLongWeight(deleteFile.contentSizeInBytes());
+ }
+
+ return weight;
+ }
+
+ private static long listWeight(List<? extends Number> list, long elementWeight) {
+ if (list == null || list.isEmpty()) {
+ return 0L;
+ }
+ return LIST_BASE_WEIGHT + (long) list.size() * (OBJECT_REFERENCE_WEIGHT + elementWeight);
+ }
+
+ private static long numericMapWeight(Map<Integer, Long> map) {
+ if (map == null || map.isEmpty()) {
+ return 0L;
+ }
+ return MAP_BASE_WEIGHT + (long) map.size() * (MAP_ENTRY_OVERHEAD + LONG_OBJECT_WEIGHT);
+ }
+
+ private static long byteBufferMapWeight(Map<Integer, ByteBuffer> map) {
+ if (map == null || map.isEmpty()) {
+ return 0L;
+ }
+ long weight = MAP_BASE_WEIGHT + (long) map.size() * MAP_ENTRY_OVERHEAD;
+ for (ByteBuffer buffer : map.values()) {
+ weight += byteBufferWeight(buffer);
+ }
+ return weight;
+ }
+
+ private static long partitionWeight(StructLike partition) {
+ if (partition == null) {
+ return 0L;
+ }
+ long weight = PARTITION_BASE_WEIGHT + (long) partition.size() * PARTITION_VALUE_BASE_WEIGHT;
+ for (int i = 0; i < partition.size(); i++) {
+ Object value = partition.get(i, Object.class);
+ weight += estimateValueWeight(value);
+ }
+ return weight;
+ }
+
+ private static long estimateValueWeight(Object value) {
+ if (value == null) {
+ return 0L;
+ }
+ if (value instanceof CharSequence) {
+ return charSequenceWeight((CharSequence) value);
+ } else if (value instanceof byte[]) {
+ return BYTE_BUFFER_BASE_WEIGHT + ((byte[]) value).length;
+ } else if (value instanceof ByteBuffer) {
+ return byteBufferWeight((ByteBuffer) value);
+ } else if (value instanceof Long || value instanceof Double) {
+ return LONG_OBJECT_WEIGHT;
+ } else if (value instanceof Integer || value instanceof Float) {
+ return INT_OBJECT_WEIGHT;
+ } else if (value instanceof Short || value instanceof Character) {
+ return 4L;
+ } else if (value instanceof Boolean) {
+ return 1L;
+ }
+ return OBJECT_REFERENCE_WEIGHT;
+ }
+
+ private static long charSequenceWeight(CharSequence value) {
+ if (value == null) {
+ return 0L;
+ }
+ return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
+ }
+
+ private static long stringWeight(String value) {
+ if (value == null) {
+ return 0L;
+ }
+ return STRING_BASE_WEIGHT + (long) value.length() * CHAR_BYTES;
+ }
+
+ private static long byteBufferWeight(ByteBuffer buffer) {
+ if (buffer == null) {
+ return 0L;
+ }
+ return BYTE_BUFFER_BASE_WEIGHT + buffer.remaining();
+ }
+
+ private static long optionalLongWeight(Long value) {
+ return value == null ? 0L : LONG_OBJECT_WEIGHT;
+ }
+
+ private static long optionalIntWeight(Integer value) {
+ return value == null ? 0L : INT_OBJECT_WEIGHT;
+ }
+}
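ContentFileEstimater only needs to give the Caffeine weigher a rough per-entry byte weight, not an exact JVM footprint. A minimal, hypothetical sketch of calling it directly (class and variable names outside this commit are illustrative):

import org.apache.doris.datasource.iceberg.cache.ContentFileEstimater;
import org.apache.iceberg.DataFile;

import java.util.Collections;
import java.util.List;

public class EstimateWeightSketch {
    public static void main(String[] args) {
        // In real use the list comes from a parsed manifest; an empty list weighs 0 bytes.
        List<DataFile> parsedDataFiles = Collections.emptyList();
        long approxBytes = ContentFileEstimater.estimate(parsedDataFiles);
        System.out.println("estimated manifest payload weight in bytes: " + approxBytes);
    }
}
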
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
new file mode 100644
index 00000000000..be919c5d313
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCache.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.CacheException;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+
+/**
+ * A lightweight manifest cache that stores parsed DataFile/DeleteFile lists per manifest.
+ */
+public class IcebergManifestCache {
+ private static final Logger LOG = LogManager.getLogger(IcebergManifestCache.class);
+
+ private final LoadingCache<ManifestCacheKey, ManifestCacheValue> cache;
+
+ public IcebergManifestCache() {
+ long capacityInBytes = Config.iceberg_manifest_cache_capacity_mb * 1024L * 1024L;
+ Weigher<ManifestCacheKey, ManifestCacheValue> weigher = (key, value) -> {
+ long weight = Optional.ofNullable(value).map(ManifestCacheValue::getWeightBytes).orElse(0L);
+ if (weight > Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ return (int) weight;
+ };
+ Caffeine<ManifestCacheKey, ManifestCacheValue> builder = Caffeine.newBuilder()
+ .maximumWeight(capacityInBytes)
+ .weigher(weigher);
+ if (Config.iceberg_manifest_cache_ttl_sec > 0) {
+ builder = builder.expireAfterAccess(Duration.ofSeconds(Config.iceberg_manifest_cache_ttl_sec));
+ }
+ cache = builder.build(new CacheLoader<ManifestCacheKey, ManifestCacheValue>() {
+ @Override
+ public ManifestCacheValue load(ManifestCacheKey key) {
+ throw new CacheException("Manifest cache loader should be provided explicitly for key %s", null, key);
+ }
+ });
+ }
+
+ public ManifestCacheValue get(ManifestCacheKey key, Callable<ManifestCacheValue> loader) {
+ try {
+ return cache.get(key, ignored -> {
+ try {
+ return loader.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (Exception e) {
+ throw new CacheException("Failed to load manifest cache for key %s", e, key);
+ }
+ }
+
+ public Optional<ManifestCacheValue> peek(ManifestCacheKey key) {
+ return Optional.ofNullable(cache.getIfPresent(key));
+ }
+
+ public void invalidateByPath(String path) {
+ cache.asMap().keySet().stream()
+ .filter(key -> key.getPath().equals(path))
+ .forEach(cache::invalidate);
+ }
+
+ public void invalidateAll() {
+ cache.invalidateAll();
+ }
+
+ public ManifestCacheKey buildKey(String path) {
+ return new ManifestCacheKey(path);
+ }
+}
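Note that the Caffeine-level CacheLoader only exists to fail fast: callers are expected to pass the loader explicitly to get(). A minimal, hypothetical usage sketch (the path and wrapper class name are illustrative):

import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.datasource.iceberg.cache.ManifestCacheKey;
import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;

import java.util.Collections;

public class ManifestCacheUsageSketch {
    public static void main(String[] args) {
        IcebergManifestCache cache = new IcebergManifestCache();
        // Manifest files are immutable, so the path alone identifies the entry.
        ManifestCacheKey key = cache.buildKey("s3://bucket/db/tbl/metadata/manifest-0001.avro");
        // The loader runs only on a miss; a later get() for the same path is served from memory.
        ManifestCacheValue value = cache.get(key,
                () -> ManifestCacheValue.forDataFiles(Collections.emptyList()));
        System.out.println("cached: " + cache.peek(key).isPresent() + ", weight: " + value.getWeightBytes());
    }
}
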
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java
new file mode 100644
index 00000000000..dc4d16da61b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/IcebergManifestCacheLoader.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.doris.datasource.CacheException;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Helper to load manifest content and populate the manifest cache.
+ */
+public class IcebergManifestCacheLoader {
+ private static final Logger LOG = LogManager.getLogger(IcebergManifestCacheLoader.class);
+
+ private IcebergManifestCacheLoader() {
+ }
+
+ public static ManifestCacheValue loadDataFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+ Table table) {
+ ManifestCacheKey key = buildKey(cache, manifest);
+ return cache.get(key, () -> loadDataFiles(manifest, table));
+ }
+
+ public static ManifestCacheValue loadDeleteFilesWithCache(IcebergManifestCache cache, ManifestFile manifest,
+ Table table) {
+ ManifestCacheKey key = buildKey(cache, manifest);
+ return cache.get(key, () -> loadDeleteFiles(manifest, table));
+ }
+
+ private static ManifestCacheValue loadDataFiles(ManifestFile manifest, Table table) {
+ List<DataFile> dataFiles = Lists.newArrayList();
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
+ // ManifestReader implements CloseableIterable<DataFile>, iterate directly
+ for (DataFile dataFile : reader) {
+ dataFiles.add(dataFile.copy());
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to read data manifest {}", manifest.path(), e);
+ throw new CacheException("Failed to read data manifest %s", e, manifest.path());
+ }
+ return ManifestCacheValue.forDataFiles(dataFiles);
+ }
+
+ private static ManifestCacheValue loadDeleteFiles(ManifestFile manifest, Table table) {
+ List<DeleteFile> deleteFiles = Lists.newArrayList();
+ try (ManifestReader<DeleteFile> reader = ManifestFiles.readDeleteManifest(manifest, table.io(),
+ table.specs())) {
+ // ManifestReader implements CloseableIterable<DeleteFile>, iterate directly
+ for (DeleteFile deleteFile : reader) {
+ deleteFiles.add(deleteFile.copy());
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to read delete manifest {}", manifest.path(), e);
+ throw new CacheException("Failed to read delete manifest %s", e, manifest.path());
+ }
+ return ManifestCacheValue.forDeleteFiles(deleteFiles);
+ }
+
+ private static ManifestCacheKey buildKey(IcebergManifestCache cache, ManifestFile manifest) {
+ // Iceberg manifest files are immutable, so path uniquely identifies a manifest
+ return cache.buildKey(manifest.path());
+ }
+}
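Because the cache key is just the manifest path, any two plans that touch the same manifest share one parsed copy. A rough sketch of that behavior, assuming an Iceberg Table with a current snapshot is already at hand and running inside an FE process (the helper class and method are hypothetical):

import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;
import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;

import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;

public class ManifestLoaderSketch {
    // Hypothetical helper: count the data files of the current snapshot, reading each manifest at most once.
    static long countDataFiles(Table table) {
        IcebergManifestCache cache = IcebergUtils.getManifestCache();
        long count = 0;
        for (ManifestFile manifest : table.currentSnapshot().dataManifests(table.io())) {
            // Repeated calls for the same manifest path are served from the cache.
            ManifestCacheValue value = IcebergManifestCacheLoader.loadDataFilesWithCache(cache, manifest, table);
            count += value.getDataFiles().size();
        }
        return count;
    }
}
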
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java
new file mode 100644
index 00000000000..41b52187aec
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheKey.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import java.util.Objects;
+
+/**
+ * Cache key for a single Iceberg manifest file.
+ * Since Iceberg manifest files are immutable, path uniquely identifies a manifest.
+ */
+public class ManifestCacheKey {
+ private final String path;
+
+ public ManifestCacheKey(String path) {
+ this.path = path;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ManifestCacheKey)) {
+ return false;
+ }
+ ManifestCacheKey that = (ManifestCacheKey) o;
+ return Objects.equals(path, that.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path);
+ }
+
+ @Override
+ public String toString() {
+ return "ManifestCacheKey{path='" + path + "'}";
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
new file mode 100644
index 00000000000..0c7c9154639
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/cache/ManifestCacheValue.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.cache;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Cached manifest payload containing parsed files and an estimated weight.
+ */
+public class ManifestCacheValue {
+ private final List<DataFile> dataFiles;
+ private final List<DeleteFile> deleteFiles;
+ private final long weightBytes;
+
+ private ManifestCacheValue(List<DataFile> dataFiles, List<DeleteFile> deleteFiles, long weightBytes) {
+ this.dataFiles = dataFiles == null ? Collections.emptyList() : dataFiles;
+ this.deleteFiles = deleteFiles == null ? Collections.emptyList() : deleteFiles;
+ this.weightBytes = weightBytes;
+ }
+
+ public static ManifestCacheValue forDataFiles(List<DataFile> dataFiles) {
+ return new ManifestCacheValue(dataFiles, Collections.emptyList(),
+ estimateWeight(dataFiles, Collections.emptyList()));
+ }
+
+ public static ManifestCacheValue forDeleteFiles(List<DeleteFile> deleteFiles) {
+ return new ManifestCacheValue(Collections.emptyList(), deleteFiles,
+ estimateWeight(Collections.emptyList(), deleteFiles));
+ }
+
+ public List<DataFile> getDataFiles() {
+ return dataFiles;
+ }
+
+ public List<DeleteFile> getDeleteFiles() {
+ return deleteFiles;
+ }
+
+ public long getWeightBytes() {
+ return weightBytes;
+ }
+
+ private static long estimateWeight(List<DataFile> dataFiles, List<DeleteFile> deleteFiles) {
+ return ContentFileEstimater.estimate(dataFiles) + ContentFileEstimater.estimate(deleteFiles);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index f5208397a0f..0ffe86edb31 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -23,6 +23,7 @@ import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.security.authentication.ExecutionAuthenticator;
@@ -38,6 +39,9 @@ import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
+import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;
+import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
@@ -57,18 +61,27 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.iceberg.BaseFileScanTask;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.DeleteFileIndex;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Conversions;
@@ -78,9 +91,12 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
@@ -358,8 +374,136 @@ public class IcebergScanNode extends FileQueryScanNode {
}
private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) {
+ if (!Config.iceberg_manifest_cache_enable) {
+ long targetSplitSize = getRealFileSplitSize(0);
+ return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+ }
+ try {
+ return planFileScanTaskWithManifestCache(scan);
+ } catch (Exception e) {
+ LOG.warn("Plan with manifest cache failed, fallback to original
scan: {}", e.getMessage());
+ long targetSplitSize = getRealFileSplitSize(0);
+ return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+ }
+ }
+
+ private CloseableIterable<FileScanTask> planFileScanTaskWithManifestCache(TableScan scan) throws IOException {
+ // Get the snapshot from the scan; return empty if no snapshot exists
+ Snapshot snapshot = scan.snapshot();
+ if (snapshot == null) {
+ return CloseableIterable.withNoopClose(Collections.emptyList());
+ }
+
+ // Initialize manifest cache for efficient manifest file access
+ IcebergManifestCache cache = IcebergUtils.getManifestCache();
+
+ // Convert query conjuncts to Iceberg filter expression
+ // This combines all predicates with AND logic for partition/file pruning
+ Expression filterExpr = conjuncts.stream()
+ .map(conjunct -> IcebergUtils.convertToIcebergExpr(conjunct, icebergTable.schema()))
+ .filter(Objects::nonNull)
+ .reduce(Expressions.alwaysTrue(), Expressions::and);
+
+ // Get all partition specs by their IDs for later use
+ Map<Integer, PartitionSpec> specsById = icebergTable.specs();
+ boolean caseSensitive = true;
+
+ // Create residual evaluators for each partition spec
+ // Residual evaluators compute the remaining filter expression after partition pruning
+ Map<Integer, ResidualEvaluator> residualEvaluators = new HashMap<>();
+ specsById.forEach((id, spec) -> residualEvaluators.put(id,
+ ResidualEvaluator.of(spec, filterExpr == null ? Expressions.alwaysTrue() : filterExpr,
+ caseSensitive)));
+
+ // Create metrics evaluator for file-level pruning based on column statistics
+ InclusiveMetricsEvaluator metricsEvaluator = filterExpr == null ? null
+ : new InclusiveMetricsEvaluator(icebergTable.schema(), filterExpr, caseSensitive);
+
+ // ========== Phase 1: Load delete files from delete manifests ==========
+ List<DeleteFile> deleteFiles = new ArrayList<>();
+ List<ManifestFile> deleteManifests = snapshot.deleteManifests(icebergTable.io());
+ for (ManifestFile manifest : deleteManifests) {
+ // Skip non-delete manifests
+ if (manifest.content() != ManifestContent.DELETES) {
+ continue;
+ }
+ // Get the partition spec for this manifest
+ PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+ if (spec == null) {
+ continue;
+ }
+ // Create manifest evaluator for partition-level pruning
+ ManifestEvaluator evaluator = filterExpr == null ? null
+ : ManifestEvaluator.forPartitionFilter(filterExpr, spec, caseSensitive);
+ // Skip manifest if it doesn't match the filter expression (partition pruning)
+ if (evaluator != null && !evaluator.eval(manifest)) {
+ continue;
+ }
+ // Load delete files from cache (or from storage if not cached)
+ ManifestCacheValue value = IcebergManifestCacheLoader.loadDeleteFilesWithCache(cache, manifest,
+ icebergTable);
+ deleteFiles.addAll(value.getDeleteFiles());
+ }
+
+ // Build delete file index for efficient lookup of deletes applicable to each data file
+ DeleteFileIndex deleteIndex = DeleteFileIndex.builderFor(deleteFiles)
+ .specsById(specsById)
+ .caseSensitive(caseSensitive)
+ .build();
+
+ // ========== Phase 2: Load data files and create scan tasks ==========
+ List<FileScanTask> tasks = new ArrayList<>();
+ try (CloseableIterable<ManifestFile> dataManifests =
+ IcebergUtils.getMatchingManifest(snapshot.dataManifests(icebergTable.io()),
+ specsById, filterExpr)) {
+ for (ManifestFile manifest : dataManifests) {
+ // Skip non-data manifests
+ if (manifest.content() != ManifestContent.DATA) {
+ continue;
+ }
+ // Get the partition spec for this manifest
+ PartitionSpec spec = specsById.get(manifest.partitionSpecId());
+ if (spec == null) {
+ continue;
+ }
+ // Get the residual evaluator for this partition spec
+ ResidualEvaluator residualEvaluator = residualEvaluators.get(manifest.partitionSpecId());
+
+ // Load data files from cache (or from storage if not cached)
+ ManifestCacheValue value = IcebergManifestCacheLoader.loadDataFilesWithCache(cache, manifest,
+ icebergTable);
+
+ // Process each data file in the manifest
+ for (org.apache.iceberg.DataFile dataFile : value.getDataFiles()) {
+ // Skip file if column statistics indicate no matching rows (metrics-based pruning)
+ if (metricsEvaluator != null && !metricsEvaluator.eval(dataFile)) {
+ continue;
+ }
+ // Skip file if partition values don't match the residual filter
+ if (residualEvaluator != null) {
+ if (residualEvaluator.residualFor(dataFile.partition()).equals(Expressions.alwaysFalse())) {
+ continue;
+ }
+ }
+ // Find all delete files that apply to this data file based on sequence number
+ List<DeleteFile> deletes = Arrays.asList(
+ deleteIndex.forDataFile(dataFile.dataSequenceNumber(), dataFile));
+
+ // Create a FileScanTask containing the data file, associated deletes, and metadata
+ tasks.add(new BaseFileScanTask(
+ dataFile,
+ deletes.toArray(new DeleteFile[0]),
+ SchemaParser.toJson(icebergTable.schema()),
+ PartitionSpecParser.toJson(spec),
+ residualEvaluator == null ? ResidualEvaluator.unpartitioned(Expressions.alwaysTrue())
+ : residualEvaluator));
+ }
+ }
+ }
+
+ // Split tasks into smaller chunks based on target split size for parallel processing
long targetSplitSize = getRealFileSplitSize(0);
- return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize);
+ return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks), targetSplitSize);
}
private Split createIcebergSplit(FileScanTask fileScanTask) {
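The delete matching in the loop above is delegated to the DeleteFileIndex copied in below. A minimal, hypothetical sketch of that building block in isolation, assuming the delete files and partition specs have already been collected from the delete manifests:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFileIndex;
import org.apache.iceberg.PartitionSpec;

import java.util.List;
import java.util.Map;

public class DeleteIndexSketch {
    // Hypothetical helper: pick the delete files that apply to one data file.
    static DeleteFile[] deletesFor(List<DeleteFile> deleteFiles, Map<Integer, PartitionSpec> specsById,
            DataFile dataFile) {
        DeleteFileIndex index = DeleteFileIndex.builderFor(deleteFiles)
                .specsById(specsById)
                .caseSensitive(true)
                .build();
        // Returns the position/equality deletes whose sequence numbers make them applicable to this file.
        return index.forDataFile(dataFile.dataSequenceNumber(), dataFile);
    }
}
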
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
index 2cc829c8743..88def12d2a5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java
@@ -43,6 +43,43 @@ public abstract class AbstractIcebergProperties extends MetastoreProperties {
)
protected String warehouse;
+ @Getter
+ @ConnectorProperty(
+ names = {CatalogProperties.IO_MANIFEST_CACHE_ENABLED},
+ required = false,
+ description = "Controls whether to use caching during manifest
reads or not. Default: false."
+ )
+ protected String ioManifestCacheEnabled;
+
+ @Getter
+ @ConnectorProperty(
+ names = {CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS},
+ required = false,
+ description = "Controls the maximum duration for which an entry stays in the manifest cache. "
+ + "Must be a non-negative value. Zero means entries expire only due to memory pressure. "
+ + "Default: 60000 (60s)."
+ )
+ protected String ioManifestCacheExpirationIntervalMs;
+
+ @Getter
+ @ConnectorProperty(
+ names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES},
+ required = false,
+ description = "Controls the maximum total amount of bytes to cache
in manifest cache. "
+ + "Must be a positive value. Default: 104857600 (100MB)."
+ )
+ protected String ioManifestCacheMaxTotalBytes;
+
+ @Getter
+ @ConnectorProperty(
+ names = {CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH},
+ required = false,
+ description = "Controls the maximum length of file to be
considered for caching. "
+ + "An InputFile will not be cached if the length is longer
than this limit. "
+ + "Must be a positive value. Default: 8388608 (8MB)."
+ )
+ protected String ioManifestCacheMaxContentLength;
+
@Getter
protected ExecutionAuthenticator executionAuthenticator = new ExecutionAuthenticator(){};
@@ -80,6 +117,9 @@ public abstract class AbstractIcebergProperties extends MetastoreProperties {
catalogProps.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
}
+ // Add manifest cache properties if configured
+ addManifestCacheProperties(catalogProps);
+
Catalog catalog = initCatalog(catalogName, catalogProps, storagePropertiesList);
if (catalog == null) {
@@ -88,6 +128,28 @@ public abstract class AbstractIcebergProperties extends MetastoreProperties {
return catalog;
}
+ /**
+ * Add manifest cache related properties to catalog properties.
+ * These properties control caching behavior during manifest reads.
+ *
+ * @param catalogProps the catalog properties map to add manifest cache properties to
+ */
+ protected void addManifestCacheProperties(Map<String, String> catalogProps) {
+ if (StringUtils.isNotBlank(ioManifestCacheEnabled)) {
+ catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, ioManifestCacheEnabled);
+ }
+ if (StringUtils.isNotBlank(ioManifestCacheExpirationIntervalMs)) {
+ catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS,
+ ioManifestCacheExpirationIntervalMs);
+ }
+ if (StringUtils.isNotBlank(ioManifestCacheMaxTotalBytes)) {
+ catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, ioManifestCacheMaxTotalBytes);
+ }
+ if (StringUtils.isNotBlank(ioManifestCacheMaxContentLength)) {
+ catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH, ioManifestCacheMaxContentLength);
+ }
+ }
+
/**
* Subclasses must implement this to create the concrete Catalog instance.
*/
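These four properties are passed straight through to Iceberg's own FileIO-level manifest caching and are independent of the FE-side IcebergManifestCache above. A rough sketch of the skip-if-blank pass-through, with illustrative values:

import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.CatalogProperties;

import java.util.HashMap;
import java.util.Map;

public class ManifestCachePropsSketch {
    public static void main(String[] args) {
        // Values as a user might supply them; a blank string stands for "not configured".
        String enabled = "true";
        String maxTotalBytes = "268435456"; // 256 MB, illustrative only
        String expirationMs = "";
        Map<String, String> catalogProps = new HashMap<>();
        // Same behavior as addManifestCacheProperties(): blank values are skipped, set values forwarded as-is.
        if (StringUtils.isNotBlank(enabled)) {
            catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_ENABLED, enabled);
        }
        if (StringUtils.isNotBlank(maxTotalBytes)) {
            catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES, maxTotalBytes);
        }
        if (StringUtils.isNotBlank(expirationMs)) {
            catalogProps.put(CatalogProperties.IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS, expirationMs);
        }
        System.out.println(catalogProps); // only the non-blank keys reach the Iceberg catalog
    }
}
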
diff --git a/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
new file mode 100644
index 00000000000..5c9cdd93c45
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -0,0 +1,906 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.ContentFileUtil;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.iceberg.util.PartitionSet;
+import org.apache.iceberg.util.Tasks;
+
+/**
+ * An index of {@link DeleteFile delete files} by sequence number.
+ *
+ * <p>Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long,
+ * DataFile)} or {@link #forEntry(ManifestEntry)} to get the delete files to apply to a given data
+ * file.
+ *
+ * Copied from https://github.com/apache/iceberg/blob/apache-iceberg-1.9.1/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+ * Changed DeleteFileIndex and some methods to public.
+ */
+public class DeleteFileIndex {
+ private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0];
+
+ private final EqualityDeletes globalDeletes;
+ private final PartitionMap<EqualityDeletes> eqDeletesByPartition;
+ private final PartitionMap<PositionDeletes> posDeletesByPartition;
+ private final Map<String, PositionDeletes> posDeletesByPath;
+ private final Map<String, DeleteFile> dvByPath;
+ private final boolean hasEqDeletes;
+ private final boolean hasPosDeletes;
+ private final boolean isEmpty;
+
+ private DeleteFileIndex(
+ EqualityDeletes globalDeletes,
+ PartitionMap<EqualityDeletes> eqDeletesByPartition,
+ PartitionMap<PositionDeletes> posDeletesByPartition,
+ Map<String, PositionDeletes> posDeletesByPath,
+ Map<String, DeleteFile> dvByPath) {
+ this.globalDeletes = globalDeletes;
+ this.eqDeletesByPartition = eqDeletesByPartition;
+ this.posDeletesByPartition = posDeletesByPartition;
+ this.posDeletesByPath = posDeletesByPath;
+ this.dvByPath = dvByPath;
+ this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null;
+ this.hasPosDeletes =
+ posDeletesByPartition != null || posDeletesByPath != null || dvByPath != null;
+ this.isEmpty = !hasEqDeletes && !hasPosDeletes;
+ }
+
+ public boolean isEmpty() {
+ return isEmpty;
+ }
+
+ public boolean hasEqualityDeletes() {
+ return hasEqDeletes;
+ }
+
+ public boolean hasPositionDeletes() {
+ return hasPosDeletes;
+ }
+
+ public Iterable<DeleteFile> referencedDeleteFiles() {
+ Iterable<DeleteFile> deleteFiles = Collections.emptyList();
+
+ if (globalDeletes != null) {
+ deleteFiles = Iterables.concat(deleteFiles, globalDeletes.referencedDeleteFiles());
+ }
+
+ if (eqDeletesByPartition != null) {
+ for (EqualityDeletes deletes : eqDeletesByPartition.values()) {
+ deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles());
+ }
+ }
+
+ if (posDeletesByPartition != null) {
+ for (PositionDeletes deletes : posDeletesByPartition.values()) {
+ deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles());
+ }
+ }
+
+ if (posDeletesByPath != null) {
+ for (PositionDeletes deletes : posDeletesByPath.values()) {
+ deleteFiles = Iterables.concat(deleteFiles, deletes.referencedDeleteFiles());
+ }
+ }
+
+ if (dvByPath != null) {
+ deleteFiles = Iterables.concat(deleteFiles, dvByPath.values());
+ }
+
+ return deleteFiles;
+ }
+
+ DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
+ return forDataFile(entry.dataSequenceNumber(), entry.file());
+ }
+
+ public DeleteFile[] forDataFile(DataFile file) {
+ return forDataFile(file.dataSequenceNumber(), file);
+ }
+
+ public DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
+ if (isEmpty) {
+ return EMPTY_DELETES;
+ }
+
+ DeleteFile[] global = findGlobalDeletes(sequenceNumber, file);
+ DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file);
+ DeleteFile dv = findDV(sequenceNumber, file);
+ if (dv != null && global == null && eqPartition == null) {
+ return new DeleteFile[] {dv};
+ } else if (dv != null) {
+ return concat(global, eqPartition, new DeleteFile[] {dv});
+ } else {
+ DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
+ DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
+ return concat(global, eqPartition, posPartition, posPath);
+ }
+ }
+
+ private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) {
+ return globalDeletes == null ? EMPTY_DELETES : globalDeletes.filter(seq, dataFile);
+ }
+
+ private DeleteFile[] findPosPartitionDeletes(long seq, DataFile dataFile) {
+ if (posDeletesByPartition == null) {
+ return EMPTY_DELETES;
+ }
+
+ PositionDeletes deletes = posDeletesByPartition.get(dataFile.specId(), dataFile.partition());
+ return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
+ }
+
+ private DeleteFile[] findEqPartitionDeletes(long seq, DataFile dataFile) {
+ if (eqDeletesByPartition == null) {
+ return EMPTY_DELETES;
+ }
+
+ EqualityDeletes deletes = eqDeletesByPartition.get(dataFile.specId(), dataFile.partition());
+ return deletes == null ? EMPTY_DELETES : deletes.filter(seq, dataFile);
+ }
+
+ @SuppressWarnings("CollectionUndefinedEquality")
+ private DeleteFile[] findPathDeletes(long seq, DataFile dataFile) {
+ if (posDeletesByPath == null) {
+ return EMPTY_DELETES;
+ }
+
+ PositionDeletes deletes = posDeletesByPath.get(dataFile.location());
+ return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
+ }
+
+ private DeleteFile findDV(long seq, DataFile dataFile) {
+ if (dvByPath == null) {
+ return null;
+ }
+
+ DeleteFile dv = dvByPath.get(dataFile.location());
+ if (dv != null) {
+ ValidationException.check(
+ dv.dataSequenceNumber() >= seq,
+ "DV data sequence number (%s) must be greater than or equal to data
file sequence number (%s)",
+ dv.dataSequenceNumber(),
+ seq);
+ }
+ return dv;
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private static boolean canContainEqDeletesForFile(
+ DataFile dataFile, EqualityDeleteFile deleteFile) {
+ Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
+ Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
+
+ // whether to check data ranges or to assume that the ranges match
+ // if upper/lower bounds are missing, null counts may still be used to determine delete files
+ // can be skipped
+ boolean checkRanges =
+ dataLowers != null && dataUppers != null && deleteFile.hasLowerAndUpperBounds();
+
+ Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
+ Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
+ Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
+ Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();
+
+ for (Types.NestedField field : deleteFile.equalityFields()) {
+ if (!field.type().isPrimitiveType()) {
+ // stats are not kept for nested types. assume that the delete file may match
+ continue;
+ }
+
+ if (containsNull(dataNullCounts, field) && containsNull(deleteNullCounts, field)) {
+ // the data has null values and null has been deleted, so the deletes must be applied
+ continue;
+ }
+
+ if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) {
+ // the data file contains only null values for this field, but there are no deletes for null
+ // values
+ return false;
+ }
+
+ if (allNull(deleteNullCounts, deleteValueCounts, field)
+ && allNonNull(dataNullCounts, field)) {
+ // the delete file removes only null rows with null for this field, but there are no data
+ // rows with null
+ return false;
+ }
+
+ if (!checkRanges) {
+ // some upper and lower bounds are missing, assume they match
+ continue;
+ }
+
+ int id = field.fieldId();
+ ByteBuffer dataLower = dataLowers.get(id);
+ ByteBuffer dataUpper = dataUppers.get(id);
+ Object deleteLower = deleteFile.lowerBound(id);
+ Object deleteUpper = deleteFile.upperBound(id);
+ if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
+ // at least one bound is not known, assume the delete file may match
+ }
+
+ if (!rangesOverlap(field, dataLower, dataUpper, deleteLower, deleteUpper)) {
+ // no values overlap between the data file and the deletes
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private static <T> boolean rangesOverlap(
+ Types.NestedField field,
+ ByteBuffer dataLowerBuf,
+ ByteBuffer dataUpperBuf,
+ T deleteLower,
+ T deleteUpper) {
+ Type.PrimitiveType type = field.type().asPrimitiveType();
+ Comparator<T> comparator = Comparators.forType(type);
+
+ T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf);
+ if (comparator.compare(dataLower, deleteUpper) > 0) {
+ return false;
+ }
+
+ T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf);
+ if (comparator.compare(deleteLower, dataUpper) > 0) {
+ return false;
+ }
+
+ return true;
+ }
+
+ private static boolean allNonNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
+ if (field.isRequired()) {
+ return true;
+ }
+
+ if (nullValueCounts == null) {
+ return false;
+ }
+
+ Long nullValueCount = nullValueCounts.get(field.fieldId());
+ if (nullValueCount == null) {
+ return false;
+ }
+
+ return nullValueCount <= 0;
+ }
+
+ private static boolean allNull(
+ Map<Integer, Long> nullValueCounts, Map<Integer, Long> valueCounts, Types.NestedField field) {
+ if (field.isRequired()) {
+ return false;
+ }
+
+ if (nullValueCounts == null || valueCounts == null) {
+ return false;
+ }
+
+ Long nullValueCount = nullValueCounts.get(field.fieldId());
+ Long valueCount = valueCounts.get(field.fieldId());
+ if (nullValueCount == null || valueCount == null) {
+ return false;
+ }
+
+ return nullValueCount.equals(valueCount);
+ }
+
+ private static boolean containsNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
+ if (field.isRequired()) {
+ return false;
+ }
+
+ if (nullValueCounts == null) {
+ return true;
+ }
+
+ Long nullValueCount = nullValueCounts.get(field.fieldId());
+ if (nullValueCount == null) {
+ return true;
+ }
+
+ return nullValueCount > 0;
+ }
+
+ static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
+ return new Builder(io, Sets.newHashSet(deleteManifests));
+ }
+
+ // changed to public method.
+ public static Builder builderFor(Iterable<DeleteFile> deleteFiles) {
+ return new Builder(deleteFiles);
+ }
+
+ // changed to public class.
+ public static class Builder {
+ private final FileIO io;
+ private final Set<ManifestFile> deleteManifests;
+ private final Iterable<DeleteFile> deleteFiles;
+ private long minSequenceNumber = 0L;
+ private Map<Integer, PartitionSpec> specsById = null;
+ private Expression dataFilter = Expressions.alwaysTrue();
+ private Expression partitionFilter = Expressions.alwaysTrue();
+ private PartitionSet partitionSet = null;
+ private boolean caseSensitive = true;
+ private ExecutorService executorService = null;
+ private ScanMetrics scanMetrics = ScanMetrics.noop();
+ private boolean ignoreResiduals = false;
+
+ Builder(FileIO io, Set<ManifestFile> deleteManifests) {
+ this.io = io;
+ this.deleteManifests = Sets.newHashSet(deleteManifests);
+ this.deleteFiles = null;
+ }
+
+ Builder(Iterable<DeleteFile> deleteFiles) {
+ this.io = null;
+ this.deleteManifests = null;
+ this.deleteFiles = deleteFiles;
+ }
+
+ Builder afterSequenceNumber(long seq) {
+ this.minSequenceNumber = seq;
+ return this;
+ }
+
+ public Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
+ this.specsById = newSpecsById;
+ return this;
+ }
+
+ Builder filterData(Expression newDataFilter) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
data filters");
+ this.dataFilter = Expressions.and(dataFilter, newDataFilter);
+ return this;
+ }
+
+ Builder filterPartitions(Expression newPartitionFilter) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
partition filters");
+ this.partitionFilter = Expressions.and(partitionFilter,
newPartitionFilter);
+ return this;
+ }
+
+ Builder filterPartitions(PartitionSet newPartitionSet) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
partition filters");
+ this.partitionSet = newPartitionSet;
+ return this;
+ }
+
+ public Builder caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
+ Builder planWith(ExecutorService newExecutorService) {
+ this.executorService = newExecutorService;
+ return this;
+ }
+
+ Builder scanMetrics(ScanMetrics newScanMetrics) {
+ this.scanMetrics = newScanMetrics;
+ return this;
+ }
+
+ Builder ignoreResiduals() {
+ this.ignoreResiduals = true;
+ return this;
+ }
+
+ private Iterable<DeleteFile> filterDeleteFiles() {
+ return Iterables.filter(deleteFiles, file -> file.dataSequenceNumber() > minSequenceNumber);
+ }
+
+ private Collection<DeleteFile> loadDeleteFiles() {
+ // read all of the matching delete manifests in parallel and accumulate the matching files in
+ // a queue
+ Queue<DeleteFile> files = new ConcurrentLinkedQueue<>();
+ Tasks.foreach(deleteManifestReaders())
+ .stopOnFailure()
+ .throwFailureWhenFinished()
+ .executeWith(executorService)
+ .run(
+ deleteFile -> {
+ try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
+ for (ManifestEntry<DeleteFile> entry : reader) {
+ if (entry.dataSequenceNumber() > minSequenceNumber) {
+ // copy with stats for better filtering against data file stats
+ files.add(entry.file().copy());
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to close");
+ }
+ });
+ return files;
+ }
+
+ public DeleteFileIndex build() {
+ Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
+
+ EqualityDeletes globalDeletes = new EqualityDeletes();
+ PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById);
+ PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById);
+ Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap();
+ Map<String, DeleteFile> dvByPath = Maps.newHashMap();
+
+ for (DeleteFile file : files) {
+ switch (file.content()) {
+ case POSITION_DELETES:
+ if (ContentFileUtil.isDV(file)) {
+ add(dvByPath, file);
+ } else {
+ add(posDeletesByPath, posDeletesByPartition, file);
+ }
+ break;
+ case EQUALITY_DELETES:
+ add(globalDeletes, eqDeletesByPartition, file);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported content: " +
file.content());
+ }
+ ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
+ }
+
+ return new DeleteFileIndex(
+ globalDeletes.isEmpty() ? null : globalDeletes,
+ eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition,
+ posDeletesByPartition.isEmpty() ? null : posDeletesByPartition,
+ posDeletesByPath.isEmpty() ? null : posDeletesByPath,
+ dvByPath.isEmpty() ? null : dvByPath);
+ }
+
+ private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) {
+ String path = dv.referencedDataFile();
+ DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
+ if (existingDV != null) {
+ throw new ValidationException(
+ "Can't index multiple DVs for %s: %s and %s",
+ path, ContentFileUtil.dvDesc(dv), ContentFileUtil.dvDesc(existingDV));
+ }
+ }
+
+ private void add(
+ Map<String, PositionDeletes> deletesByPath,
+ PartitionMap<PositionDeletes> deletesByPartition,
+ DeleteFile file) {
+ String path = ContentFileUtil.referencedDataFileLocation(file);
+
+ PositionDeletes deletes;
+ if (path != null) {
+ deletes = deletesByPath.computeIfAbsent(path, ignored -> new PositionDeletes());
+ } else {
+ int specId = file.specId();
+ StructLike partition = file.partition();
+ deletes = deletesByPartition.computeIfAbsent(specId, partition, PositionDeletes::new);
+ }
+
+ deletes.add(file);
+ }
+
+ private void add(
+ EqualityDeletes globalDeletes,
+ PartitionMap<EqualityDeletes> deletesByPartition,
+ DeleteFile file) {
+ PartitionSpec spec = specsById.get(file.specId());
+
+ EqualityDeletes deletes;
+ if (spec.isUnpartitioned()) {
+ deletes = globalDeletes;
+ } else {
+ int specId = spec.specId();
+ StructLike partition = file.partition();
+ deletes = deletesByPartition.computeIfAbsent(specId, partition, EqualityDeletes::new);
+ }
+
+ deletes.add(spec, file);
+ }
+
+ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
+ Expression entryFilter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
+
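+ // per-spec caches: partExprCache projects the row-level data filter into an inclusive
+ // partition predicate, and evalCache combines it with the partition filter to build a
+ // ManifestEvaluator used to skip delete manifests that cannot contain matching files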
+ LoadingCache<Integer, Expression> partExprCache =
+ specsById == null
+ ? null
+ : Caffeine.newBuilder()
+ .build(
+ specId -> {
+ PartitionSpec spec = specsById.get(specId);
+ return Projections.inclusive(spec, caseSensitive).project(dataFilter);
+ });
+
+ LoadingCache<Integer, ManifestEvaluator> evalCache =
+ specsById == null
+ ? null
+ : Caffeine.newBuilder()
+ .build(
+ specId -> {
+ PartitionSpec spec = specsById.get(specId);
+ return ManifestEvaluator.forPartitionFilter(
+ Expressions.and(partitionFilter, partExprCache.get(specId)),
+ spec,
+ caseSensitive);
+ });
+
+ CloseableIterable<ManifestFile> closeableDeleteManifests =
+ CloseableIterable.withNoopClose(deleteManifests);
+ CloseableIterable<ManifestFile> matchingManifests =
+ evalCache == null
+ ? closeableDeleteManifests
+ : CloseableIterable.filter(
+ scanMetrics.skippedDeleteManifests(),
+ closeableDeleteManifests,
+ manifest ->
+ manifest.content() == ManifestContent.DELETES
+ && (manifest.hasAddedFiles() || manifest.hasExistingFiles())
+ && evalCache.get(manifest.partitionSpecId()).eval(manifest));
+
+ matchingManifests =
+ CloseableIterable.count(scanMetrics.scannedDeleteManifests(), matchingManifests);
+ return Iterables.transform(
+ matchingManifests,
+ manifest ->
+ ManifestFiles.readDeleteManifest(manifest, io, specsById)
+ .filterRows(entryFilter)
+ .filterPartitions(
+ Expressions.and(
+ partitionFilter, partExprCache.get(manifest.partitionSpecId())))
+ .filterPartitions(partitionSet)
+ .caseSensitive(caseSensitive)
+ .scanMetrics(scanMetrics)
+ .liveEntries());
+ }
+ }
+
+ /**
+ * Finds an index in the sorted array of sequence numbers where the given sequence number should
+ * be inserted or is found.
+ *
+ * <p>If the sequence number is present in the array, this method returns the index of the first
+ * occurrence of the sequence number. If the sequence number is not present, the method returns
+ * the index where the sequence number would be inserted while maintaining the sorted order of the
+ * array. This returned index ranges from 0 (inclusive) to the length of the array (inclusive).
+ *
+ * <p>This method is used to determine the subset of delete files that apply to a given data file.
+ *
+ * @param seqs an array of sequence numbers sorted in ascending order
+ * @param seq the sequence number to search for
+ * @return the index of the first occurrence or the insertion point
+ */
+ private static int findStartIndex(long[] seqs, long seq) {
+ int pos = Arrays.binarySearch(seqs, seq);
+ int start;
+ if (pos < 0) {
+ // the sequence number was not found, where it would be inserted is -(pos + 1)
+ start = -(pos + 1);
+ } else {
+ // the sequence number was found, but may not be the first
+ // find the first delete file with the given sequence number by decrementing the position
+ start = pos;
+ while (start > 0 && seqs[start - 1] >= seq) {
+ start -= 1;
+ }
+ }
+
+ return start;
+ }
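+
+ // worked example (hypothetical values): for seqs = {2, 2, 5, 8},
+ // findStartIndex(seqs, 2) == 0 (first occurrence of 2),
+ // findStartIndex(seqs, 3) == 2 (insertion point, just before 5),
+ // findStartIndex(seqs, 9) == 4 (insertion point equals seqs.length, so no files apply)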
+
+ private static DeleteFile[] concat(DeleteFile[]... deletes) {
+ return ArrayUtil.concat(DeleteFile.class, deletes);
+ }
+
+ // a group of position delete files sorted by the sequence number they apply to
+ static class PositionDeletes {
+ private static final Comparator<DeleteFile> SEQ_COMPARATOR =
+ Comparator.comparingLong(DeleteFile::dataSequenceNumber);
+
+ // indexed state
+ private long[] seqs = null;
+ private DeleteFile[] files = null;
+
+ // a buffer that is used to hold files before indexing
+ private volatile List<DeleteFile> buffer = Lists.newArrayList();
+
+ public void add(DeleteFile file) {
+ Preconditions.checkState(buffer != null, "Can't add files upon
indexing");
+ buffer.add(file);
+ }
+
+ public DeleteFile[] filter(long seq) {
+ indexIfNeeded();
+
+ int start = findStartIndex(seqs, seq);
+
+ if (start >= files.length) {
+ return EMPTY_DELETES;
+ }
+
+ if (start == 0) {
+ return files;
+ }
+
+ int matchingFilesCount = files.length - start;
+ DeleteFile[] matchingFiles = new DeleteFile[matchingFilesCount];
+ System.arraycopy(files, start, matchingFiles, 0, matchingFilesCount);
+ return matchingFiles;
+ }
+
+ public Iterable<DeleteFile> referencedDeleteFiles() {
+ indexIfNeeded();
+ return Arrays.asList(files);
+ }
+
+ public boolean isEmpty() {
+ indexIfNeeded();
+ return files.length == 0;
+ }
+
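+ // lazily build the sorted index on first read: double-checked locking on the volatile
+ // buffer lets concurrent readers race safely while the files are sorted exactly once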
+ private void indexIfNeeded() {
+ if (buffer != null) {
+ synchronized (this) {
+ if (buffer != null) {
+ this.files = indexFiles(buffer);
+ this.seqs = indexSeqs(files);
+ this.buffer = null;
+ }
+ }
+ }
+ }
+
+ private static DeleteFile[] indexFiles(List<DeleteFile> list) {
+ DeleteFile[] array = list.toArray(EMPTY_DELETES);
+ Arrays.sort(array, SEQ_COMPARATOR);
+ return array;
+ }
+
+ private static long[] indexSeqs(DeleteFile[] files) {
+ long[] seqs = new long[files.length];
+
+ for (int index = 0; index < files.length; index++) {
+ seqs[index] = files[index].dataSequenceNumber();
+ }
+
+ return seqs;
+ }
+ }
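+
+ // usage sketch for PositionDeletes (hypothetical sequence numbers): after indexing
+ // delete files with data sequence numbers {3, 5, 7}, filter(5) returns the files with
+ // sequence numbers 5 and 7, since position deletes apply to data files whose data
+ // sequence number is less than or equal to the delete's own sequence number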
+
+ // a group of equality delete files sorted by the sequence number they apply to
+ static class EqualityDeletes {
+ private static final Comparator<EqualityDeleteFile> SEQ_COMPARATOR =
+ Comparator.comparingLong(EqualityDeleteFile::applySequenceNumber);
+ private static final EqualityDeleteFile[] EMPTY_EQUALITY_DELETES = new EqualityDeleteFile[0];
+
+ // indexed state
+ private long[] seqs = null;
+ private EqualityDeleteFile[] files = null;
+
+ // a buffer that is used to hold files before indexing
+ private volatile List<EqualityDeleteFile> buffer = Lists.newArrayList();
+
+ public void add(PartitionSpec spec, DeleteFile file) {
+ Preconditions.checkState(buffer != null, "Can't add files upon indexing");
+ buffer.add(new EqualityDeleteFile(spec, file));
+ }
+
+ public DeleteFile[] filter(long seq, DataFile dataFile) {
+ indexIfNeeded();
+
+ int start = findStartIndex(seqs, seq);
+
+ if (start >= files.length) {
+ return EMPTY_DELETES;
+ }
+
+ List<DeleteFile> matchingFiles = Lists.newArrayList();
+
+ for (int index = start; index < files.length; index++) {
+ EqualityDeleteFile file = files[index];
+ if (canContainEqDeletesForFile(dataFile, file)) {
+ matchingFiles.add(file.wrapped());
+ }
+ }
+
+ return matchingFiles.toArray(EMPTY_DELETES);
+ }
+
+ public Iterable<DeleteFile> referencedDeleteFiles() {
+ indexIfNeeded();
+ return Iterables.transform(Arrays.asList(files), EqualityDeleteFile::wrapped);
+ }
+
+ public boolean isEmpty() {
+ indexIfNeeded();
+ return files.length == 0;
+ }
+
+ private void indexIfNeeded() {
+ if (buffer != null) {
+ synchronized (this) {
+ if (buffer != null) {
+ this.files = indexFiles(buffer);
+ this.seqs = indexSeqs(files);
+ this.buffer = null;
+ }
+ }
+ }
+ }
+
+ private static EqualityDeleteFile[] indexFiles(List<EqualityDeleteFile> list) {
+ EqualityDeleteFile[] array = list.toArray(EMPTY_EQUALITY_DELETES);
+ Arrays.sort(array, SEQ_COMPARATOR);
+ return array;
+ }
+
+ private static long[] indexSeqs(EqualityDeleteFile[] files) {
+ long[] seqs = new long[files.length];
+
+ for (int index = 0; index < files.length; index++) {
+ seqs[index] = files[index].applySequenceNumber();
+ }
+
+ return seqs;
+ }
+ }
+
+ // an equality delete file wrapper that caches the converted boundaries for faster boundary checks
+ // this class is not meant to be exposed beyond the delete file index
+ private static class EqualityDeleteFile {
+ private final PartitionSpec spec;
+ private final DeleteFile wrapped;
+ private final long applySequenceNumber;
+ private volatile List<Types.NestedField> equalityFields = null;
+ private volatile Map<Integer, Object> convertedLowerBounds = null;
+ private volatile Map<Integer, Object> convertedUpperBounds = null;
+
+ EqualityDeleteFile(PartitionSpec spec, DeleteFile file) {
+ this.spec = spec;
+ this.wrapped = file;
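+ // equality deletes apply only to data files with a strictly smaller data sequence
+ // number; storing dataSequenceNumber() - 1 lets the index reuse the same
+ // greater-than-or-equal lookup that position deletes use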
+ this.applySequenceNumber = wrapped.dataSequenceNumber() - 1;
+ }
+
+ public DeleteFile wrapped() {
+ return wrapped;
+ }
+
+ public long applySequenceNumber() {
+ return applySequenceNumber;
+ }
+
+ public List<Types.NestedField> equalityFields() {
+ if (equalityFields == null) {
+ synchronized (this) {
+ if (equalityFields == null) {
+ List<Types.NestedField> fields = Lists.newArrayList();
+ for (int id : wrapped.equalityFieldIds()) {
+ Types.NestedField field = spec.schema().findField(id);
+ fields.add(field);
+ }
+ this.equalityFields = fields;
+ }
+ }
+ }
+
+ return equalityFields;
+ }
+
+ public Map<Integer, Long> valueCounts() {
+ return wrapped.valueCounts();
+ }
+
+ public Map<Integer, Long> nullValueCounts() {
+ return wrapped.nullValueCounts();
+ }
+
+ public boolean hasLowerAndUpperBounds() {
+ return wrapped.lowerBounds() != null && wrapped.upperBounds() != null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T lowerBound(int id) {
+ return (T) lowerBounds().get(id);
+ }
+
+ private Map<Integer, Object> lowerBounds() {
+ if (convertedLowerBounds == null) {
+ synchronized (this) {
+ if (convertedLowerBounds == null) {
+ this.convertedLowerBounds = convertBounds(wrapped.lowerBounds());
+ }
+ }
+ }
+
+ return convertedLowerBounds;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T upperBound(int id) {
+ return (T) upperBounds().get(id);
+ }
+
+ private Map<Integer, Object> upperBounds() {
+ if (convertedUpperBounds == null) {
+ synchronized (this) {
+ if (convertedUpperBounds == null) {
+ this.convertedUpperBounds = convertBounds(wrapped.upperBounds());
+ }
+ }
+ }
+
+ return convertedUpperBounds;
+ }
+
+ private Map<Integer, Object> convertBounds(Map<Integer, ByteBuffer> bounds) {
+ Map<Integer, Object> converted = Maps.newHashMap();
+
+ if (bounds != null) {
+ for (Types.NestedField field : equalityFields()) {
+ int id = field.fieldId();
+ Type type = spec.schema().findField(id).type();
+ if (type.isPrimitiveType()) {
+ ByteBuffer bound = bounds.get(id);
+ if (bound != null) {
+ converted.put(id, Conversions.fromByteBuffer(type, bound));
+ }
+ }
+ }
+ }
+
+ return converted;
+ }
+ }
+}