laskoviymishka commented on code in PR #16052: URL: https://github.com/apache/iceberg/pull/16052#discussion_r3272095669
########## core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */ +public final class PositionDeleteRangeConsumer { Review Comment: this is a V2-only optimization, V3 DVs go through `PositionDeleteIndex.deserialize` and don't touch this code, and the V3 spec asks writers to merge old position delete files into DVs at maintenance. i'd want a sentence in the class doc naming the workload this is meant to help, otherwise in a year or two it'll be hard to know whether to keep tuning it or let it bit-rot alongside V2 position deletes. wdyt? 
########## core/src/main/java/org/apache/iceberg/deletes/Deletes.java: ########## @@ -177,7 +177,7 @@ private static PositionDeleteIndex toPositionIndex( CloseableIterable<Long> posDeletes, List<DeleteFile> files) { try (CloseableIterable<Long> deletes = posDeletes) { PositionDeleteIndex positionDeleteIndex = new BitmapPositionDeleteIndex(files); - deletes.forEach(positionDeleteIndex::delete); + PositionDeleteRangeConsumer.forEach(deletes, positionDeleteIndex); Review Comment: i'm a little confused why the sibling `toPositionIndexes` is left untouched in this PR. It's what `BaseDeleteLoader.readPosDeletes` calls for the cached multi-file path, so arguably the more-trafficked of the two. either the benchmark wins translate to real workloads and that loop wants the same treatment, or they don't and I'm not sure why we want it on this path. Thoughts? ########## core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.deletes; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */ +public final class PositionDeleteRangeConsumer { + + /** + * Batch size for {@link #forEach}. Sized to fit comfortably in L1 (512 bytes). Smaller buffers + * miss the bulk-path branch elision; larger buffers add allocation cost without improving the + * inner-loop throughput (the {@code acceptAll} body is the same regardless of slice length). + */ + private static final int FOREACH_BATCH_SIZE = 64; + + private PositionDeleteRangeConsumer() {} + + /** + * Drains {@code positions} into a {@link RangeAccumulator} and flushes. + * + * <p>Boxed positions are buffered into a small primitive slice and then handed to {@link + * RangeAccumulator#acceptAll(long[], int, int)}, which keeps the sniff/escape state machine out + * of the inner loop. Compared to per-element {@link RangeAccumulator#accept(long)}, this gives a + * ~12% reduction in run-to-run time on dense inputs and -- more importantly -- removes the JIT + * inlining sensitivity that produces a 2 ms standard deviation on the per-element path. + */ + public static void forEach(Iterable<Long> positions, PositionDeleteIndex target) { + RangeAccumulator acc = new RangeAccumulator(target); + long[] buffer = new long[FOREACH_BATCH_SIZE]; + int filled = 0; + for (Long pos : positions) { + buffer[filled++] = pos; + if (filled == FOREACH_BATCH_SIZE) { + acc.acceptAll(buffer, 0, FOREACH_BATCH_SIZE); + filled = 0; + } + } + if (filled > 0) { + acc.acceptAll(buffer, 0, filled); + } + acc.flush(); + } + + /** + * Coalesces consecutive positions into range deletes on the target index. The first {@value + * #SNIFF_SIZE} positions are inspected; if more than {@value #BOUNDARY_THRESHOLD_PERCENT}% cross + * gaps, the accumulator falls back to per-position deletes for the rest of its life. 
+ * + * <p>Single-threaded; one instance per target index. Callers that already have positions in a + * primitive {@code long[]} should call {@link #acceptAll(long[], int, int)} directly -- the bulk + * path keeps the state-machine dispatch out of the inner loop. {@link #accept(long)} exists for + * truly streaming callers that do not buffer; {@link PositionDeleteRangeConsumer#forEach} is the + * standard entry for boxed iterables and handles its own small primitive batching internally. + */ + public static final class RangeAccumulator { + + private static final int SNIFF_SIZE = 256; + private static final int BOUNDARY_THRESHOLD_PERCENT = 30; + + private final PositionDeleteIndex target; + private boolean hasRun; + private long rangeStart; + private long lastPosition; + + private int processed; + private int boundaries; + private boolean escaped; + + public RangeAccumulator(PositionDeleteIndex target) { + Preconditions.checkArgument(target != null, "Invalid target index: null"); + this.target = target; + } + + public void accept(long pos) { + if (escaped) { + target.delete(pos); + return; + } + if (!hasRun) { + initRun(pos); + return; + } + coalesceSniff(pos); + if (processed == SNIFF_SIZE && shouldEscape()) { + enterEscape(); + } + } + + /** + * Bulk variant of {@link #accept(long)}. Runs the entire sniff/coalesce loop inside this method + * so the per-element work in steady state is identical to the original tight inline loop -- one + * gap-check branch and one position store, with no per-call frame. The small private helpers + * are inlined by HotSpot on the hot path. 
+ */ + public void acceptAll(long[] positions, int from, int to) { + Preconditions.checkArgument(positions != null, "Invalid positions array: null"); + Preconditions.checkPositionIndexes(from, to, positions.length); + if (from >= to) { + return; + } + + int cursor = from; + + if (escaped) { + drainEscaped(positions, cursor, to); + return; + } + + if (!hasRun) { + initRun(positions[cursor++]); + } + + while (cursor < to && processed < SNIFF_SIZE) { + coalesceSniff(positions[cursor++]); + } + + if (processed == SNIFF_SIZE && shouldEscape()) { + enterEscape(); + drainEscaped(positions, cursor, to); + return; + } + + while (cursor < to) { + coalesce(positions[cursor++]); + } + } + + /** Emits the active run, if any. The escape decision is sticky across flushes. */ + public void flush() { + if (hasRun) { + emit(); + hasRun = false; + } + } + + /** Starts a new active run anchored at {@code first}. */ + private void initRun(long first) { + rangeStart = first; + lastPosition = first; + hasRun = true; + processed = 1; + } + + /** Extends the active run with {@code pos} during sniffing; counts gaps to inform escape. */ + private void coalesceSniff(long pos) { + if (pos - lastPosition != 1) { + boundaries++; + emit(); + rangeStart = pos; + } + lastPosition = pos; + processed++; + } + + /** Extends the active run with {@code pos} after sniffing has decided not to escape. */ + private void coalesce(long pos) { + if (pos - lastPosition != 1) { + emit(); + rangeStart = pos; + } + lastPosition = pos; + } + + /** True if the sniffed prefix has too many gaps to make coalescing worthwhile. */ + private boolean shouldEscape() { + return boundaries * 100 > (SNIFF_SIZE - 1) * BOUNDARY_THRESHOLD_PERCENT; Review Comment: position delete files are spec-sorted by `(file_path, pos)`, so within a single file the stream is monotonic, at which point a greedy `pos == lastEnd + 1` check gets us coalescing without the sniff window, boundary counter, or escape mode. did you measure that shape? 
my intuition is the 2% sparse regression is mostly the sniff cost itself, not per-element branching. if that holds, we keep the dense win and drop the two magic constants. wdyt?
########## core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java: ########## @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** Coalesces consecutive position deletes into range inserts on a {@link PositionDeleteIndex}. */ +public final class PositionDeleteRangeConsumer { Review Comment: is this meant as a public API? the only caller is in-package, and the `accept`/`acceptAll`/`flush` shape plus the sniff-stickiness contract feel like internals to me. i'd make this and `RangeAccumulator` package-private, which keeps our hands free if the design shifts based on the questions above.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
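For reference, the greedy `pos == lastEnd + 1` shape raised in the `shouldEscape()` comment could look roughly like the sketch below. This is a minimal illustration, not code from the PR: `GreedyCoalescer` and `RangeSink` are hypothetical names, and `RangeSink.deleteRange` is only a stand-in for whatever range-delete call the target index exposes. It assumes the input is sorted ascending, as the comment argues for positions within a single data file, and it has not been benchmarked.

```java
import java.util.List;

/** Sketch of greedy run coalescing with no sniff window, boundary counter, or escape mode. */
final class GreedyCoalescer {

  /** Hypothetical stand-in for a range delete on the target index (end is exclusive). */
  @FunctionalInterface
  interface RangeSink {
    void deleteRange(long startInclusive, long endExclusive);
  }

  private GreedyCoalescer() {}

  /**
   * Emits one range per maximal run of consecutive positions.
   * Assumes positions arrive in ascending order and stay below Long.MAX_VALUE.
   */
  static void forEach(Iterable<Long> positions, RangeSink sink) {
    boolean hasRun = false;
    long rangeStart = 0L;
    long last = 0L;

    for (long pos : positions) {
      if (hasRun && pos == last + 1) {
        // Position extends the active run: the only per-element work is this check.
        last = pos;
        continue;
      }
      if (hasRun) {
        // Gap found: close the active run before starting a new one.
        sink.deleteRange(rangeStart, last + 1);
      }
      rangeStart = pos;
      last = pos;
      hasRun = true;
    }

    if (hasRun) {
      // Flush the trailing run.
      sink.deleteRange(rangeStart, last + 1);
    }
  }

  public static void main(String[] args) {
    // Three runs in the input: 1..3, 7, and 9..10.
    forEach(
        List.of(1L, 2L, 3L, 7L, 9L, 10L),
        (start, end) -> System.out.println("[" + start + ", " + end + ")"));
  }
}
```

On a fully sparse input this emits one single-position range per element, so whether it matches the per-position path there is exactly the measurement the comment asks for.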
