This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch 27.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/27.0.0 by this push:
new 0d0497b39b Quick fix for SegmentLoadDropHandler bug (#14670) (#14694)
0d0497b39b is described below
commit 0d0497b39bf6c63815ad910a31f974c79ae6001a
Author: Kashif Faraz <[email protected]>
AuthorDate: Sat Jul 29 13:01:25 2023 +0530
Quick fix for SegmentLoadDropHandler bug (#14670) (#14694)
---
.../coordination/SegmentLoadDropHandler.java | 17 ++++---
.../coordination/SegmentLoadDropHandlerTest.java | 57 +++++++++++++++-------
2 files changed, 51 insertions(+), 23 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 2ba5dc9330..636894d214 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -773,7 +773,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
}
// Future with cancel() implementation to remove it from "waitingFutures" list
- private static class CustomSettableFuture extends
AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
+ private class CustomSettableFuture extends
AbstractFuture<List<DataSegmentChangeRequestAndStatus>>
{
private final LinkedHashSet<CustomSettableFuture> waitingFutures;
private final Map<DataSegmentChangeRequest, AtomicReference<Status>>
statusRefs;
@@ -789,15 +789,20 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
public void resolve()
{
- synchronized (statusRefs) {
+ synchronized (requestStatusesLock) {
if (isDone()) {
return;
}
- List<DataSegmentChangeRequestAndStatus> result = new
ArrayList<>(statusRefs.size());
- statusRefs.forEach(
- (request, statusRef) -> result.add(new
DataSegmentChangeRequestAndStatus(request, statusRef.get()))
- );
+ final List<DataSegmentChangeRequestAndStatus> result = new
ArrayList<>(statusRefs.size());
+ statusRefs.forEach((request, statusRef) -> {
+ // Remove complete statuses from the cache
+ final Status status = statusRef.get();
+ if (status != null && status.getState() != Status.STATE.PENDING) {
+ requestStatuses.invalidate(request);
+ }
+ result.add(new DataSegmentChangeRequestAndStatus(request, status));
+ });
set(result);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index d6d4d2374d..c734d7b8f8 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -562,8 +562,12 @@ public class SegmentLoadDropHandlerTest
public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws
Exception
{
final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
- Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()))
- .thenReturn(true);
+ Mockito.doReturn(true).when(segmentManager).loadSegment(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyBoolean(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
final SegmentLoadDropHandler segmentLoadDropHandler = new
SegmentLoadDropHandler(
jsonMapper,
@@ -578,11 +582,11 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.start();
- DataSegment segment1 = makeSegment("batchtest1", "1",
Intervals.of("P1d/2011-04-01"));
+ final DataSegment segment1 = makeSegment("batchtest1", "1",
Intervals.of("P1d/2011-04-01"));
List<DataSegmentChangeRequest> batch = ImmutableList.of(new
SegmentChangeRequestLoad(segment1));
- // load the segment
+ // Request 1: Load the segment
ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future =
segmentLoadDropHandler
.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
@@ -592,7 +596,7 @@ public class SegmentLoadDropHandlerTest
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
scheduledRunnable.clear();
- // drop the segment
+ // Request 2: Drop the segment
batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
future = segmentLoadDropHandler.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
@@ -603,23 +607,36 @@ public class SegmentLoadDropHandlerTest
scheduledRunnable.clear();
// check invocations after a load-drop sequence
- Mockito.verify(segmentManager,
Mockito.times(1)).loadSegment(ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
- Mockito.verify(segmentManager,
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+ Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyBoolean(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ Mockito.verify(segmentManager, Mockito.times(1))
+ .dropSegment(ArgumentMatchers.any());
- // try to reload the segment - this should be a no-op since it might be
the case that this is the first load client
- // with this request, we'll forget about the success of the load request
+ // Request 3: Reload the segment
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
future = segmentLoadDropHandler.processBatch(batch);
- Assert.assertEquals(scheduledRunnable.size(), 0);
+ for (Runnable runnable : scheduledRunnable) {
+ runnable.run();
+ }
result = future.get();
Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ scheduledRunnable.clear();
- // check invocations - should stay the same
- Mockito.verify(segmentManager,
Mockito.times(1)).loadSegment(ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
- Mockito.verify(segmentManager,
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+ // check invocations - 1 more load has happened
+ Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyBoolean(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ Mockito.verify(segmentManager, Mockito.times(1))
+ .dropSegment(ArgumentMatchers.any());
- // try to reload the segment - this time the loader will know that is a
fresh request to load
- // so, the segment manager will be asked to load
+ // Request 4: Try to reload the segment - segment is loaded again
batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
future = segmentLoadDropHandler.processBatch(batch);
for (Runnable runnable : scheduledRunnable) {
@@ -630,8 +647,14 @@ public class SegmentLoadDropHandlerTest
scheduledRunnable.clear();
// check invocations - the load segment counter should bump up
- Mockito.verify(segmentManager,
Mockito.times(2)).loadSegment(ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
- Mockito.verify(segmentManager,
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+ Mockito.verify(segmentManager, Mockito.times(3)).loadSegment(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyBoolean(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ Mockito.verify(segmentManager, Mockito.times(1))
+ .dropSegment(ArgumentMatchers.any());
segmentLoadDropHandler.stop();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]