jtuglu1 opened a new issue, #18943:
URL: https://github.com/apache/druid/issues/18943

   ### Affected Version
   
   All Druid versions running on JDK 17 with parallel merging enabled on the 
Broker, after long continuous uptime (at least ~80 days). 
   ### Description
   
   The Broker's ForkJoinPool is affected by 
[JDK-8330017](https://bugs.openjdk.org/browse/JDK-8330017). The ctl field's RC 
(Release Count) is a 16-bit signed value tracking released workers. A masking 
bug causes RC to decrement on every worker release/re-acquire cycle.
   
   Root Cause:
   1. RC starts near 0
   2. Decrements over ~80 days of continuous operation
   3. Reaches -32768, then overflows to +32767 (MAX_CAP)
   4. Since RC = MAX_CAP, the pool believes all workers are available but 
refuses to schedule new tasks. Workers sit idle in `awaitWork()` forever. This 
causes all aggregation queries on the broker to stall, like `select count(*) 
from A`
   
   Reproducing test:
   
   ```java
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *   http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
   
   package org.apache.druid.concurrent;
   
   import com.google.common.collect.Ordering;
   import org.apache.druid.java.util.common.Pair;
   import org.apache.druid.java.util.common.guava.BaseSequence;
   import 
org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
   import org.apache.druid.java.util.common.guava.Sequence;
   import org.apache.druid.java.util.common.guava.Yielder;
   import org.apache.druid.java.util.common.guava.Yielders;
   import org.apache.druid.java.util.common.logger.Logger;
   import org.apache.druid.query.QueryTimeoutException;
   import org.junit.Assume;
   import org.junit.Test;
   
   import java.lang.reflect.Field;
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.ForkJoinPool;
   import java.util.concurrent.ThreadLocalRandom;
   import java.util.concurrent.TimeUnit;
   import java.util.function.BinaryOperator;
   
   import static org.junit.Assert.assertEquals;
   import static org.junit.Assert.assertTrue;
   import static org.junit.Assert.fail;
   
   /**
    * Test that demonstrates JDK-8330017: ForkJoinPool stops executing tasks 
due to
    * ctl field Release Count (RC) overflow.
    *
    * Bug details: https://bugs.openjdk.org/browse/JDK-8330017
    *
    * The FJP internal ctl field has an RC (Release Count) that tracks worker 
thread state.
    * Due to a bug in JDK 11/17:
    * - RC decrements over time as threads are released/re-acquired
    * - After ~80 days of continuous operation, RC can reach -32768 (minimum 
16-bit value)
    * - On next decrement, it overflows to +32767 (MAX_CAP)
    * - Pool then thinks it has max threads and stops scheduling tasks
    * - Workers sit idle at awaitWork() forever, never picking up new tasks
    *
    */
   public class ForkJoinPoolRCOverflowTest
   {
     private static final Logger LOG = new 
Logger(ForkJoinPoolRCOverflowTest.class);
   
     // ForkJoinPool ctl field bit layout (JDK 11/17):
     // Bits 48-63: RC (Release Count) - released workers minus target 
parallelism
     private static final int RC_SHIFT = 48;
     private static final long RC_MASK = 0xffffL << RC_SHIFT;
     private static final int MAX_CAP = 0x7fff; // 32767
   
     @Test
     public void testRCOverflowWithParallelMergeCombiningSequence() throws 
Exception
     {
       ForkJoinPool pool = new ForkJoinPool(
           4,
           ForkJoinPool.defaultForkJoinWorkerThreadFactory,
           (t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
           true
       );
   
       Field ctlField;
       try {
         ctlField = ForkJoinPool.class.getDeclaredField("ctl");
         ctlField.setAccessible(true);
       }
       catch (Exception e) {
         LOG.warn("Cannot access ForkJoinPool.ctl field, skipping test: %s", 
e.getMessage());
         pool.shutdown();
         Assume.assumeTrue("Cannot access ForkJoinPool.ctl field via 
reflection", false);
         return;
       }
   
       try {
         // First verify the pool works with ParallelMergeCombiningSequence
         List<Sequence<IntPair>> normalInput = new ArrayList<>();
         normalInput.add(generateSequence(100));
         normalInput.add(generateSequence(100));
         normalInput.add(generateSequence(100));
         normalInput.add(generateSequence(100));
   
         ParallelMergeCombiningSequence<IntPair> normalSequence = new 
ParallelMergeCombiningSequence<>(
             pool,
             normalInput,
             INT_PAIR_ORDERING,
             INT_PAIR_MERGE_FN,
             true,
             5000,
             0,
             4,
             128,
             64,
             ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
             null
         );
   
         // Verify normal operation works
         Yielder<IntPair> normalYielder = Yielders.each(normalSequence);
         int normalCount = 0;
         while (!normalYielder.isDone()) {
           normalYielder = normalYielder.next(normalYielder.get());
           normalCount++;
         }
         normalYielder.close();
         LOG.info("Normal ParallelMergeCombiningSequence processed %d items", 
normalCount);
         assertTrue("Should process items normally", normalCount > 0);
   
         // Wait for pool to be quiescent
         assertTrue("Pool should be quiescent", pool.awaitQuiescence(5, 
TimeUnit.SECONDS));
   
         // Simulate the RC overflow by manipulating the ctl field
         long currentCtl = (long) ctlField.get(pool);
         long preOverflowRC = extractRC(currentCtl);
         LOG.info("Pre-overflow ctl: 0x%016X, RC=%d", currentCtl, 
preOverflowRC);
   
         // Set RC to MAX_CAP to simulate the overflow condition
         long overflowCtl = setRC(currentCtl, MAX_CAP);
         ctlField.set(pool, overflowCtl);
   
         // Verify the ctl field was actually modified
         long verifyCtl = (long) ctlField.get(pool);
         long postOverflowRC = extractRC(verifyCtl);
         LOG.info("Post-overflow ctl: 0x%016X, RC=%d", verifyCtl, 
postOverflowRC);
         assertEquals("RC should be set to MAX_CAP (32767)", MAX_CAP, 
postOverflowRC);
   
         // Now try to use ParallelMergeCombiningSequence with the corrupted 
pool
         List<Sequence<IntPair>> overflowInput = new ArrayList<>();
         overflowInput.add(generateSequence(1000));
         overflowInput.add(generateSequence(1000));
         overflowInput.add(generateSequence(1000));
         overflowInput.add(generateSequence(1000));
   
         // Use a short timeout to detect the hang
         ParallelMergeCombiningSequence<IntPair> overflowSequence = new 
ParallelMergeCombiningSequence<>(
             pool,
             overflowInput,
             INT_PAIR_ORDERING,
             INT_PAIR_MERGE_FN,
             true,
             2000, // 2 second timeout - should hit this if bug is present
             0,
             4,
             128,
             64,
             ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
             null
         );
   
         // With RC at MAX_CAP, the pool should fail to schedule tasks and 
timeout
         try {
           Yielder<IntPair> overflowYielder = Yielders.each(overflowSequence);
           int overflowCount = 0;
           while (!overflowYielder.isDone()) {
             overflowYielder = overflowYielder.next(overflowYielder.get());
             overflowCount++;
           }
           overflowYielder.close();
           // If we get here, the pool somehow recovered - this can happen if 
the JDK
           // implementation handles the corrupted state gracefully
           LOG.info("Overflow sequence processed %d items (pool recovered from 
corrupted state)", overflowCount);
           fail("Expected QueryTimeoutException due to RC overflow, but pool 
recovered and processed "
                + overflowCount + " items");
         }
         catch (QueryTimeoutException e) {
           // This is the expected behavior when the bug is active:
           // Pool thinks it has MAX_CAP workers available but won't schedule 
new tasks
           LOG.info("ParallelMergeCombiningSequence timed out as expected with 
RC overflow: %s", e.getMessage());
           assertTrue("Timeout message should mention timeout",
               e.getMessage().contains("timeout") || 
e.getMessage().contains("complete"));
         }
       }
       finally {
         pool.shutdownNow();
         pool.awaitTermination(5, TimeUnit.SECONDS);
       }
     }
   
     private static final Ordering<IntPair> INT_PAIR_ORDERING = 
Ordering.natural().onResultOf(p -> p.lhs);
   
     private static final BinaryOperator<IntPair> INT_PAIR_MERGE_FN = (lhs, 
rhs) -> {
       if (lhs == null) {
         return rhs;
       }
       if (rhs == null) {
         return lhs;
       }
       return new IntPair(lhs.lhs, lhs.rhs + rhs.rhs);
     };
   
     private static class IntPair extends Pair<Integer, Integer>
     {
       IntPair(Integer lhs, Integer rhs)
       {
         super(lhs, rhs);
       }
     }
   
     private static Sequence<IntPair> generateSequence(int size)
     {
       return new BaseSequence<>(
           new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>()
           {
             @Override
             public Iterator<IntPair> make()
             {
               return new Iterator<IntPair>()
               {
                 int mergeKey = 0;
                 int rowCounter = 0;
   
                 @Override
                 public boolean hasNext()
                 {
                   return rowCounter < size;
                 }
   
                 @Override
                 public IntPair next()
                 {
                   rowCounter++;
                   mergeKey += ThreadLocalRandom.current().nextInt(1, 3);
                   return new IntPair(mergeKey, 
ThreadLocalRandom.current().nextInt(1, 100));
                 }
               };
             }
   
             @Override
             public void cleanup(Iterator<IntPair> iterFromMake)
             {
               // nothing to cleanup
             }
           }
       );
     }
   
     private static long extractRC(long ctl)
     {
       return (short) (ctl >> RC_SHIFT);
     }
   
     private static long setRC(long ctl, int rc)
     {
       return (ctl & ~RC_MASK) | (((long) rc & 0xffffL) << RC_SHIFT);
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to