This is an automated email from the ASF dual-hosted git repository.
singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 2a006ba8d0 Core: Close planFiles() iterable in CatalogHandler (#14891)
2a006ba8d0 is described below
commit 2a006ba8d0f9b668efcfbc6c523c87ceddf35b94
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Dec 19 17:44:00 2025 +0100
Core: Close planFiles() iterable in CatalogHandler (#14891)
---
.../org/apache/iceberg/rest/CatalogHandlers.java | 57 ++++++++++++----------
1 file changed, 31 insertions(+), 26 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index e3eff1228a..229497576a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -23,6 +23,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAUL
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import java.io.IOException;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
@@ -64,6 +65,7 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -805,35 +807,38 @@ public class CatalogHandlers {
*/
private static Pair<List<FileScanTask>, String> planFilesFor(
Scan<?, FileScanTask, ?> scan, String planId, String tableId, int tasksPerPlanTask) {
- Iterable<FileScanTask> planTasks = scan.planFiles();
- String planTaskPrefix = planId + "-" + tableId + "-";
-
- // Handle empty table scans
- if (!planTasks.iterator().hasNext()) {
- String planTaskKey = planTaskPrefix + "0";
- // Add empty scan to planning state so async calls know the scan completed
- IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList());
- return Pair.of(Collections.emptyList(), planTaskKey);
- }
-
- Iterable<List<FileScanTask>> taskGroupings = Iterables.partition(planTasks, tasksPerPlanTask);
- int planTaskSequence = 0;
- String previousPlanTask = null;
- String firstPlanTaskKey = null;
- List<FileScanTask> initialFileScanTasks = null;
- for (List<FileScanTask> taskGrouping : taskGroupings) {
- String planTaskKey = planTaskPrefix + planTaskSequence++;
- IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
- if (previousPlanTask != null) {
- IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
- } else {
- firstPlanTaskKey = planTaskKey;
- initialFileScanTasks = taskGrouping;
+ try (CloseableIterable<FileScanTask> planTasks = scan.planFiles()) {
+ String planTaskPrefix = planId + "-" + tableId + "-";
+
+ // Handle empty table scans
+ if (!planTasks.iterator().hasNext()) {
+ String planTaskKey = planTaskPrefix + "0";
+ // Add empty scan to planning state so async calls know the scan completed
+ IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList());
+ return Pair.of(Collections.emptyList(), planTaskKey);
}
- previousPlanTask = planTaskKey;
+ Iterable<List<FileScanTask>> taskGroupings = Iterables.partition(planTasks, tasksPerPlanTask);
+ int planTaskSequence = 0;
+ String previousPlanTask = null;
+ String firstPlanTaskKey = null;
+ List<FileScanTask> initialFileScanTasks = null;
+ for (List<FileScanTask> taskGrouping : taskGroupings) {
+ String planTaskKey = planTaskPrefix + planTaskSequence++;
+ IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
+ if (previousPlanTask != null) {
+ IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
+ } else {
+ firstPlanTaskKey = planTaskKey;
+ initialFileScanTasks = taskGrouping;
+ }
+
+ previousPlanTask = planTaskKey;
+ }
+ return Pair.of(initialFileScanTasks, firstPlanTaskKey);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return Pair.of(initialFileScanTasks, firstPlanTaskKey);
}
@SuppressWarnings("FutureReturnValueIgnored")