This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/34.0.0 by this push:
     new 436e6ff9414  [Backport] Fix race in SegmentLoadDropHandler. (#18291) #18300
436e6ff9414 is described below

commit 436e6ff94142b869807c64e8c50b765c6853b830
Author: Karan Kumar <[email protected]>
AuthorDate: Mon Jul 21 14:32:24 2025 +0530

     [Backport] Fix race in SegmentLoadDropHandler. (#18291) #18300
    
    SegmentLoadDropHandler has a race that causes dropSegmentDelayMillis to
    not be respected. It happens in this sequence of events:
    
    - removeSegment is called, which adds the segment to segmentsToDelete
      and schedules a future drop.
    - addSegment is called, which removes the segment from segmentsToDelete
      and proceeds to load it.
    - removeSegment is called again, which adds the segment back to
      segmentsToDelete, before the originally-scheduled drop executes.
    - The originally-scheduled drop executes, sees the segment is in
      segmentsToDelete, removes it from that set, and drops the segment.
      This could happen immediately after the second call to
      removeSegment, defeating dropSegmentDelayMillis.
    
    This can occur in practice when the coordinator period is less than
    dropSegmentDelayMillis. That is not the case by default (the default
    Coordinator period is 1m and the default dropSegmentDelayMillis is 30s).
    
    This patch addresses it by ensuring that delayed drops only happen if
    the entry in segmentDropLatches corresponds to the removeSegment call
    that scheduled the drop.
    
    (cherry picked from commit d5bdb7384af3bce609b98f5e0b70d9e041a250df)
    
    Co-authored-by: Gian Merlino <[email protected]>
---
 .../server/coordination/SegmentDropLatch.java      | 89 ++++++++++++++++++++++
 .../coordination/SegmentLoadDropHandler.java       | 57 +++++++-------
 2 files changed, 118 insertions(+), 28 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentDropLatch.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentDropLatch.java
new file mode 100644
index 00000000000..286593f1f03
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentDropLatch.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordination;
+
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+
+/**
+ * Latch held by {@link SegmentLoadDropHandler#segmentDropLatches} when a drop 
is scheduled or actively happening.
+ */
+public class SegmentDropLatch
+{
+  enum State
+  {
+    PENDING,
+    DROPPING,
+    DONE
+  }
+
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private State state = State.PENDING;
+
+  /**
+   * Sets this latch to {@link State#DROPPING} state, if it was in {@link 
State#PENDING}.
+   *
+   * @return whether the original state was {@link State#PENDING}
+   */
+  public boolean startDropping()
+  {
+    synchronized (lock) {
+      if (state == State.PENDING) {
+        state = State.DROPPING;
+        lock.notifyAll();
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Sets this latch to {@link State#DONE} state, if it was in {@link 
State#DROPPING}. Otherwise does nothing.
+   */
+  public void doneDropping()
+  {
+    synchronized (lock) {
+      if (state == State.DROPPING) {
+        state = State.DONE;
+        lock.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Cancels this latch if the drop has not yet begun to execute. Otherwise, 
waits for the drop to finish.
+   * Once this method returns, the drop is definitely no longer scheduled (it 
has either been canceled, or has
+   * already happened).
+   */
+  public void cancelOrAwait() throws InterruptedException
+  {
+    synchronized (lock) {
+      if (state == State.PENDING) {
+        state = State.DONE;
+      } else {
+        while (state != State.DONE) {
+          lock.wait();
+        }
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 875fad0a823..6d8faf593cb 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -45,7 +45,7 @@ import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -62,16 +62,16 @@ public class SegmentLoadDropHandler
 {
   private static final EmittingLogger log = new 
EmittingLogger(SegmentLoadDropHandler.class);
 
-  // Synchronizes removals from segmentsToDelete
-  private final Object segmentDeleteLock = new Object();
-
   private final SegmentLoaderConfig config;
   private final DataSegmentAnnouncer announcer;
   private final SegmentManager segmentManager;
   private final ScheduledExecutorService normalLoadExec;
   private final ThreadPoolExecutor turboLoadExec;
 
-  private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
+  /**
+   * Holder of latches for segments that have drops scheduled, or drops 
currently being executed.
+   */
+  private final ConcurrentHashMap<DataSegment, SegmentDropLatch> 
segmentDropLatches;
 
   // Keep history of load/drop request status in a LRU cache to maintain 
idempotency if same request shows up
   // again and to return status of a completed request. Maximum size of this 
cache must be significantly greater
@@ -129,7 +129,7 @@ public class SegmentLoadDropHandler
     // Allow core threads to time out to save resources when not in turbo mode
     this.turboLoadExec.allowCoreThreadTimeOut(true);
 
-    this.segmentsToDelete = new ConcurrentSkipListSet<>();
+    this.segmentDropLatches = new ConcurrentHashMap<>();
     requestStatuses = 
CacheBuilder.newBuilder().maximumSize(config.getStatusQueueMaxSize()).initialCapacity(8).build();
   }
 
@@ -152,23 +152,15 @@ public class SegmentLoadDropHandler
     SegmentChangeStatus result = null;
     try {
       log.info("Loading segment[%s] in mode[%s]", segment.getId(), 
loadingMode);
-      /*
-         The lock below is used to prevent a race condition when the scheduled 
runnable in removeSegment() starts,
-         and if (segmentsToDelete.remove(segment)) returns true, in which case 
historical will start deleting segment
-         files. At that point, it's possible that right after the "if" check, 
addSegment() is called and actually loads
-         the segment, which makes dropping segment and downloading segment 
happen at the same time.
-       */
-      if (segmentsToDelete.contains(segment)) {
-        /*
-           Both contains(segment) and remove(segment) can be moved inside the 
synchronized block. However, in that case,
-           each time when addSegment() is called, it has to wait for the lock 
in order to make progress, which will make
-           things slow. Given that in most cases 
segmentsToDelete.contains(segment) returns false, it will save a lot of
-           cost of acquiring lock by doing the "contains" check outside the 
synchronized block.
-         */
-        synchronized (segmentDeleteLock) {
-          segmentsToDelete.remove(segment);
-        }
+
+      // Cancel any pending drops for this segment, or wait for them if they 
have already started executing. This is
+      // necessary to prevent delayed drops issued by removeSegment() from 
dropping a segment while we are trying to
+      // load it.
+      final SegmentDropLatch currentDropLatch = 
segmentDropLatches.remove(segment);
+      if (currentDropLatch != null) {
+        currentDropLatch.cancelOrAwait();
       }
+
       try {
         segmentManager.loadSegment(segment);
       }
@@ -216,14 +208,19 @@ public class SegmentLoadDropHandler
     SegmentChangeStatus result = null;
     try {
       announcer.unannounceSegment(segment);
-      segmentsToDelete.add(segment);
+
+      final SegmentDropLatch dropLatch = new SegmentDropLatch();
+      final SegmentDropLatch priorLatch = 
segmentDropLatches.putIfAbsent(segment, dropLatch);
+
+      if (priorLatch != null) {
+        log.warn("Cannot drop segment[%s] that already has a drop pending. 
Ignoring.", segment.getId());
+        return;
+      }
 
       Runnable runnable = () -> {
         try {
-          synchronized (segmentDeleteLock) {
-            if (segmentsToDelete.remove(segment)) {
-              segmentManager.dropSegment(segment);
-            }
+          if (dropLatch.startDropping()) {
+            segmentManager.dropSegment(segment);
           }
         }
         catch (Exception e) {
@@ -231,6 +228,10 @@ public class SegmentLoadDropHandler
              .addData("segment", segment)
              .emit();
         }
+        finally {
+          dropLatch.doneDropping();
+          segmentDropLatches.remove(segment, dropLatch);
+        }
       };
 
       if (scheduleDrop) {
@@ -265,7 +266,7 @@ public class SegmentLoadDropHandler
 
   public Collection<DataSegment> getSegmentsToDelete()
   {
-    return ImmutableList.copyOf(segmentsToDelete);
+    return ImmutableList.copyOf(segmentDropLatches.keySet());
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to