Baunsgaard commented on code in PR #16052:
URL: https://github.com/apache/iceberg/pull/16052#discussion_r3272647001


##########
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link 
PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {

Review Comment:
   Although position delete files are sorted per the spec, the implementation
   does not depend on sorted input. Engines can pass any iterable or `long[]`
   of positions they want to delete and flush them through this API to update
   the `PositionDeleteIndex` quickly.
   
   To further justify the addition, I also routed the merging of two
   `PositionDeleteIndex`es through the same accumulator. Both the default
   `PositionDeleteIndex.merge` and the `else` branch in
   `BitmapPositionDeleteIndex.merge` now delegate to a new
   `PositionDeleteRangeConsumer.forEach(source, target)` overload, so the
   per-position fallback gets the range-coalescing fast path. The new helper is
   exercised by every caller of `PositionDeleteIndex.merge`, which today includes:
   
   - `PositionDeleteIndexUtil.merge` — used by 
`BaseDeleteLoader.getOrReadPosDeletes` on the cached
     multi-file V2 read path (the sibling you flagged below).
   - `BaseDVFileWriter.delete(path, index, ...)` and `BaseDVFileWriter.close()` 
— V3 DV writer,
     bulk-add and merge-with-previous on finalize.
   - `SortingPositionOnlyDeleteWriter.complete()` — V2 writer, 
merge-with-previous before sort+write.
   
   Honest caveat: in production today the source is almost always another
   `BitmapPositionDeleteIndex`, so the `bitmap.setAll` fast path is taken and the
   new helper is not invoked. The win is defensive: any non-bitmap source
   (custom engine impls, test doubles, future implementations) now gets
   coalescing for free, and there is one implementation of the fallback instead
   of two copies of `that.forEach(this::delete)`.
   
   To address the concern, I added a class-level comment that names the V2
   workload and makes explicit that V3 DVs bypass this class via
   `PositionDeleteIndex.deserialize`. When V2 position delete files are no longer
   read, this class can be retired with them.
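   
   For readers following along, here is a minimal sketch of the coalescing idea
   described above. It is not the PR's code: `RangeSink` and `CoalescingSketch`
   are hypothetical stand-ins, and the half-open range `delete(start, end)` on
   the sink is an assumption made for illustration. An out-of-order position
   simply counts as a gap, so the result stays correct on unsorted input.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for a delete index; NOT the Iceberg interface. */
   interface RangeSink {
     void delete(long pos);

     /** Assumed half-open range: startInclusive .. endExclusive. */
     void delete(long startInclusive, long endExclusive);
   }

   final class CoalescingSketch {
     private CoalescingSketch() {}

     /** Turns each run of consecutive positions into a single call on the sink. */
     static void forEach(long[] positions, RangeSink sink) {
       if (positions.length == 0) {
         return;
       }
       long start = positions[0];
       long last = start;
       for (int i = 1; i < positions.length; i++) {
         long pos = positions[i];
         if (pos - last != 1) { // gap or out-of-order value: close the current run
           emit(sink, start, last);
           start = pos;
         }
         last = pos;
       }
       emit(sink, start, last);
     }

     private static void emit(RangeSink sink, long start, long last) {
       if (start == last) {
         sink.delete(start);
       } else {
         sink.delete(start, last + 1);
       }
     }

     /** Records the calls made on the sink so they can be inspected. */
     static List<String> trace(long... positions) {
       List<String> calls = new ArrayList<>();
       forEach(positions, new RangeSink() {
         @Override
         public void delete(long pos) {
           calls.add("delete(" + pos + ")");
         }

         @Override
         public void delete(long startInclusive, long endExclusive) {
           calls.add("delete(" + startInclusive + ", " + endExclusive + ")");
         }
       });
       return calls;
     }

     public static void main(String[] args) {
       // prints [delete(3, 6), delete(9), delete(1, 3)]
       System.out.println(trace(3, 4, 5, 9, 1, 2));
     }
   }
   ```

   Six positions become three calls here; a fully dense input collapses to one.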



##########
core/src/main/java/org/apache/iceberg/deletes/Deletes.java:
##########
@@ -177,7 +177,7 @@ private static PositionDeleteIndex toPositionIndex(
       CloseableIterable<Long> posDeletes, List<DeleteFile> files) {
     try (CloseableIterable<Long> deletes = posDeletes) {
       PositionDeleteIndex positionDeleteIndex = new 
BitmapPositionDeleteIndex(files);
-      deletes.forEach(positionDeleteIndex::delete);
+      PositionDeleteRangeConsumer.forEach(deletes, positionDeleteIndex);

Review Comment:
   Good point, the cached path is indeed the more-trafficked of the two for 
engines that don't bundle `iceberg-arrow`. The Arrow PR (#16440) only registers 
`PositionDeleteIndexReader` for Parquet, so:
   
   - Flink reads of position delete files
   - Avro / ORC position delete files in any engine
   
   …all still flow through `Deletes.toPositionIndexes` to 
`index.delete(position)` per row.
   
   I've wired `Deletes.toPositionIndexes` through 
`PositionDeleteRangeConsumer.forEach`, grouping records by adjacent `file_path` 
with a `PeekingIterator`. The fast path now kicks in per data-file group, and 
out-of-order revisits append correctly into the existing bitmap. To keep 
autoboxing out of the inner loop on this many-path shape I added a 
package-private `forEach(PrimitiveIterator.OfLong, target)` overload and let 
`drainPositionsForPath` return that primitive iterator directly.
   
   Covered by `TestDeletesToPositionIndexes` (empty / single path / multi-path 
/ out-of-order revisit / mixed `String`/`Utf8` / sparse / `DeleteFile` 
propagation / close-failure → `UncheckedIOException`) and exercised end-to-end 
by `TestGenericReaderDeletes`.
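   
   A rough sketch of the grouping shape described above, under stated
   assumptions: `PosDelete` and `Peeking` are hypothetical stand-ins (the PR uses
   a `PeekingIterator`), `java.util.BitSet` stands in for the bitmap index, and
   the body of `drainPositionsForPath` is my guess at the idea, not the actual
   implementation. Positions for one path are handed out as a
   `PrimitiveIterator.OfLong`, so nothing is boxed in the inner loop, and a path
   that reappears later appends into its existing bitmap.

   ```java
   import java.util.BitSet;
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.NoSuchElementException;
   import java.util.PrimitiveIterator;

   final class GroupByPathSketch {
     private GroupByPathSketch() {}

     /** Hypothetical (file_path, pos) record; not an Iceberg type. */
     static final class PosDelete {
       final String path;
       final long pos;

       PosDelete(String path, long pos) {
         this.path = path;
         this.pos = pos;
       }
     }

     /** Minimal one-element lookahead, standing in for a peeking iterator. */
     static final class Peeking {
       private final Iterator<PosDelete> delegate;
       private PosDelete head;

       Peeking(Iterator<PosDelete> delegate) {
         this.delegate = delegate;
         this.head = delegate.hasNext() ? delegate.next() : null;
       }

       boolean hasNext() {
         return head != null;
       }

       PosDelete peek() {
         return head;
       }

       PosDelete next() {
         PosDelete result = head;
         head = delegate.hasNext() ? delegate.next() : null;
         return result;
       }
     }

     /** Yields positions while the head record still belongs to {@code path}. */
     static PrimitiveIterator.OfLong drainPositionsForPath(Peeking records, String path) {
       return new PrimitiveIterator.OfLong() {
         @Override
         public boolean hasNext() {
           return records.hasNext() && records.peek().path.equals(path);
         }

         @Override
         public long nextLong() {
           if (!hasNext()) {
             throw new NoSuchElementException();
           }
           return records.next().pos;
         }
       };
     }

     /** Builds one bitmap per path; a revisited path reuses its existing bitmap. */
     static Map<String, BitSet> toIndexes(List<PosDelete> input) {
       Map<String, BitSet> byPath = new LinkedHashMap<>();
       Peeking records = new Peeking(input.iterator());
       while (records.hasNext()) {
         String path = records.peek().path;
         BitSet index = byPath.computeIfAbsent(path, p -> new BitSet());
         PrimitiveIterator.OfLong positions = drainPositionsForPath(records, path);
         while (positions.hasNext()) {
           index.set(Math.toIntExact(positions.nextLong()));
         }
       }
       return byPath;
     }

     public static void main(String[] args) {
       List<PosDelete> input = List.of(
           new PosDelete("a", 0), new PosDelete("a", 1),
           new PosDelete("b", 5), new PosDelete("a", 7));
       // prints {a={0, 1, 7}, b={5}}
       System.out.println(toIndexes(input));
     }
   }
   ```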
   



##########
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link 
PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {

Review Comment:
   It is, and that makes sense: the class exists for faster combining of
   `PositionDeleteIndex`es.
   
   However, the public surface I suggested was wider than the design warrants.
   I reshaped it based on what the Arrow stack on PR #16440 actually needs.
   
   The public surface is now a constructor, two instance methods, and two static
   `forEach` overloads:
     ```
     new PositionDeleteRangeConsumer(target);
     consumer.acceptAll(long[] positions, int from, int to);
     consumer.flush();
     public static void forEach(Iterable<Long> positions, PositionDeleteIndex target);
     public static void forEach(PositionDeleteIndex source, PositionDeleteIndex target);
     ```
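   
   To illustrate how a slice-based `acceptAll` plus `flush` is meant to be
   driven, here is a small sketch. `SliceConsumerSketch` is a hypothetical
   stand-in that records ranges in a list instead of writing to a
   `PositionDeleteIndex`, and it omits the sniff/escape logic entirely. The
   point is only that run state carries over between calls, so a run that spans
   two batches still ends up as one range once `flush` is called.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /** Hypothetical stand-in for the consumer; records half-open ranges as strings. */
   final class SliceConsumerSketch {
     private final List<String> ranges = new ArrayList<>();
     private boolean hasRun;
     private long rangeStart;
     private long lastPosition;

     /** Consumes positions[from, to); the active run survives across calls. */
     void acceptAll(long[] positions, int from, int to) {
       if (from < 0 || to > positions.length || from > to) {
         throw new IndexOutOfBoundsException("Invalid slice: " + from + ".." + to);
       }
       for (int i = from; i < to; i++) {
         long pos = positions[i];
         if (!hasRun) {
           rangeStart = pos;
           hasRun = true;
         } else if (pos - lastPosition != 1) {
           emit();
           rangeStart = pos;
         }
         lastPosition = pos;
       }
     }

     /** Emits the active run, if any. */
     void flush() {
       if (hasRun) {
         emit();
         hasRun = false;
       }
     }

     List<String> ranges() {
       return ranges;
     }

     private void emit() {
       ranges.add("[" + rangeStart + ", " + (lastPosition + 1) + ")");
     }

     public static void main(String[] args) {
       SliceConsumerSketch consumer = new SliceConsumerSketch();
       consumer.acceptAll(new long[] {10, 11, 12}, 0, 3);
       consumer.acceptAll(new long[] {13, 20}, 0, 2);
       consumer.flush();
       // prints [[10, 14), [20, 21)]
       System.out.println(consumer.ranges());
     }
   }
   ```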
   



##########
core/src/main/java/org/apache/iceberg/deletes/PositionDeleteRangeConsumer.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.deletes;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/** Coalesces consecutive position deletes into range inserts on a {@link 
PositionDeleteIndex}. */
+public final class PositionDeleteRangeConsumer {
+
+  /**
+   * Batch size for {@link #forEach}. Sized to fit comfortably in L1 (512 
bytes). Smaller buffers
+   * miss the bulk-path branch elision; larger buffers add allocation cost 
without improving the
+   * inner-loop throughput (the {@code acceptAll} body is the same regardless 
of slice length).
+   */
+  private static final int FOREACH_BATCH_SIZE = 64;
+
+  private PositionDeleteRangeConsumer() {}
+
+  /**
+   * Drains {@code positions} into a {@link RangeAccumulator} and flushes.
+   *
+   * <p>Boxed positions are buffered into a small primitive slice and then 
handed to {@link
+   * RangeAccumulator#acceptAll(long[], int, int)}, which keeps the 
sniff/escape state machine out
+   * of the inner loop. Compared to per-element {@link 
RangeAccumulator#accept(long)}, this gives a
+   * ~12% reduction in run-to-run time on dense inputs and -- more importantly 
-- removes the JIT
+   * inlining sensitivity that produces a 2 ms standard deviation on the 
per-element path.
+   */
+  public static void forEach(Iterable<Long> positions, PositionDeleteIndex 
target) {
+    RangeAccumulator acc = new RangeAccumulator(target);
+    long[] buffer = new long[FOREACH_BATCH_SIZE];
+    int filled = 0;
+    for (Long pos : positions) {
+      buffer[filled++] = pos;
+      if (filled == FOREACH_BATCH_SIZE) {
+        acc.acceptAll(buffer, 0, FOREACH_BATCH_SIZE);
+        filled = 0;
+      }
+    }
+    if (filled > 0) {
+      acc.acceptAll(buffer, 0, filled);
+    }
+    acc.flush();
+  }
+
+  /**
+   * Coalesces consecutive positions into range deletes on the target index. 
The first {@value
+   * #SNIFF_SIZE} positions are inspected; if more than {@value 
#BOUNDARY_THRESHOLD_PERCENT}% cross
+   * gaps, the accumulator falls back to per-position deletes for the rest of 
its life.
+   *
+   * <p>Single-threaded; one instance per target index. Callers that already 
have positions in a
+   * primitive {@code long[]} should call {@link #acceptAll(long[], int, int)} 
directly -- the bulk
+   * path keeps the state-machine dispatch out of the inner loop. {@link 
#accept(long)} exists for
+   * truly streaming callers that do not buffer; {@link 
PositionDeleteRangeConsumer#forEach} is the
+   * standard entry for boxed iterables and handles its own small primitive 
batching internally.
+   */
+  public static final class RangeAccumulator {
+
+    private static final int SNIFF_SIZE = 256;
+    private static final int BOUNDARY_THRESHOLD_PERCENT = 30;
+
+    private final PositionDeleteIndex target;
+    private boolean hasRun;
+    private long rangeStart;
+    private long lastPosition;
+
+    private int processed;
+    private int boundaries;
+    private boolean escaped;
+
+    public RangeAccumulator(PositionDeleteIndex target) {
+      Preconditions.checkArgument(target != null, "Invalid target index: 
null");
+      this.target = target;
+    }
+
+    public void accept(long pos) {
+      if (escaped) {
+        target.delete(pos);
+        return;
+      }
+      if (!hasRun) {
+        initRun(pos);
+        return;
+      }
+      coalesceSniff(pos);
+      if (processed == SNIFF_SIZE && shouldEscape()) {
+        enterEscape();
+      }
+    }
+
+    /**
+     * Bulk variant of {@link #accept(long)}. Runs the entire sniff/coalesce 
loop inside this method
+     * so the per-element work in steady state is identical to the original 
tight inline loop -- one
+     * gap-check branch and one position store, with no per-call frame. The 
small private helpers
+     * are inlined by HotSpot on the hot path.
+     */
+    public void acceptAll(long[] positions, int from, int to) {
+      Preconditions.checkArgument(positions != null, "Invalid positions array: 
null");
+      Preconditions.checkPositionIndexes(from, to, positions.length);
+      if (from >= to) {
+        return;
+      }
+
+      int cursor = from;
+
+      if (escaped) {
+        drainEscaped(positions, cursor, to);
+        return;
+      }
+
+      if (!hasRun) {
+        initRun(positions[cursor++]);
+      }
+
+      while (cursor < to && processed < SNIFF_SIZE) {
+        coalesceSniff(positions[cursor++]);
+      }
+
+      if (processed == SNIFF_SIZE && shouldEscape()) {
+        enterEscape();
+        drainEscaped(positions, cursor, to);
+        return;
+      }
+
+      while (cursor < to) {
+        coalesce(positions[cursor++]);
+      }
+    }
+
+    /** Emits the active run, if any. The escape decision is sticky across 
flushes. */
+    public void flush() {
+      if (hasRun) {
+        emit();
+        hasRun = false;
+      }
+    }
+
+    /** Starts a new active run anchored at {@code first}. */
+    private void initRun(long first) {
+      rangeStart = first;
+      lastPosition = first;
+      hasRun = true;
+      processed = 1;
+    }
+
+    /** Extends the active run with {@code pos} during sniffing; counts gaps 
to inform escape. */
+    private void coalesceSniff(long pos) {
+      if (pos - lastPosition != 1) {
+        boundaries++;
+        emit();
+        rangeStart = pos;
+      }
+      lastPosition = pos;
+      processed++;
+    }
+
+    /** Extends the active run with {@code pos} after sniffing has decided not 
to escape. */
+    private void coalesce(long pos) {
+      if (pos - lastPosition != 1) {
+        emit();
+        rangeStart = pos;
+      }
+      lastPosition = pos;
+    }
+
+    /** True if the sniffed prefix has too many gaps to make coalescing 
worthwhile. */
+    private boolean shouldEscape() {
+      return boundaries * 100 > (SNIFF_SIZE - 1) * BOUNDARY_THRESHOLD_PERCENT;

Review Comment:
   > position delete files are spec-sorted by (file_path, pos), so within a 
single file the stream is monotonic, at which point a greedy pos == lastEnd + 1 
check gets us coalescing without the sniff window, boundary counter, or escape 
mode.
   
   The consumer is intentionally agnostic to input sortedness — the goal is a 
single primitive that any caller (V2 file reader, the 
`PositionDeleteIndex.merge` callsites we now route through here, future query 
paths) can drop a batch of positions into without having to know that the 
producer happened to emit them in order. Relying on the V2 spec for correctness 
would make this a file-format helper rather than a general utility.
   
   > did you measure that shape? my intuition is the 2% sparse regression is 
mostly the sniff cost itself, not per-element branching so if that holds, we 
keep the dense win and drop the two magic constants. wdyt?
   
   I'd like that to be true, but the numbers say otherwise. I added a 
`GreedyRangeConsumer` (same coalescing logic, no sniff/escape, no boundary 
counter) and ran the full distribution sweep:
   
   
   | distribution | prod `acceptAll` | greedy `acceptAll` | Δ% | prod `forEach` (boxed) | greedy `forEach` (boxed) | Δ% |
   |---|---:|---:|---:|---:|---:|---:|
   | FULL      |  2.24 ± 0.44 |  2.25 ± 0.24 |  +0.6 % |  9.17 ± 2.01 |  6.36 ± 0.42 | −30.7 % * |
   | MEDIUM    |  6.28 ± 0.11 |  5.90 ± 0.10 |  −6.1 % | 13.61 ± 1.53 | 12.48 ± 0.58 |  −8.3 % |
   | SHORT     | 41.28 ± 0.17 | 42.30 ± 1.71 |  +2.5 % | 48.41 ± 2.47 | 48.93 ± 0.39 |  +1.1 % |
   | SPARSE_95 | 36.72 ± 3.49 | 36.64 ± 0.35 |  −0.2 % | 45.43 ± 3.11 | 44.98 ± 0.48 |  −1.0 % |
   | **SPARSE_50** | **48.88 ± 4.80** | **81.59 ± 9.62** | **+66.9 %** | **60.95 ± 14.99** | **91.74 ± 0.42** | **+50.5 %** |
   | SPARSE_5  | 16.69 ± 0.79 | 16.33 ± 4.57 |  −2.2 % | 23.97 ± 1.54 | 23.57 ± 0.16 |  −1.7 % |
   | NONE      | 43.82 ± 2.86 | 44.76 ± 1.01 |  +2.2 % | 44.38 ± 0.72 | 39.36 ± 0.19 | −11.3 % |
   
   
   SPARSE_50 (alternating consecutive/gap — roughly "every other row deleted in 
a file") regresses 1.5–1.7× under greedy. The cost isn't sniff overhead; it's 
that every gap forces greedy to emit the previous singleton via 
`target.delete(rangeStart)` and re-anchor a fresh run. The adaptive escape 
avoids that by detecting the gap density during the first 256 positions and 
falling through to the per-position `target.delete(pos)` path, which doesn't 
pay the `rangeStart = pos` write.
   
   
   Most other shapes are within about 8 % (the boxed `forEach` FULL and NONE
   rows are the exceptions), so the greedy wins don't offset the SPARSE_50 cost.
   I'd rather keep the two constants than give up that ratio on a plausible
   workload.
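   
   For concreteness, the escape check from the diff can be worked through in
   isolation. With `SNIFF_SIZE = 256` and `BOUNDARY_THRESHOLD_PERCENT = 30`, the
   condition `boundaries * 100 > 255 * 30` (that is, `> 7650`) first holds at 77
   boundaries among the 255 transitions in the sniff window. The sketch below
   copies those two constants and the comparison; `SniffThresholdSketch` and the
   `pairs` generator are hypothetical, and the "two consecutive rows, then a
   gap" pattern is only my approximation of an alternating shape, not
   necessarily the benchmark's exact SPARSE_50 input.

   ```java
   final class SniffThresholdSketch {
     static final int SNIFF_SIZE = 256;
     static final int BOUNDARY_THRESHOLD_PERCENT = 30;

     private SniffThresholdSketch() {}

     /** Counts non-consecutive transitions within the first SNIFF_SIZE positions. */
     static int countBoundaries(long[] positions) {
       int limit = Math.min(positions.length, SNIFF_SIZE);
       int boundaries = 0;
       for (int i = 1; i < limit; i++) {
         if (positions[i] - positions[i - 1] != 1) {
           boundaries++;
         }
       }
       return boundaries;
     }

     /** Same comparison as shouldEscape() in the diff. */
     static boolean shouldEscape(int boundaries) {
       return boundaries * 100 > (SNIFF_SIZE - 1) * BOUNDARY_THRESHOLD_PERCENT;
     }

     /** Dense input: 0, 1, 2, ... */
     static long[] dense(int n) {
       long[] positions = new long[n];
       for (int i = 0; i < n; i++) {
         positions[i] = i;
       }
       return positions;
     }

     /** Assumed alternating shape: 0, 1, 3, 4, 6, 7, ... (a gap after every pair). */
     static long[] pairs(int n) {
       long[] positions = new long[n];
       for (int i = 0; i < n; i++) {
         positions[i] = i + i / 2;
       }
       return positions;
     }

     public static void main(String[] args) {
       int denseBoundaries = countBoundaries(dense(SNIFF_SIZE));
       int pairBoundaries = countBoundaries(pairs(SNIFF_SIZE));
       // prints "dense: 0 boundaries, escape=false"
       System.out.println("dense: " + denseBoundaries + " boundaries, escape="
           + shouldEscape(denseBoundaries));
       // prints "pairs: 127 boundaries, escape=true"
       System.out.println("pairs: " + pairBoundaries + " boundaries, escape="
           + shouldEscape(pairBoundaries));
     }
   }
   ```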



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

