gianm commented on code in PR #14616:
URL: https://github.com/apache/druid/pull/14616#discussion_r1270994288
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -272,21 +306,25 @@ private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback
*
* @throws SegmentLoadingException if it fails to load the given segment
*/
- private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy, @Nullable
- ExecutorService loadSegmentIntoPageCacheExec)
- throws SegmentLoadingException
+ private void loadSegment(
+ DataSegment segment,
+ DataSegmentChangeCallback callback,
+ boolean lazy,
+ @Nullable ExecutorService loadSegmentIntoPageCacheExec
+ ) throws SegmentLoadingException
{
final boolean loaded;
try {
- loaded = segmentManager.loadSegment(segment,
- lazy,
- () -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false),
- loadSegmentIntoPageCacheExec
+ loaded = segmentManager.loadSegment(
+ segment,
+ lazy,
+ () -> unannounceAndDropSegment(segment, DataSegmentChangeCallback.NOOP),
Review Comment:
I take it all these changes from `removeSegment` to
`unannounceAndDropSegment` are so that we do an immediate drop (rather than a
delayed drop) when a load fails? If so, that seems right to me. There is no
reason to delay the drop if we never loaded and announced the segment in the
first place.
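
To illustrate the distinction, here is a minimal sketch of the two drop paths. It is not the handler's actual code: `DropPathSketch`, `removeSegmentLater`, `dropNow` and `runDelayedTasks` are hypothetical names, segments are plain strings, and a simple queue stands in for the scheduled executor so the example stays deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DropPathSketch
{
  // Segment ids that have actually been dropped, in drop order.
  final List<String> dropped = new ArrayList<>();

  // Stand-in for tasks handed to a scheduled executor (assumption: the real
  // delayed path schedules the drop to run after some delay).
  final Deque<Runnable> delayedTasks = new ArrayDeque<>();

  // Hypothetical delayed path: the drop is only queued here.
  void removeSegmentLater(String segmentId)
  {
    delayedTasks.add(() -> dropped.add(segmentId));
  }

  // Hypothetical immediate path: the drop happens on the calling thread.
  void dropNow(String segmentId)
  {
    dropped.add(segmentId);
  }

  // Simulates the delay elapsing by running every queued drop.
  void runDelayedTasks()
  {
    while (!delayedTasks.isEmpty()) {
      delayedTasks.poll().run();
    }
  }
}
```

In this model a segment cleaned up through `dropNow` is gone as soon as the call returns, while one passed to `removeSegmentLater` stays around until the queued task runs.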
##########
server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java:
##########
@@ -90,19 +93,46 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
private final SegmentManager segmentManager;
private final ScheduledExecutorService exec;
private final ServerTypeConfig serverTypeConfig;
- private final ConcurrentSkipListSet<DataSegment> segmentsToDelete;
+ private final ConcurrentSkipListSet<DataSegment> segmentsToDrop;
private final SegmentCacheManager segmentCacheManager;
private volatile boolean started = false;
- // Keep history of load/drop request status in a LRU cache to maintain idempotency if same request shows up
- // again and to return status of a completed request. Maximum size of this cache must be significantly greater
- // than number of pending load/drop requests. so that history is not lost too quickly.
- private final Cache<DataSegmentChangeRequest, AtomicReference<Status>> requestStatuses;
+ /**
+ * Used to cache the status of a completed load or drop request until it has
+ * been served to the (Coordinator) client exactly once.
+ * <p>
+ * The cache is used as follows:
+ * <ol>
+ * <li>An entry with state PENDING is added to the cache upon receiving a
+ * request to load or drop a segment.</li>
+ * <li>A duplicate request received at this point is immediately answered with PENDING.</li>
+ * <li>Once the load/drop finishes, the entry is updated to either SUCCESS or FAILED.</li>
+ * <li>A duplicate request received at this point is immediately answered with
+ * SUCCESS or FAILED and the entry is removed from the cache.</li>
+ * <li>If the first request itself finishes after the load or drop has already
+ * completed, it is answered with a SUCCESS or FAILED and the entry is removed
+ * from the cache.</li>
+ * </ol>
+ * <p>
+ * Maximum size of this cache must be significantly greater than the number of
Review Comment:
What happens if it's not significantly greater?
I guess this could happen if someone sets
`druid.coordinator.loadqueuepeon.http.batchSize` on the Coordinator higher than
`druid.segmentCache.statusQueueMaxSize` on the Historical.
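
To make the question concrete, here is a minimal sketch of the failure mode I have in mind. It is an assumption-laden model, not the handler's code: a size-bounded `LinkedHashMap` stands in for the `Cache` in the diff, requests are plain strings, and `StatusCacheSketch`, `boundedCache` and `firstRequestStillTracked` are hypothetical names. Whether the real cache evicts in this way depends on how it is actually configured.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StatusCacheSketch
{
  enum State { PENDING, SUCCESS, FAILED }

  // Size-bounded, access-ordered map standing in for the request status cache.
  // Once the map grows past maxSize, the least recently used entry is evicted.
  static Map<String, State> boundedCache(final int maxSize)
  {
    return new LinkedHashMap<String, State>(16, 0.75f, true)
    {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, State> eldest)
      {
        return size() > maxSize;
      }
    };
  }

  // Registers batchSize hypothetical requests as PENDING, then reports whether
  // the first one is still tracked when a duplicate of it would arrive.
  static boolean firstRequestStillTracked(int cacheSize, int batchSize)
  {
    Map<String, State> cache = boundedCache(cacheSize);
    for (int i = 0; i < batchSize; i++) {
      cache.put("request-" + i, State.PENDING);
    }
    // containsKey does not count as an access, so it does not reorder entries.
    return cache.containsKey("request-0");
  }

  public static void main(String[] args)
  {
    System.out.println(firstRequestStillTracked(100, 10)); // cache larger than batch
    System.out.println(firstRequestStillTracked(10, 100)); // batch larger than cache
  }
}
```

In this model, when the batch is larger than the cache, the PENDING entry for the first request is evicted before the request completes, so a duplicate of it would look like a brand-new request.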
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.