jtuglu1 opened a new issue, #18943:
URL: https://github.com/apache/druid/issues/18943
### Affected Version

All Druid versions are affected when the Broker has parallel merging enabled, runs on JDK 17, and has been up continuously for a long time (at least ~80 days).

### Description

The Broker's ForkJoinPool is affected by [JDK-8330017](https://bugs.openjdk.org/browse/JDK-8330017). The `ctl` field's RC (Release Count) is a 16-bit signed value that tracks released workers. A masking bug causes RC to erroneously decrement on every worker release/re-acquire cycle.

Root cause:

1. RC starts near 0.
2. RC decrements steadily over ~80 days of continuous operation.
3. RC reaches -32768, then overflows to +32767 (`MAX_CAP`).
4. Since RC = `MAX_CAP`, the pool thinks it already has the maximum number of threads and stops scheduling new tasks. Workers sit idle in `awaitWork()` forever.

Once this happens, all aggregation queries on the Broker stall, like `select count(*) from A`. The wraparound in step 3 is sketched just below.
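A minimal sketch of the 16-bit signed wraparound (plain Java for illustration, not Druid or JDK code; `RcWraparoundDemo` is a made-up name):

```java
public class RcWraparoundDemo
{
  public static void main(String[] args)
  {
    // RC lives in a signed 16-bit slice of ctl, so it behaves like a Java short.
    short rc = Short.MIN_VALUE; // -32768, reached after ~80 days of spurious decrements
    rc--;                       // one more decrement wraps around
    System.out.println(rc);     // prints 32767, i.e. MAX_CAP (0x7fff)
  }
}
```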
Reproducing test:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.concurrent;

import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException;
import org.junit.Assume;
import org.junit.Test;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Test that demonstrates JDK-8330017: ForkJoinPool stops executing tasks due to
 * ctl field Release Count (RC) overflow.
 *
 * Bug details: https://bugs.openjdk.org/browse/JDK-8330017
 *
 * The FJP internal ctl field has an RC (Release Count) that tracks worker thread state.
 * Due to a bug in JDK 11/17:
 * - RC decrements over time as threads are released/re-acquired
 * - After ~80 days of continuous operation, RC can reach -32768 (the minimum 16-bit value)
 * - On the next decrement, it overflows to +32767 (MAX_CAP)
 * - The pool then thinks it has max threads and stops scheduling tasks
 * - Workers sit idle at awaitWork() forever, never picking up new tasks
 */
public class ForkJoinPoolRCOverflowTest
{
  private static final Logger LOG = new Logger(ForkJoinPoolRCOverflowTest.class);

  // ForkJoinPool ctl field bit layout (JDK 11/17):
  // Bits 48-63: RC (Release Count) - released workers minus target parallelism
  private static final int RC_SHIFT = 48;
  private static final long RC_MASK = 0xffffL << RC_SHIFT;
  private static final int MAX_CAP = 0x7fff; // 32767

  @Test
  public void testRCOverflowWithParallelMergeCombiningSequence() throws Exception
  {
    ForkJoinPool pool = new ForkJoinPool(
        4,
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
        (t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
        true
    );

    Field ctlField;
    try {
      ctlField = ForkJoinPool.class.getDeclaredField("ctl");
      ctlField.setAccessible(true);
    }
    catch (Exception e) {
      LOG.warn("Cannot access ForkJoinPool.ctl field, skipping test: %s", e.getMessage());
      pool.shutdown();
      Assume.assumeTrue("Cannot access ForkJoinPool.ctl field via reflection", false);
      return;
    }

    try {
      // First verify the pool works with ParallelMergeCombiningSequence
      List<Sequence<IntPair>> normalInput = new ArrayList<>();
      normalInput.add(generateSequence(100));
      normalInput.add(generateSequence(100));
      normalInput.add(generateSequence(100));
      normalInput.add(generateSequence(100));

      ParallelMergeCombiningSequence<IntPair> normalSequence = new ParallelMergeCombiningSequence<>(
          pool,
          normalInput,
          INT_PAIR_ORDERING,
          INT_PAIR_MERGE_FN,
          true,
          5000,
          0,
          4,
          128,
          64,
          ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
          null
      );

      // Verify normal operation works
      Yielder<IntPair> normalYielder = Yielders.each(normalSequence);
      int normalCount = 0;
      while (!normalYielder.isDone()) {
        normalYielder = normalYielder.next(normalYielder.get());
        normalCount++;
      }
      normalYielder.close();
      LOG.info("Normal ParallelMergeCombiningSequence processed %d items", normalCount);
      assertTrue("Should process items normally", normalCount > 0);

      // Wait for pool to be quiescent
      assertTrue("Pool should be quiescent", pool.awaitQuiescence(5, TimeUnit.SECONDS));

      // Simulate the RC overflow by manipulating the ctl field
      long currentCtl = (long) ctlField.get(pool);
      long preOverflowRC = extractRC(currentCtl);
      LOG.info("Pre-overflow ctl: 0x%016X, RC=%d", currentCtl, preOverflowRC);

      // Set RC to MAX_CAP to simulate the overflow condition
      long overflowCtl = setRC(currentCtl, MAX_CAP);
      ctlField.set(pool, overflowCtl);

      // Verify the ctl field was actually modified
      long verifyCtl = (long) ctlField.get(pool);
      long postOverflowRC = extractRC(verifyCtl);
      LOG.info("Post-overflow ctl: 0x%016X, RC=%d", verifyCtl, postOverflowRC);
      assertEquals("RC should be set to MAX_CAP (32767)", MAX_CAP, postOverflowRC);
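
      // From this point on, the pool is in the corrupted state described in
      // JDK-8330017: RC reads as MAX_CAP, so an affected JDK believes every
      // worker has already been released and declines to schedule workers
      // for newly submitted tasks.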

      // Now try to use ParallelMergeCombiningSequence with the corrupted pool
      List<Sequence<IntPair>> overflowInput = new ArrayList<>();
      overflowInput.add(generateSequence(1000));
      overflowInput.add(generateSequence(1000));
      overflowInput.add(generateSequence(1000));
      overflowInput.add(generateSequence(1000));

      // Use a short timeout to detect the hang
      ParallelMergeCombiningSequence<IntPair> overflowSequence = new ParallelMergeCombiningSequence<>(
          pool,
          overflowInput,
          INT_PAIR_ORDERING,
          INT_PAIR_MERGE_FN,
          true,
          2000, // 2 second timeout - should hit this if the bug is present
          0,
          4,
          128,
          64,
          ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
          null
      );

      // With RC at MAX_CAP, the pool should fail to schedule tasks and time out
      try {
        Yielder<IntPair> overflowYielder = Yielders.each(overflowSequence);
        int overflowCount = 0;
        while (!overflowYielder.isDone()) {
          overflowYielder = overflowYielder.next(overflowYielder.get());
          overflowCount++;
        }
        overflowYielder.close();
        // If we get here, the pool somehow recovered - this can happen if the JDK
        // implementation handles the corrupted state gracefully
        LOG.info("Overflow sequence processed %d items (pool recovered from corrupted state)", overflowCount);
        fail("Expected QueryTimeoutException due to RC overflow, but pool recovered and processed " + overflowCount + " items");
      }
      catch (QueryTimeoutException e) {
        // This is the expected behavior when the bug is active: the pool thinks
        // it has MAX_CAP workers available but won't schedule new tasks
        LOG.info("ParallelMergeCombiningSequence timed out as expected with RC overflow: %s", e.getMessage());
        assertTrue(
            "Timeout message should mention timeout",
            e.getMessage().contains("timeout") || e.getMessage().contains("complete")
        );
      }
    }
    finally {
      pool.shutdownNow();
      pool.awaitTermination(5, TimeUnit.SECONDS);
    }
  }

  private static final Ordering<IntPair> INT_PAIR_ORDERING = Ordering.natural().onResultOf(p -> p.lhs);

  private static final BinaryOperator<IntPair> INT_PAIR_MERGE_FN = (lhs, rhs) -> {
    if (lhs == null) {
      return rhs;
    }
    if (rhs == null) {
      return lhs;
    }
    return new IntPair(lhs.lhs, lhs.rhs + rhs.rhs);
  };

  private static class IntPair extends Pair<Integer, Integer>
  {
    IntPair(Integer lhs, Integer rhs)
    {
      super(lhs, rhs);
    }
  }

  private static Sequence<IntPair> generateSequence(int size)
  {
    return new BaseSequence<>(
        new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>()
        {
          @Override
          public Iterator<IntPair> make()
          {
            return new Iterator<IntPair>()
            {
              int mergeKey = 0;
              int rowCounter = 0;

              @Override
              public boolean hasNext()
              {
                return rowCounter < size;
              }

              @Override
              public IntPair next()
              {
                rowCounter++;
                mergeKey += ThreadLocalRandom.current().nextInt(1, 3);
                return new IntPair(mergeKey, ThreadLocalRandom.current().nextInt(1, 100));
              }
            };
          }

          @Override
          public void cleanup(Iterator<IntPair> iterFromMake)
          {
            // nothing to clean up
          }
        }
    );
  }

  // Interpret bits 48-63 of ctl as the signed 16-bit RC value.
  private static long extractRC(long ctl)
  {
    return (short) (ctl >> RC_SHIFT);
  }

  // Overwrite bits 48-63 of ctl with the given RC value, preserving the rest.
  private static long setRC(long ctl, int rc)
  {
    return (ctl & ~RC_MASK) | (((long) rc & 0xffffL) << RC_SHIFT);
  }
}
```
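For confirming a suspected occurrence on a live JVM, the same reflection trick the test relies on can be turned into a standalone probe. This is only a sketch under the assumptions already stated (JDK 11/17 `ctl` bit layout; reflective access to `java.util.concurrent` opened, e.g. with `--add-opens java.base/java.util.concurrent=ALL-UNNAMED`); `ForkJoinPoolRcProbe` and `probeRC` are hypothetical names, not part of Druid or the JDK:

```java
import java.lang.reflect.Field;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinPoolRcProbe
{
  // Hypothetical helper: read RC from a live pool, assuming the JDK 11/17 ctl layout.
  public static short probeRC(ForkJoinPool pool) throws ReflectiveOperationException
  {
    Field ctlField = ForkJoinPool.class.getDeclaredField("ctl");
    // Needs --add-opens java.base/java.util.concurrent=ALL-UNNAMED on JDK 9+,
    // otherwise this throws InaccessibleObjectException.
    ctlField.setAccessible(true);
    long ctl = (long) ctlField.get(pool);
    return (short) (ctl >>> 48); // bits 48-63 hold the signed 16-bit RC
  }

  public static void main(String[] args) throws Exception
  {
    short rc = probeRC(ForkJoinPool.commonPool());
    // On an idle pool, an RC stuck at 0x7fff (32767) matches the failure mode above.
    System.out.println("RC=" + rc + (rc == 0x7fff ? " (matches JDK-8330017 failure mode)" : ""));
  }
}
```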
