This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c4fa3cc Fix load-drop-load sequence for same segment and historical
in http loadqueue peon (#11717)
c4fa3cc is described below
commit c4fa3ccfc4a8df0da3968b817dbb057e4ada79fc
Author: Rohan Garg <[email protected]>
AuthorDate: Mon Jan 31 13:16:58 2022 +0530
Fix load-drop-load sequence for same segment and historical in http
loadqueue peon (#11717)
Fixes an issue where a load-drop-load sequence for a segment and historical
doesn't work correctly for http based load queue peon. The first cycle of
load-drop works fine - the problem comes when there is an attempt to reload the
segment. The historical caches load success for some recent segments and makes
the reload as a no-op. But it doesn't consider that fact that the segment was
also dropped in between the load requests.
This change invalidates the cache after a client tries to fetch a success
result.
---
.../coordination/SegmentLoadDropHandler.java | 5 +
.../coordination/SegmentLoadDropHandlerTest.java | 121 +++++++++++++++++++--
2 files changed, 118 insertions(+), 8 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index 53013d1..e723d6d 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -555,6 +555,11 @@ public class SegmentLoadDropHandler implements
DataSegmentChangeHandler
},
this::resolveWaitingFutures
);
+ } else if (status.get().getState() == Status.STATE.SUCCESS) {
+ // SUCCESS case, we'll clear up the cached success while serving it to
this client
+ // Not doing this can lead to an incorrect response to upcoming
clients for a reload
+ requestStatuses.invalidate(changeRequest);
+ return status;
}
return requestStatuses.getIfPresent(changeRequest);
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index 9e41197..d03bc52 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -87,6 +87,7 @@ public class SegmentLoadDropHandlerTest
private SegmentManager segmentManager;
private List<Runnable> scheduledRunnable;
private SegmentLoaderConfig segmentLoaderConfig;
+ private SegmentLoaderConfig noAnnouncerSegmentLoaderConfig;
private SegmentLoaderConfig segmentLoaderConfigNoLocations;
private ScheduledExecutorFactory scheduledExecutorFactory;
private List<StorageLocationConfig> locations;
@@ -194,6 +195,39 @@ public class SegmentLoadDropHandlerTest
}
};
+ noAnnouncerSegmentLoaderConfig = new SegmentLoaderConfig()
+ {
+ @Override
+ public File getInfoDir()
+ {
+ return testStorageLocation.getInfoDir();
+ }
+
+ @Override
+ public int getNumLoadingThreads()
+ {
+ return 5;
+ }
+
+ @Override
+ public int getAnnounceIntervalMillis()
+ {
+ return 0;
+ }
+
+ @Override
+ public List<StorageLocationConfig> getLocations()
+ {
+ return locations;
+ }
+
+ @Override
+ public int getDropSegmentDelayMillis()
+ {
+ return 0;
+ }
+ };
+
segmentLoaderConfigNoLocations = new SegmentLoaderConfig()
{
@Override
@@ -475,15 +509,8 @@ public class SegmentLoadDropHandlerTest
runnable.run();
}
- result = segmentLoadDropHandler.processBatch(batch).get();
+ result = segmentLoadDropHandler.processBatch(ImmutableList.of(new
SegmentChangeRequestLoad(segment1))).get();
Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS,
result.get(0).getStatus());
- Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS,
result.get(1).getStatus());
-
-
- for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus e :
segmentLoadDropHandler.processBatch(batch)
-
.get()) {
- Assert.assertEquals(SegmentLoadDropHandler.Status.SUCCESS,
e.getStatus());
- }
segmentLoadDropHandler.stop();
}
@@ -530,4 +557,82 @@ public class SegmentLoadDropHandlerTest
segmentLoadDropHandler.stop();
}
+
+ @Test(timeout = 60_000L)
+ public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws
Exception
+ {
+ final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
+ Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
+ .thenReturn(true);
+
Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
+ final SegmentLoadDropHandler segmentLoadDropHandler = new
SegmentLoadDropHandler(
+ jsonMapper,
+ noAnnouncerSegmentLoaderConfig,
+ announcer,
+ Mockito.mock(DataSegmentServerAnnouncer.class),
+ segmentManager,
+ segmentCacheManager,
+ scheduledExecutorFactory.create(5, "SegmentLoadDropHandlerTest-[%d]"),
+ new ServerTypeConfig(ServerType.HISTORICAL)
+ );
+
+ segmentLoadDropHandler.start();
+
+ DataSegment segment1 = makeSegment("batchtest1", "1",
Intervals.of("P1d/2011-04-01"));
+
+ List<DataSegmentChangeRequest> batch = ImmutableList.of(new
SegmentChangeRequestLoad(segment1));
+
+ // load the segment
+ ListenableFuture<List<DataSegmentChangeRequestAndStatus>> future =
segmentLoadDropHandler
+ .processBatch(batch);
+ for (Runnable runnable : scheduledRunnable) {
+ runnable.run();
+ }
+ List<DataSegmentChangeRequestAndStatus> result = future.get();
+ Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ scheduledRunnable.clear();
+
+ // drop the segment
+ batch = ImmutableList.of(new SegmentChangeRequestDrop(segment1));
+ future = segmentLoadDropHandler.processBatch(batch);
+ for (Runnable runnable : scheduledRunnable) {
+ runnable.run();
+ }
+ result = future.get();
+ Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ scheduledRunnable.clear();
+
+ // check invocations after a load-drop sequence
+ Mockito.verify(segmentManager,
Mockito.times(1)).loadSegment(ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+ Mockito.verify(segmentManager,
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+ // try to reload the segment - this should be a no-op since it might be
the case that this is the first load client
+ // with this request, we'll forget about the success of the load request
+ batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
+ future = segmentLoadDropHandler.processBatch(batch);
+ Assert.assertEquals(scheduledRunnable.size(), 0);
+ result = future.get();
+ Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+
+ // check invocations - should stay the same
+ Mockito.verify(segmentManager,
Mockito.times(1)).loadSegment(ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+ Mockito.verify(segmentManager,
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+ // try to reload the segment - this time the loader will know that is a
fresh request to load
+ // so, the segment manager will be asked to load
+ batch = ImmutableList.of(new SegmentChangeRequestLoad(segment1));
+ future = segmentLoadDropHandler.processBatch(batch);
+ for (Runnable runnable : scheduledRunnable) {
+ runnable.run();
+ }
+ result = future.get();
+ Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
+ scheduledRunnable.clear();
+
+ // check invocations - the load segment counter should bump up
+ Mockito.verify(segmentManager,
Mockito.times(2)).loadSegment(ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+ Mockito.verify(segmentManager,
Mockito.times(1)).dropSegment(ArgumentMatchers.any());
+
+ segmentLoadDropHandler.stop();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]