stevenzwu commented on code in PR #15590: URL: https://github.com/apache/iceberg/pull/15590#discussion_r2942206797
########## core/src/main/java/org/apache/iceberg/DataFileAccumulator.java: ##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.DataFileSet;
+
+/** Accumulates data files and flushes them to manifests when a count threshold is reached. */
+class DataFileAccumulator {
+
+  private final int flushThreshold;
+  private final BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests;
+  private final Function<ManifestFile, ManifestReader<DataFile>> readManifest;
+
+  private final Map<Integer, DataFileSet> pendingBySpec = Maps.newHashMap();
+  private final List<ManifestFile> flushedManifests = Lists.newLinkedList();
+  private final Set<Integer> allSpecIds = Sets.newHashSet();
+  private int pendingCount = 0;
+
+  DataFileAccumulator(
+      int flushThreshold,
+      BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests,
+      Function<ManifestFile, ManifestReader<DataFile>> readManifest) {
+    this.flushThreshold = flushThreshold;
+    this.writeManifests = writeManifests;
+    this.readManifest = readManifest;

Review Comment:
Missing input validation for `flushThreshold`. A threshold of `0` would flush on every `add()` call (writing a manifest per file), and a negative value would silently never flush. Consider:

```java
Preconditions.checkArgument(
    flushThreshold > 0, "Flush threshold must be positive: %s", flushThreshold);
```

########## core/src/main/java/org/apache/iceberg/DataFileAccumulator.java: ##########
+class DataFileAccumulator {
+
+  private final int flushThreshold;
+  private final BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests;
+  private final Function<ManifestFile, ManifestReader<DataFile>> readManifest;
+
+  private final Map<Integer, DataFileSet> pendingBySpec = Maps.newHashMap();
+  private final List<ManifestFile> flushedManifests = Lists.newLinkedList();
+  private final Set<Integer> allSpecIds = Sets.newHashSet();
+  private int pendingCount = 0;
+
+  DataFileAccumulator(
+      int flushThreshold,
+      BiFunction<Collection<DataFile>, Integer, List<ManifestFile>> writeManifests,

Review Comment:
Nit: `ArrayList` would be a better fit here — this list is only appended to and iterated. `LinkedList` has higher per-element overhead and poor cache locality.

########## core/src/main/java/org/apache/iceberg/DataFileAccumulator.java: ##########
+  /**
+   * Adds a data file. Returns true if the file was new. May flush to manifests if the threshold is
+   * reached.
+   */
+  boolean add(DataFile file, int specId) {
+    DataFileSet files = pendingBySpec.computeIfAbsent(specId, ignored -> DataFileSet.create());
+    if (files.add(file)) {
+      allSpecIds.add(specId);
+      pendingCount++;
+      if (pendingCount >= flushThreshold) {
+        flush();
+      }
+      return true;
+    }
+    return false;
+  }
+
+  boolean hasFiles() {
+    return !pendingBySpec.isEmpty() || !flushedManifests.isEmpty();
+  }
+
+  boolean hasPendingFiles() {
+    return !pendingBySpec.isEmpty();
+  }
+
+  /** All partition spec IDs seen across adds. */
+  Set<Integer> specIds() {
+    return Collections.unmodifiableSet(allSpecIds);
+  }
+
+  Map<Integer, DataFileSet> pendingBySpec() {
+    return pendingBySpec;
+  }
+
+  List<ManifestFile> flushedManifests() {
+    return flushedManifests;
+  }

Review Comment:
`pendingBySpec()` and `flushedManifests()` return the raw mutable internal collections, while `specIds()` correctly returns `Collections.unmodifiableSet(...)`. For consistency and to prevent accidental mutation from callers in `MergingSnapshotProducer`, consider returning unmodifiable views here too — or exposing purpose-specific methods instead of raw collection access.

########## core/src/main/java/org/apache/iceberg/DataFileAccumulator.java: ##########
+  /**
+   * All added data files (flushed + pending). Flushed files are lazily read back from their
+   * manifests to avoid holding them in memory during the add phase.
+   */
+  List<DataFile> allAddedFiles() {
+    ImmutableList.Builder<DataFile> builder = ImmutableList.builder();
+
+    for (ManifestFile manifest : flushedManifests) {
+      try (ManifestReader<DataFile> reader = readManifest.apply(manifest)) {
+        for (DataFile file : reader) {
+          builder.add(file);
+        }
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to read manifest file: %s", manifest.path());
+      }
+    }
+
+    pendingBySpec.values().forEach(builder::addAll);
+    return builder.build();
+  }
+
+  void deleteUncommitted(Set<ManifestFile> committed, Consumer<String> deleteFunc) {
+    boolean anyDeleted = false;
+    for (ManifestFile manifest : flushedManifests) {
+      if (!committed.contains(manifest)) {
+        deleteFunc.accept(manifest.path());
+        anyDeleted = true;
+      }

Review Comment:
On successful commit where all flushed manifests are in the committed set, `anyDeleted` is false and `flushedManifests` is never cleared. This is benign since the accumulator isn't reused after commit, but it's inconsistent with `SnapshotProducer.deleteUncommitted()` which always clears when asked. Simpler to always clear:

```java
void deleteUncommitted(Set<ManifestFile> committed, Consumer<String> deleteFunc) {
  for (ManifestFile manifest : flushedManifests) {
    if (!committed.contains(manifest)) {
      deleteFunc.accept(manifest.path());
    }
  }

  flushedManifests.clear();
}
```

########## core/src/main/java/org/apache/iceberg/DataFileAccumulator.java: ##########
+  List<DataFile> allAddedFiles() {
+    ImmutableList.Builder<DataFile> builder = ImmutableList.builder();
+
+    for (ManifestFile manifest : flushedManifests) {
+      try (ManifestReader<DataFile> reader = readManifest.apply(manifest)) {
+        for (DataFile file : reader) {
+          builder.add(file);

Review Comment:
This reads back every flushed manifest and builds an `ImmutableList` of all data files, re-materializing everything into memory. This is called from `addedDataFiles()` which `BaseOverwriteFiles.validate()` uses to check added files against the overwrite filter. For a 500K-file overwrite, this re-spikes memory to the same level as before the optimization. Could this return a `CloseableIterable<DataFile>` that streams lazily through the flushed manifests instead? That would be consistent with the project's preference for `CloseableIterable` over eager materialization.

########## core/src/test/java/org/apache/iceberg/TestDataFileAccumulator.java: ##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Test;
+
+class TestDataFileAccumulator {
+
+  private static final PartitionSpec SPEC = PartitionSpec.unpartitioned();
+
+  private final AtomicInteger flushCount = new AtomicInteger(0);
+
+  private DataFileAccumulator newAccumulator(int threshold) {
+    return new DataFileAccumulator(threshold, this::stubWrite, manifest -> null);
+  }
+
+  private List<ManifestFile> stubWrite(Collection<DataFile> files, int specId) {
+    flushCount.incrementAndGet();

Review Comment:
The stub reader is `manifest -> null`, so calling `allAddedFiles()` after a flush would NPE on the null reader. The lazy read-back path — the most novel part of this PR — has no unit test coverage here. Could you add a test that verifies `allAddedFiles()` returns the correct files after a flush? The integration tests in `TestMergeAppend` exercise this indirectly, but a focused unit test would be valuable.

########## core/src/main/java/org/apache/iceberg/TableProperties.java: ##########
@@ -121,6 +121,10 @@ private TableProperties() {}
   public static final String MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled";
   public static final boolean MANIFEST_MERGE_ENABLED_DEFAULT = true;
 
+  public static final String MANIFEST_FLUSH_FILE_COUNT_THRESHOLD =

Review Comment:
Do we need a new property here? We already have `commit.manifest.target-size-bytes`, and the per-entry serialized size is estimable from the table schema. The six per-column stats maps (`column_sizes`, `value_counts`, `null_value_counts`, `nan_value_counts`, `lower_bounds`, `upper_bounds`) dominate entry size and scale with `min(schema.columns().size(), METRICS_MAX_INFERRED_COLUMN_DEFAULTS)`. A rough estimate:

```
estimatedEntryBytes ≈ 200 + 80 * min(numColumns, metricsMaxColumns) + 20 * partitionFields
flushThreshold = targetManifestSizeBytes * N / estimatedEntryBytes
```

For a 10-column table this yields ~80K (same ballpark as the 100K default); for a 100-column table it yields ~10K, which correctly flushes sooner since each entry consumes ~8x more memory. The hardcoded 100K default treats both identically. All the inputs (`schema`, `spec`, `metricsMaxInferredColumnDefaults`, `targetManifestSizeBytes`) are already available in the `MergingSnapshotProducer` constructor. This would make the behavior self-tuning and avoid adding a new configuration knob.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
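[Editor's note] A minimal runnable sketch of the self-tuning estimate in the last review comment. `FlushThresholdEstimator` is a hypothetical name, and the constants (200/80/20) and the `manifestsToBuffer` multiplier (the `N` in the comment's formula) are the comment's rough figures, not values from the Iceberg codebase:

```java
// Sketch of the proposed self-tuning flush threshold, under the review
// comment's assumptions. All constants are rough estimates from the comment,
// not Iceberg code.
class FlushThresholdEstimator {

  // Approximate serialized size of one manifest entry: fixed overhead, plus
  // the six per-column stats maps, plus per-partition-field values.
  public static long estimatedEntryBytes(
      int numColumns, int metricsMaxColumns, int partitionFields) {
    return 200L + 80L * Math.min(numColumns, metricsMaxColumns) + 20L * partitionFields;
  }

  // Threshold = how many entries fit in manifestsToBuffer target-size manifests.
  public static long flushThreshold(
      long targetManifestSizeBytes,
      int manifestsToBuffer, // the "N" in the comment's formula
      int numColumns,
      int metricsMaxColumns,
      int partitionFields) {
    return targetManifestSizeBytes
        * manifestsToBuffer
        / estimatedEntryBytes(numColumns, metricsMaxColumns, partitionFields);
  }

  public static void main(String[] args) {
    // 10-column unpartitioned table, 8 MiB target manifests, N = 10:
    // entry ~ 1000 bytes, threshold ~ 83886 (the "~80K" in the comment)
    System.out.println(flushThreshold(8L * 1024 * 1024, 10, 10, 100, 0));
    // 100-column table: entry ~ 8200 bytes, threshold ~ 10230 (the "~10K")
    System.out.println(flushThreshold(8L * 1024 * 1024, 10, 100, 100, 0));
  }
}
```

With `metricsMaxColumns` at 100 (the default for `write.metadata.metrics.max-inferred-column-defaults`), this reproduces the ~80K vs ~10K figures quoted in the comment.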
