iprithv commented on code in PR #16207: URL: https://github.com/apache/lucene/pull/16207#discussion_r3368270595
########## lucene/CHANGES.txt: ########## @@ -232,6 +232,8 @@ Optimizations * GITHUB#16172: Bulk-evaluate skip-indexed sorted and sorted-set doc-values range queries via a block-aware DocValuesRangeIterator#intoBitSet. (Costin Leau) +* GITHUB#16033: Fix data race in BlockingFloatHeap.poll function. (Vijay) Review Comment: shouldn't this be under bug fixes? ########## lucene/core.tests/src/test/org/apache/lucene/core/tests/util/hnsw/TestBlockingFloatHeap.java: ########## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.core.tests.util.hnsw; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.hnsw.BlockingFloatHeap; + +/** + * Unit tests for {@link BlockingFloatHeap} with focus on thread-safety and data race conditions. 
+ */ +public class TestBlockingFloatHeap extends LuceneTestCase { + + /** Test basic offer and poll operations */ + public void testBasicOfferAndPoll() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + heap.offer(5.0f); + assertEquals(1, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + heap.offer(3.0f); + assertEquals(2, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + heap.offer(7.0f); + assertEquals(3, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(3.0f, result, 0.0f); + assertEquals(2, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(5.0f, result, 0.0f); + assertEquals(1, heap.size()); + assertEquals(7.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(7.0f, result, 0.0f); + assertEquals(0, heap.size()); + } + + /** Test poll on empty heap throws exception */ + public void testPollEmptyHeap() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + expectThrows(IllegalStateException.class, heap::poll); + } + + /** Test offer with array */ + public void testOfferArray() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f}; + heap.offer(values, 5); + assertEquals(5, heap.size()); + assertEquals(1.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(1.0f, result, 0.0f); + assertEquals(4, heap.size()); + assertEquals(2.0f, heap.peek(), 0.0f); + } + + /** Test heap maintains min-heap property */ + public void testHeapProperty() { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + int[] testValues = {42, 17, 93, 8, 55, 34, 71, 2, 99, 11}; + + for (int val : testValues) { + heap.offer(val); + } + + assertEquals(10, heap.size()); + + int previous = (int) heap.poll(); + while (heap.size() > 0) { + int current = (int) heap.poll(); + assertTrue( + "Heap property violated: " + current + " should be >= " + previous, current >= previous); + previous = 
current; + } + } + + /** + * Test concurrent offer and poll operations. This specifically tests the data race fix where + * poll() was checking size > 0 outside the lock. + */ + @SuppressWarnings("unused") + public void testConcurrentOfferAndPoll() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(1000); + int numThreads = 8; + int operationsPerThread = 100; + AtomicInteger errors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch producersReady = new CountDownLatch(numThreads / 2); + + Thread[] threads = new Thread[numThreads]; + + // Create producer/consumer threads + for (int i = 0; i < numThreads / 2; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + for (int j = 0; j < operationsPerThread; j++) { + heap.offer((float) (random().nextDouble() * 1000)); + } + producersReady.countDown(); // Signal that this producer has completed + } catch (InterruptedException | BrokenBarrierException ignored) { + errors.incrementAndGet(); + } + }); + } + + for (int i = numThreads / 2; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + // Wait for producers to add items to the heap + producersReady.await(); + for (int j = 0; j < operationsPerThread / 2; j++) { + if (heap.size() > 0) { + heap.poll(); + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + errors.incrementAndGet(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals("Errors occurred during concurrent operations", 0, errors.get()); + } + + /** + * Test the specific data race scenario: multiple threads calling poll() simultaneously when heap + * is near empty + */ + @SuppressWarnings("unused") + public void testDataRaceOnPoll() throws InterruptedException { + int numThreads = 10; + for (int iteration = 0; 
iteration < 10; iteration++) { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + // Fill heap with exactly numThreads elements + for (int i = 0; i < numThreads; i++) { + heap.offer((float) i); + } + + AtomicInteger successfulPolls = new AtomicInteger(0); + AtomicInteger failedPolls = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + + Thread[] threads = new Thread[numThreads]; + + // All threads try to poll simultaneously + for (int i = 0; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Ensure all threads start simultaneously + try { + heap.poll(); + successfulPolls.incrementAndGet(); + } catch (IllegalStateException ignored) { + failedPolls.incrementAndGet(); + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Verify exactly numThreads successful polls and 0 failed + assertEquals( + "Iteration " + iteration + ": Expected " + numThreads + " successful polls", + numThreads, + successfulPolls.get()); + assertEquals("Iteration " + iteration + ": Expected 0 failed polls", 0, failedPolls.get()); + assertEquals("Iteration " + iteration + ": Heap should be empty", 0, heap.size()); + } + } + + /** + * Test the data race scenario where one thread polls while another calls peek. This ensures + * peek() reads consistent state. 
+ */ + @SuppressWarnings("unused") + public void testConcurrentPollAndPeek() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + for (int i = 0; i < 100; i++) { + heap.offer((float) i); + } + + AtomicInteger peekErrors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(2); + + Thread pollThread = + new Thread( + () -> { + try { + barrier.await(); + for (int i = 0; i < 50; i++) { + if (heap.size() > 0) { + heap.poll(); + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + + Thread peekThread = + new Thread( + () -> { + try { + barrier.await(); + for (int i = 0; i < 100; i++) { + if (heap.size() > 0) { + float peek = heap.peek(); + assertTrue("Peek returned invalid value", peek >= 0); + } + } + } catch (InterruptedException | BrokenBarrierException | IllegalStateException e) { + // IllegalStateException is possible if heap becomes empty + if (!(e instanceof IllegalStateException)) { + peekErrors.incrementAndGet(); + } + } + }); + + pollThread.start(); + peekThread.start(); + + pollThread.join(); + peekThread.join(); + + assertEquals("No errors should occur during concurrent peek/poll", 0, peekErrors.get()); + } + + /** + * Test that the fix prevents negative size values. This would be a clear indicator of the data + * race. 
+ */ + @SuppressWarnings("unused") + public void testSizeNeverNegative() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + // Pre-fill the heap + for (int i = 0; i < 50; i++) { + heap.offer((float) i); + } + + AtomicInteger negativeDetected = new AtomicInteger(0); + int numThreads = 10; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + + Thread[] threads = new Thread[numThreads]; + + for (int i = 0; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); + for (int j = 0; j < 100; j++) { + try { + int size = heap.size(); + if (size < 0) { + negativeDetected.incrementAndGet(); + } + if (size > 0) { + heap.poll(); + } + } catch (IllegalStateException ignored) { + // Expected when heap is empty + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals("Size should never be negative", 0, negativeDetected.get()); + } + + /** Test heap remains valid after many concurrent operations */ + @SuppressWarnings("unused") + public void testHeapValidityAfterConcurrentAccess() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(500); + int numProducers = 4; + int numConsumers = 4; + int operationsPerProducer = 100; + + List<Thread> threads = new ArrayList<>(); + CountDownLatch producersStarted = new CountDownLatch(numProducers); + + // Producer threads + for (int p = 0; p < numProducers; p++) { + threads.add( + new Thread( + () -> { + producersStarted.countDown(); + for (int i = 0; i < operationsPerProducer; i++) { + heap.offer((float) (random().nextDouble() * 1000)); + } + })); + } + + // Consumer threads + for (int c = 0; c < numConsumers; c++) { + threads.add( + new Thread( + () -> { + try { + producersStarted.await(); // Wait for producers to start + } catch 
(InterruptedException ignored) { + // Continue + } + for (int i = 0; i < operationsPerProducer; i++) { + if (heap.size() > 0) { + heap.poll(); + } + } + })); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Verify heap is still valid - all remaining elements should be in min-heap order + List<Float> elements = new ArrayList<>(); + while (heap.size() > 0) { + elements.add(heap.poll()); + } + + // Check that elements are in ascending order (min-heap property) + for (int i = 1; i < elements.size(); i++) { + assertTrue( + "Heap property violated at index " + + i + + ": " + + elements.get(i - 1) + + " > " + + elements.get(i), + elements.get(i - 1) <= elements.get(i)); + } + } + + /** Test bounded heap behavior */ + public void testBoundedHeapBehavior() { + BlockingFloatHeap heap = new BlockingFloatHeap(5); + + float[] values = {10.0f, 5.0f, 15.0f, 2.0f, 8.0f, 20.0f, 1.0f}; + for (float val : values) { + heap.offer(val); + } + + // Heap is bounded to 5, so should only contain 5 elements + assertEquals(5, heap.size()); + + // The min should be the top, and given the values, it should be around 2.0 + float top = heap.peek(); + assertTrue("Top should be relatively small", top <= 10.0f); + } + + /** Test that offer with sorted array works correctly under concurrent access */ + @SuppressWarnings("unused") + public void testConcurrentOfferArrayAndPoll() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(500); + AtomicInteger errors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(6); + + Thread[] threads = new Thread[6]; + + // Threads that offer arrays + for (int i = 0; i < 3; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); + for (int j = 0; j < 10; j++) { + float[] sortedArray = new float[] {1.0f, 2.0f, 3.0f, 4.0f, 5.0f}; + heap.offer(sortedArray, 5); + } + } catch (InterruptedException | BrokenBarrierException ignored) { + 
errors.incrementAndGet(); + } + }); + } + + // Threads that poll + for (int i = 3; i < 6; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); + for (int j = 0; j < 25; j++) { + if (heap.size() > 0) { + heap.poll(); Review Comment: Same as above: this should treat an IllegalStateException from poll() as an expected outcome, e.g. by wrapping the check-then-poll in a try/catch. ########## lucene/core.tests/src/test/org/apache/lucene/core/tests/util/hnsw/TestBlockingFloatHeap.java: ########## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.core.tests.util.hnsw; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.hnsw.BlockingFloatHeap; + +/** + * Unit tests for {@link BlockingFloatHeap} with focus on thread-safety and data race conditions.
+ */ +public class TestBlockingFloatHeap extends LuceneTestCase { + + /** Test basic offer and poll operations */ + public void testBasicOfferAndPoll() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + heap.offer(5.0f); + assertEquals(1, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + heap.offer(3.0f); + assertEquals(2, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + heap.offer(7.0f); + assertEquals(3, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(3.0f, result, 0.0f); + assertEquals(2, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(5.0f, result, 0.0f); + assertEquals(1, heap.size()); + assertEquals(7.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(7.0f, result, 0.0f); + assertEquals(0, heap.size()); + } + + /** Test poll on empty heap throws exception */ + public void testPollEmptyHeap() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + expectThrows(IllegalStateException.class, heap::poll); + } + + /** Test offer with array */ + public void testOfferArray() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f}; + heap.offer(values, 5); + assertEquals(5, heap.size()); + assertEquals(1.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(1.0f, result, 0.0f); + assertEquals(4, heap.size()); + assertEquals(2.0f, heap.peek(), 0.0f); + } + + /** Test heap maintains min-heap property */ + public void testHeapProperty() { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + int[] testValues = {42, 17, 93, 8, 55, 34, 71, 2, 99, 11}; + + for (int val : testValues) { + heap.offer(val); + } + + assertEquals(10, heap.size()); + + int previous = (int) heap.poll(); + while (heap.size() > 0) { + int current = (int) heap.poll(); + assertTrue( + "Heap property violated: " + current + " should be >= " + previous, current >= previous); + previous = 
current; + } + } + + /** + * Test concurrent offer and poll operations. This specifically tests the data race fix where + * poll() was checking size > 0 outside the lock. + */ + @SuppressWarnings("unused") + public void testConcurrentOfferAndPoll() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(1000); + int numThreads = 8; + int operationsPerThread = 100; + AtomicInteger errors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch producersReady = new CountDownLatch(numThreads / 2); + + Thread[] threads = new Thread[numThreads]; + + // Create producer/consumer threads + for (int i = 0; i < numThreads / 2; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + for (int j = 0; j < operationsPerThread; j++) { + heap.offer((float) (random().nextDouble() * 1000)); + } + producersReady.countDown(); // Signal that this producer has completed + } catch (InterruptedException | BrokenBarrierException ignored) { + errors.incrementAndGet(); + } + }); + } + + for (int i = numThreads / 2; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + // Wait for producers to add items to the heap + producersReady.await(); + for (int j = 0; j < operationsPerThread / 2; j++) { + if (heap.size() > 0) { + heap.poll(); + } + } Review Comment: This checks heap.size() > 0 and then calls heap.poll() from the same thread, but another thread can poll() between those two calls. That is exactly the TOCTOU (check-then-act) race this PR is fixing :) Can we treat an IllegalStateException from poll() as an expected outcome, e.g. by wrapping the check-then-poll in a try/catch?
########## lucene/core.tests/src/test/org/apache/lucene/core/tests/util/hnsw/TestBlockingFloatHeap.java: ########## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.core.tests.util.hnsw; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.hnsw.BlockingFloatHeap; + +/** + * Unit tests for {@link BlockingFloatHeap} with focus on thread-safety and data race conditions. + */ +public class TestBlockingFloatHeap extends LuceneTestCase { Review Comment: There are a lot of tests here that don't exercise this bug. Apart from testDataRaceOnPoll, I don't think any of the other tests fail without the fix. Can we keep testDataRaceOnPoll and remove the others? ########## lucene/core.tests/src/test/org/apache/lucene/core/tests/util/hnsw/TestBlockingFloatHeap.java: ########## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.core.tests.util.hnsw; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.hnsw.BlockingFloatHeap; + +/** + * Unit tests for {@link BlockingFloatHeap} with focus on thread-safety and data race conditions. 
+ */ +public class TestBlockingFloatHeap extends LuceneTestCase { + + /** Test basic offer and poll operations */ + public void testBasicOfferAndPoll() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + heap.offer(5.0f); + assertEquals(1, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + heap.offer(3.0f); + assertEquals(2, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + heap.offer(7.0f); + assertEquals(3, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(3.0f, result, 0.0f); + assertEquals(2, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(5.0f, result, 0.0f); + assertEquals(1, heap.size()); + assertEquals(7.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(7.0f, result, 0.0f); + assertEquals(0, heap.size()); + } + + /** Test poll on empty heap throws exception */ + public void testPollEmptyHeap() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + expectThrows(IllegalStateException.class, heap::poll); + } + + /** Test offer with array */ + public void testOfferArray() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f}; + heap.offer(values, 5); + assertEquals(5, heap.size()); + assertEquals(1.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(1.0f, result, 0.0f); + assertEquals(4, heap.size()); + assertEquals(2.0f, heap.peek(), 0.0f); + } + + /** Test heap maintains min-heap property */ + public void testHeapProperty() { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + int[] testValues = {42, 17, 93, 8, 55, 34, 71, 2, 99, 11}; + + for (int val : testValues) { + heap.offer(val); + } + + assertEquals(10, heap.size()); + + int previous = (int) heap.poll(); + while (heap.size() > 0) { + int current = (int) heap.poll(); + assertTrue( + "Heap property violated: " + current + " should be >= " + previous, current >= previous); + previous = 
current; + } + } + + /** + * Test concurrent offer and poll operations. This specifically tests the data race fix where + * poll() was checking size > 0 outside the lock. + */ + @SuppressWarnings("unused") + public void testConcurrentOfferAndPoll() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(1000); + int numThreads = 8; + int operationsPerThread = 100; + AtomicInteger errors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch producersReady = new CountDownLatch(numThreads / 2); + + Thread[] threads = new Thread[numThreads]; + + // Create producer/consumer threads + for (int i = 0; i < numThreads / 2; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + for (int j = 0; j < operationsPerThread; j++) { + heap.offer((float) (random().nextDouble() * 1000)); + } + producersReady.countDown(); // Signal that this producer has completed + } catch (InterruptedException | BrokenBarrierException ignored) { + errors.incrementAndGet(); + } + }); + } + + for (int i = numThreads / 2; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + // Wait for producers to add items to the heap + producersReady.await(); + for (int j = 0; j < operationsPerThread / 2; j++) { + if (heap.size() > 0) { + heap.poll(); + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + errors.incrementAndGet(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals("Errors occurred during concurrent operations", 0, errors.get()); + } + + /** + * Test the specific data race scenario: multiple threads calling poll() simultaneously when heap + * is near empty + */ + @SuppressWarnings("unused") + public void testDataRaceOnPoll() throws InterruptedException { + int numThreads = 10; + for (int iteration = 0; 
iteration < 10; iteration++) { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + // Fill heap with exactly numThreads elements + for (int i = 0; i < numThreads; i++) { + heap.offer((float) i); + } + + AtomicInteger successfulPolls = new AtomicInteger(0); + AtomicInteger failedPolls = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + + Thread[] threads = new Thread[numThreads]; + + // All threads try to poll simultaneously + for (int i = 0; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Ensure all threads start simultaneously + try { + heap.poll(); + successfulPolls.incrementAndGet(); + } catch (IllegalStateException ignored) { + failedPolls.incrementAndGet(); + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Verify exactly numThreads successful polls and 0 failed + assertEquals( + "Iteration " + iteration + ": Expected " + numThreads + " successful polls", + numThreads, + successfulPolls.get()); + assertEquals("Iteration " + iteration + ": Expected 0 failed polls", 0, failedPolls.get()); + assertEquals("Iteration " + iteration + ": Heap should be empty", 0, heap.size()); + } + } + + /** + * Test the data race scenario where one thread polls while another calls peek. This ensures + * peek() reads consistent state. 
+ */ + @SuppressWarnings("unused") + public void testConcurrentPollAndPeek() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + for (int i = 0; i < 100; i++) { + heap.offer((float) i); + } + + AtomicInteger peekErrors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(2); + + Thread pollThread = + new Thread( + () -> { + try { + barrier.await(); + for (int i = 0; i < 50; i++) { + if (heap.size() > 0) { + heap.poll(); + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + + Thread peekThread = + new Thread( + () -> { + try { + barrier.await(); + for (int i = 0; i < 100; i++) { + if (heap.size() > 0) { + float peek = heap.peek(); + assertTrue("Peek returned invalid value", peek >= 0); + } + } + } catch (InterruptedException | BrokenBarrierException | IllegalStateException e) { + // IllegalStateException is possible if heap becomes empty + if (!(e instanceof IllegalStateException)) { + peekErrors.incrementAndGet(); + } + } + }); + + pollThread.start(); + peekThread.start(); + + pollThread.join(); + peekThread.join(); + + assertEquals("No errors should occur during concurrent peek/poll", 0, peekErrors.get()); + } + + /** + * Test that the fix prevents negative size values. This would be a clear indicator of the data + * race. 
+ */ + @SuppressWarnings("unused") + public void testSizeNeverNegative() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + // Pre-fill the heap + for (int i = 0; i < 50; i++) { + heap.offer((float) i); + } + + AtomicInteger negativeDetected = new AtomicInteger(0); + int numThreads = 10; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + + Thread[] threads = new Thread[numThreads]; + + for (int i = 0; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); + for (int j = 0; j < 100; j++) { + try { + int size = heap.size(); + if (size < 0) { + negativeDetected.incrementAndGet(); + } + if (size > 0) { + heap.poll(); Review Comment: Same as above: this should treat an IllegalStateException from poll() as an expected outcome, e.g. by wrapping the check-then-poll in a try/catch. ########## lucene/core.tests/src/test/org/apache/lucene/core/tests/util/hnsw/TestBlockingFloatHeap.java: ########## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.lucene.core.tests.util.hnsw; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.hnsw.BlockingFloatHeap; + +/** + * Unit tests for {@link BlockingFloatHeap} with focus on thread-safety and data race conditions. + */ +public class TestBlockingFloatHeap extends LuceneTestCase { + + /** Test basic offer and poll operations */ + public void testBasicOfferAndPoll() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + heap.offer(5.0f); + assertEquals(1, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + heap.offer(3.0f); + assertEquals(2, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + heap.offer(7.0f); + assertEquals(3, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(3.0f, result, 0.0f); + assertEquals(2, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(5.0f, result, 0.0f); + assertEquals(1, heap.size()); + assertEquals(7.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(7.0f, result, 0.0f); + assertEquals(0, heap.size()); + } + + /** Test poll on empty heap throws exception */ + public void testPollEmptyHeap() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + expectThrows(IllegalStateException.class, heap::poll); + } + + /** Test offer with array */ + public void testOfferArray() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f}; + heap.offer(values, 5); + assertEquals(5, heap.size()); + assertEquals(1.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(1.0f, result, 0.0f); + assertEquals(4, heap.size()); + assertEquals(2.0f, heap.peek(), 0.0f); + } 
+ + /** Test heap maintains min-heap property */ + public void testHeapProperty() { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + int[] testValues = {42, 17, 93, 8, 55, 34, 71, 2, 99, 11}; + + for (int val : testValues) { + heap.offer(val); + } + + assertEquals(10, heap.size()); + + int previous = (int) heap.poll(); + while (heap.size() > 0) { + int current = (int) heap.poll(); + assertTrue( + "Heap property violated: " + current + " should be >= " + previous, current >= previous); + previous = current; + } + } + + /** + * Test concurrent offer and poll operations. This specifically tests the data race fix where + * poll() was checking size > 0 outside the lock. + */ + @SuppressWarnings("unused") + public void testConcurrentOfferAndPoll() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(1000); + int numThreads = 8; + int operationsPerThread = 100; + AtomicInteger errors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch producersReady = new CountDownLatch(numThreads / 2); + + Thread[] threads = new Thread[numThreads]; + + // Create producer/consumer threads + for (int i = 0; i < numThreads / 2; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + for (int j = 0; j < operationsPerThread; j++) { + heap.offer((float) (random().nextDouble() * 1000)); + } + producersReady.countDown(); // Signal that this producer has completed + } catch (InterruptedException | BrokenBarrierException ignored) { + errors.incrementAndGet(); + } + }); + } + + for (int i = numThreads / 2; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + // Wait for producers to add items to the heap + producersReady.await(); + for (int j = 0; j < operationsPerThread / 2; j++) { + if (heap.size() > 0) { + heap.poll(); + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + 
errors.incrementAndGet(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals("Errors occurred during concurrent operations", 0, errors.get()); + } + + /** + * Test the specific data race scenario: multiple threads calling poll() simultaneously when heap + * is near empty + */ + @SuppressWarnings("unused") + public void testDataRaceOnPoll() throws InterruptedException { + int numThreads = 10; + for (int iteration = 0; iteration < 10; iteration++) { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + // Fill heap with exactly numThreads elements + for (int i = 0; i < numThreads; i++) { + heap.offer((float) i); + } + + AtomicInteger successfulPolls = new AtomicInteger(0); + AtomicInteger failedPolls = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + + Thread[] threads = new Thread[numThreads]; + + // All threads try to poll simultaneously + for (int i = 0; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Ensure all threads start simultaneously + try { + heap.poll(); + successfulPolls.incrementAndGet(); + } catch (IllegalStateException ignored) { + failedPolls.incrementAndGet(); + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Verify exactly numThreads successful polls and 0 failed + assertEquals( + "Iteration " + iteration + ": Expected " + numThreads + " successful polls", + numThreads, + successfulPolls.get()); + assertEquals("Iteration " + iteration + ": Expected 0 failed polls", 0, failedPolls.get()); + assertEquals("Iteration " + iteration + ": Heap should be empty", 0, heap.size()); + } + } + + /** + * Test the data race scenario where one thread polls while another calls peek. 
This ensures + * peek() reads consistent state. + */ + @SuppressWarnings("unused") + public void testConcurrentPollAndPeek() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + for (int i = 0; i < 100; i++) { + heap.offer((float) i); + } + + AtomicInteger peekErrors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(2); + + Thread pollThread = + new Thread( + () -> { + try { + barrier.await(); + for (int i = 0; i < 50; i++) { + if (heap.size() > 0) { + heap.poll(); + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + + Thread peekThread = + new Thread( + () -> { + try { + barrier.await(); + for (int i = 0; i < 100; i++) { + if (heap.size() > 0) { + float peek = heap.peek(); + assertTrue("Peek returned invalid value", peek >= 0); + } + } + } catch (InterruptedException | BrokenBarrierException | IllegalStateException e) { + // IllegalStateException is possible if heap becomes empty + if (!(e instanceof IllegalStateException)) { + peekErrors.incrementAndGet(); + } + } + }); + + pollThread.start(); + peekThread.start(); + + pollThread.join(); + peekThread.join(); + + assertEquals("No errors should occur during concurrent peek/poll", 0, peekErrors.get()); + } + + /** + * Test that the fix prevents negative size values. This would be a clear indicator of the data + * race. 
+ */ + @SuppressWarnings("unused") + public void testSizeNeverNegative() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + // Pre-fill the heap + for (int i = 0; i < 50; i++) { + heap.offer((float) i); + } + + AtomicInteger negativeDetected = new AtomicInteger(0); + int numThreads = 10; + CyclicBarrier barrier = new CyclicBarrier(numThreads); + + Thread[] threads = new Thread[numThreads]; + + for (int i = 0; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); + for (int j = 0; j < 100; j++) { + try { + int size = heap.size(); + if (size < 0) { + negativeDetected.incrementAndGet(); + } + if (size > 0) { + heap.poll(); + } + } catch (IllegalStateException ignored) { + // Expected when heap is empty + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals("Size should never be negative", 0, negativeDetected.get()); + } + + /** Test heap remains valid after many concurrent operations */ + @SuppressWarnings("unused") + public void testHeapValidityAfterConcurrentAccess() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(500); + int numProducers = 4; + int numConsumers = 4; + int operationsPerProducer = 100; + + List<Thread> threads = new ArrayList<>(); + CountDownLatch producersStarted = new CountDownLatch(numProducers); + + // Producer threads + for (int p = 0; p < numProducers; p++) { + threads.add( + new Thread( + () -> { + producersStarted.countDown(); + for (int i = 0; i < operationsPerProducer; i++) { + heap.offer((float) (random().nextDouble() * 1000)); + } + })); + } + + // Consumer threads + for (int c = 0; c < numConsumers; c++) { + threads.add( + new Thread( + () -> { + try { + producersStarted.await(); // Wait for producers to start + } catch 
(InterruptedException ignored) { + // Continue + } + for (int i = 0; i < operationsPerProducer; i++) { + if (heap.size() > 0) { Review Comment: Same as above: another consumer can drain the heap between the size() check and the poll() call, so poll() may still throw IllegalStateException here. Wrap the check-then-poll in a try/catch and treat that exception as an expected outcome. ########## lucene/core.tests/src/test/org/apache/lucene/core/tests/util/hnsw/TestBlockingFloatHeap.java: ########## @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.lucene.core.tests.util.hnsw; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.hnsw.BlockingFloatHeap; + +/** + * Unit tests for {@link BlockingFloatHeap} with focus on thread-safety and data race conditions.
+ */ +public class TestBlockingFloatHeap extends LuceneTestCase { + + /** Test basic offer and poll operations */ + public void testBasicOfferAndPoll() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + heap.offer(5.0f); + assertEquals(1, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + heap.offer(3.0f); + assertEquals(2, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + heap.offer(7.0f); + assertEquals(3, heap.size()); + assertEquals(3.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(3.0f, result, 0.0f); + assertEquals(2, heap.size()); + assertEquals(5.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(5.0f, result, 0.0f); + assertEquals(1, heap.size()); + assertEquals(7.0f, heap.peek(), 0.0f); + + result = heap.poll(); + assertEquals(7.0f, result, 0.0f); + assertEquals(0, heap.size()); + } + + /** Test poll on empty heap throws exception */ + public void testPollEmptyHeap() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + expectThrows(IllegalStateException.class, heap::poll); + } + + /** Test offer with array */ + public void testOfferArray() { + BlockingFloatHeap heap = new BlockingFloatHeap(10); + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f}; + heap.offer(values, 5); + assertEquals(5, heap.size()); + assertEquals(1.0f, heap.peek(), 0.0f); + + float result = heap.poll(); + assertEquals(1.0f, result, 0.0f); + assertEquals(4, heap.size()); + assertEquals(2.0f, heap.peek(), 0.0f); + } + + /** Test heap maintains min-heap property */ + public void testHeapProperty() { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + int[] testValues = {42, 17, 93, 8, 55, 34, 71, 2, 99, 11}; + + for (int val : testValues) { + heap.offer(val); + } + + assertEquals(10, heap.size()); + + int previous = (int) heap.poll(); + while (heap.size() > 0) { + int current = (int) heap.poll(); + assertTrue( + "Heap property violated: " + current + " should be >= " + previous, current >= previous); + previous = 
current; + } + } + + /** + * Test concurrent offer and poll operations. This specifically tests the data race fix where + * poll() was checking size > 0 outside the lock. + */ + @SuppressWarnings("unused") + public void testConcurrentOfferAndPoll() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(1000); + int numThreads = 8; + int operationsPerThread = 100; + AtomicInteger errors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + CountDownLatch producersReady = new CountDownLatch(numThreads / 2); + + Thread[] threads = new Thread[numThreads]; + + // Create producer/consumer threads + for (int i = 0; i < numThreads / 2; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + for (int j = 0; j < operationsPerThread; j++) { + heap.offer((float) (random().nextDouble() * 1000)); + } + producersReady.countDown(); // Signal that this producer has completed + } catch (InterruptedException | BrokenBarrierException ignored) { + errors.incrementAndGet(); + } + }); + } + + for (int i = numThreads / 2; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Synchronize thread start + // Wait for producers to add items to the heap + producersReady.await(); + for (int j = 0; j < operationsPerThread / 2; j++) { + if (heap.size() > 0) { + heap.poll(); + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + errors.incrementAndGet(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals("Errors occurred during concurrent operations", 0, errors.get()); + } + + /** + * Test the specific data race scenario: multiple threads calling poll() simultaneously when heap + * is near empty + */ + @SuppressWarnings("unused") + public void testDataRaceOnPoll() throws InterruptedException { + int numThreads = 10; + for (int iteration = 0; 
iteration < 10; iteration++) { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + // Fill heap with exactly numThreads elements + for (int i = 0; i < numThreads; i++) { + heap.offer((float) i); + } + + AtomicInteger successfulPolls = new AtomicInteger(0); + AtomicInteger failedPolls = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(numThreads); + + Thread[] threads = new Thread[numThreads]; + + // All threads try to poll simultaneously + for (int i = 0; i < numThreads; i++) { + threads[i] = + new Thread( + () -> { + try { + barrier.await(); // Ensure all threads start simultaneously + try { + heap.poll(); + successfulPolls.incrementAndGet(); + } catch (IllegalStateException ignored) { + failedPolls.incrementAndGet(); + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + // Verify exactly numThreads successful polls and 0 failed + assertEquals( + "Iteration " + iteration + ": Expected " + numThreads + " successful polls", + numThreads, + successfulPolls.get()); + assertEquals("Iteration " + iteration + ": Expected 0 failed polls", 0, failedPolls.get()); + assertEquals("Iteration " + iteration + ": Heap should be empty", 0, heap.size()); + } + } + + /** + * Test the data race scenario where one thread polls while another calls peek. This ensures + * peek() reads consistent state. 
+ */ + @SuppressWarnings("unused") + public void testConcurrentPollAndPeek() throws InterruptedException { + BlockingFloatHeap heap = new BlockingFloatHeap(100); + + for (int i = 0; i < 100; i++) { + heap.offer((float) i); + } + + AtomicInteger peekErrors = new AtomicInteger(0); + CyclicBarrier barrier = new CyclicBarrier(2); + + Thread pollThread = + new Thread( + () -> { + try { + barrier.await(); + for (int i = 0; i < 50; i++) { + if (heap.size() > 0) { + heap.poll(); + } + } + } catch (InterruptedException | BrokenBarrierException ignored) { + Thread.currentThread().interrupt(); + } + }); + + Thread peekThread = + new Thread( Review Comment: I don't see peek() as part of the fix, so I think this test is outside the scope of this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
