This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new de816cbfea Core: Use lazy iterable in DeleteFileIndex (#8263)
de816cbfea is described below
commit de816cbfea2e33d77e7bc289cabf394e2903281f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Aug 11 16:20:31 2023 -0700
Core: Use lazy iterable in DeleteFileIndex (#8263)
---
.../java/org/apache/iceberg/DeleteFileIndex.java | 24 +++-----------
.../apache/iceberg/metrics/ScanMetricsUtil.java | 37 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 977b2a387d..63fde28e3c 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -50,7 +51,6 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -445,10 +445,8 @@ class DeleteFileIndex {
return this;
}
- private Collection<DeleteFile> filterDeleteFiles() {
- return Streams.stream(deleteFiles)
- .filter(file -> file.dataSequenceNumber() > minSequenceNumber)
- .collect(Collectors.toList());
+ private Iterable<DeleteFile> filterDeleteFiles() {
+ return Iterables.filter(deleteFiles, file -> file.dataSequenceNumber() >
minSequenceNumber);
}
private Collection<DeleteFile> loadDeleteFiles() {
@@ -476,7 +474,7 @@ class DeleteFileIndex {
}
DeleteFileIndex build() {
- Collection<DeleteFile> files = deleteFiles != null ? filterDeleteFiles()
: loadDeleteFiles();
+ Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() :
loadDeleteFiles();
// build a map from (specId, partition) to delete file entries
Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
@@ -490,6 +488,7 @@ class DeleteFileIndex {
.computeIfAbsent(specId, id ->
StructLikeWrapper.forType(spec.partitionType()))
.copyFor(file.partition());
deleteFilesByPartition.put(Pair.of(specId, wrapper), new
IndexedDeleteFile(spec, file));
+ ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
}
// sort the entries in each map value by sequence number and split into
sequence numbers and
@@ -529,19 +528,6 @@ class DeleteFileIndex {
}
}
- scanMetrics.indexedDeleteFiles().increment(files.size());
- deleteFilesByPartition
- .values()
- .forEach(
- file -> {
- FileContent content = file.content();
- if (content == FileContent.EQUALITY_DELETES) {
- scanMetrics.equalityDeleteFiles().increment();
- } else if (content == FileContent.POSITION_DELETES) {
- scanMetrics.positionalDeleteFiles().increment();
- }
- });
-
return new DeleteFileIndex(specsById, globalDeletes,
sortedDeletesByPartition);
}
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
new file mode 100644
index 0000000000..102f48ee19
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+
+public class ScanMetricsUtil {
+
+ private ScanMetricsUtil() {}
+
+ public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile
deleteFile) {
+ metrics.indexedDeleteFiles().increment();
+
+ if (deleteFile.content() == FileContent.POSITION_DELETES) {
+ metrics.positionalDeleteFiles().increment();
+ } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
+ metrics.equalityDeleteFiles().increment();
+ }
+ }
+}