This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new de816cbfea Core: Use lazy iterable in DeleteFileIndex (#8263)
de816cbfea is described below

commit de816cbfea2e33d77e7bc289cabf394e2903281f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Aug 11 16:20:31 2023 -0700

    Core: Use lazy iterable in DeleteFileIndex (#8263)
---
 .../java/org/apache/iceberg/DeleteFileIndex.java   | 24 +++-----------
 .../apache/iceberg/metrics/ScanMetricsUtil.java    | 37 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 977b2a387d..63fde28e3c 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.metrics.ScanMetrics;
+import org.apache.iceberg.metrics.ScanMetricsUtil;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -50,7 +51,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
@@ -445,10 +445,8 @@ class DeleteFileIndex {
       return this;
     }
 
-    private Collection<DeleteFile> filterDeleteFiles() {
-      return Streams.stream(deleteFiles)
-          .filter(file -> file.dataSequenceNumber() > minSequenceNumber)
-          .collect(Collectors.toList());
+    private Iterable<DeleteFile> filterDeleteFiles() {
+      return Iterables.filter(deleteFiles, file -> file.dataSequenceNumber() > minSequenceNumber);
     }
 
     private Collection<DeleteFile> loadDeleteFiles() {
@@ -476,7 +474,7 @@ class DeleteFileIndex {
     }
 
     DeleteFileIndex build() {
-      Collection<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
+      Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
 
       // build a map from (specId, partition) to delete file entries
       Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
@@ -490,6 +488,7 @@ class DeleteFileIndex {
                 .computeIfAbsent(specId, id -> StructLikeWrapper.forType(spec.partitionType()))
                 .copyFor(file.partition());
         deleteFilesByPartition.put(Pair.of(specId, wrapper), new IndexedDeleteFile(spec, file));
+        ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
       }
 
       // sort the entries in each map value by sequence number and split into sequence numbers and
@@ -529,19 +528,6 @@ class DeleteFileIndex {
         }
       }
 
-      scanMetrics.indexedDeleteFiles().increment(files.size());
-      deleteFilesByPartition
-          .values()
-          .forEach(
-              file -> {
-                FileContent content = file.content();
-                if (content == FileContent.EQUALITY_DELETES) {
-                  scanMetrics.equalityDeleteFiles().increment();
-                } else if (content == FileContent.POSITION_DELETES) {
-                  scanMetrics.positionalDeleteFiles().increment();
-                }
-              });
-
       return new DeleteFileIndex(specsById, globalDeletes, sortedDeletesByPartition);
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
new file mode 100644
index 0000000000..102f48ee19
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.metrics;
+
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+
+public class ScanMetricsUtil {
+
+  private ScanMetricsUtil() {}
+
+  public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile deleteFile) {
+    metrics.indexedDeleteFiles().increment();
+
+    if (deleteFile.content() == FileContent.POSITION_DELETES) {
+      metrics.positionalDeleteFiles().increment();
+    } else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
+      metrics.equalityDeleteFiles().increment();
+    }
+  }
+}

Reply via email to