This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/27.0.0 by this push:
     new 0d0497b39b Quick fix for SegmentLoadDropHandler bug (#14670) (#14694)
0d0497b39b is described below

commit 0d0497b39bf6c63815ad910a31f974c79ae6001a
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Jul 29 13:01:25 2023 +0530

    Quick fix for SegmentLoadDropHandler bug (#14670) (#14694)
---
 .../coordination/SegmentLoadDropHandler.java       | 17 ++++---
 .../coordination/SegmentLoadDropHandlerTest.java   | 57 +++++++++++++++-------
 2 files changed, 51 insertions(+), 23 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 2ba5dc9330..636894d214 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -773,7 +773,7 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
   }
 
   // Future with cancel() implementation to remove it from "waitingFutures" 
list
-  private static class CustomSettableFuture extends 
AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
+  private class CustomSettableFuture extends 
AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
   {
     private final LinkedHashSet<CustomSettableFuture> waitingFutures;
     private final Map<DataSegmentChangeRequest, AtomicReference<Status>> 
statusRefs;
@@ -789,15 +789,20 @@ public class SegmentLoadDropHandler implements 
DataSegmentChangeHandler
 
     public void resolve()
     {
-      synchronized (statusRefs) {
+      synchronized (requestStatusesLock) {
         if (isDone()) {
           return;
         }
 
-        List<DataSegmentChangeRequestAndStatus> result = new 
ArrayList<>(statusRefs.size());
-        statusRefs.forEach(
-            (request, statusRef) -> result.add(new 
DataSegmentChangeRequestAndStatus(request, statusRef.get()))
-        );
+        final List<DataSegmentChangeRequestAndStatus> result = new 
ArrayList<>(statusRefs.size());
+        statusRefs.forEach((request, statusRef) -> {
+          // Remove complete statuses from the cache
+          final Status status = statusRef.get();
+          if (status != null && status.getState() != Status.STATE.PENDING) {
+            requestStatuses.invalidate(request);
+          }
+          result.add(new DataSegmentChangeRequestAndStatus(request, status));
+        });
 
         set(result);
       }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index d6d4d2374d..c734d7b8f8 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -562,8 +562,12 @@ public class SegmentLoadDropHandlerTest
   public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws 
Exception
   {
     final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
-    Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), 
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()))
-           .thenReturn(true);
+    Mockito.doReturn(true).when(segmentManager).loadSegment(
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyBoolean(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
     
Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
     final SegmentLoadDropHandler segmentLoadDropHandler = new 
SegmentLoadDropHandler(
         jsonMapper,
@@ -578,11 +582,11 @@ public class SegmentLoadDropHandlerTest
 
     segmentLoadDropHandler.start();
 
-    DataSegment segment1 = makeSegment("batchtest1", "1", 
Intervals.of("P1d/2011-04-01"));
+    final DataSegment segment1 = makeSegment("batchtest1", "1", 
Intervals.of("P1d/2011-04-01"));
 
     List<DataSegmentChangeRequest> batch = ImmutableList.of(new 
SegmentChangeRequestLoad(segment1));
 
-    // load the segment
+    // Request 1: Load the segment
     ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future = 
segmentLoadDropHandler
         .processBatch(batch);
     for (Runnable runnable : scheduledRunnable) {
@@ -592,7 +596,7 @@ public class SegmentLoadDropHandlerTest
     Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
     scheduledRunnable.clear();
 
-    // drop the segment
+    // Request 2: Drop the segment
     batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
     future = segmentLoadDropHandler.processBatch(batch);
     for (Runnable runnable : scheduledRunnable) {
@@ -603,23 +607,36 @@ public class SegmentLoadDropHandlerTest
     scheduledRunnable.clear();
 
     // check invocations after a load-drop sequence
-    Mockito.verify(segmentManager, 
Mockito.times(1)).loadSegment(ArgumentMatchers.any(), 
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
-    Mockito.verify(segmentManager, 
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+    Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyBoolean(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    Mockito.verify(segmentManager, Mockito.times(1))
+           .dropSegment(ArgumentMatchers.any());
 
-    // try to reload the segment - this should be a no-op since it might be 
the case that this is the first load client
-    // with this request, we'll forget about the success of the load request
+    // Request 3: Reload the segment
     batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
     future = segmentLoadDropHandler.processBatch(batch);
-    Assert.assertEquals(scheduledRunnable.size(), 0);
+    for (Runnable runnable : scheduledRunnable) {
+      runnable.run();
+    }
     result = future.get();
     Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+    scheduledRunnable.clear();
 
-    // check invocations - should stay the same
-    Mockito.verify(segmentManager, 
Mockito.times(1)).loadSegment(ArgumentMatchers.any(), 
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
-    Mockito.verify(segmentManager, 
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+    // check invocations - 1 more load has happened
+    Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyBoolean(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    Mockito.verify(segmentManager, Mockito.times(1))
+           .dropSegment(ArgumentMatchers.any());
 
-    // try to reload the segment - this time the loader will know that is a 
fresh request to load
-    // so, the segment manager will be asked to load
+    // Request 4: Try to reload the segment - segment is loaded again
     batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
     future = segmentLoadDropHandler.processBatch(batch);
     for (Runnable runnable : scheduledRunnable) {
@@ -630,8 +647,14 @@ public class SegmentLoadDropHandlerTest
     scheduledRunnable.clear();
 
     // check invocations - the load segment counter should bump up
-    Mockito.verify(segmentManager, 
Mockito.times(2)).loadSegment(ArgumentMatchers.any(), 
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
-    Mockito.verify(segmentManager, 
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+    Mockito.verify(segmentManager, Mockito.times(3)).loadSegment(
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyBoolean(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    Mockito.verify(segmentManager, Mockito.times(1))
+           .dropSegment(ArgumentMatchers.any());
 
     segmentLoadDropHandler.stop();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to