kfaraz commented on code in PR #14468:
URL: https://github.com/apache/druid/pull/14468#discussion_r1247388656
##########
server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java:
##########
@@ -300,72 +292,59 @@ private void runSegmentCallbacks(
{
for (final Map.Entry<SegmentCallback, Executor> entry :
segmentCallbacks.entrySet()) {
entry.getValue().execute(
- new Runnable()
- {
- @Override
- public void run()
- {
- if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
- segmentCallbacks.remove(entry.getKey());
- if (segmentPredicates.remove(entry.getKey()) != null) {
- finalPredicate = Predicates.or(
- defaultFilter,
- Predicates.or(segmentPredicates.values())
- );
- }
+ () -> {
+ if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
+ segmentCallbacks.remove(entry.getKey());
+ if (segmentPredicates.remove(entry.getKey()) != null) {
+ updateFinalPredicate();
}
}
}
);
}
}
- private void runServerCallbacks(final DruidServer server)
+ private void runServerRemovedCallbacks(final DruidServer server)
{
for (final Map.Entry<ServerRemovedCallback, Executor> entry :
serverCallbacks.entrySet()) {
entry.getValue().execute(
- new Runnable()
- {
- @Override
- public void run()
- {
- if (CallbackAction.UNREGISTER ==
entry.getKey().serverRemoved(server)) {
- serverCallbacks.remove(entry.getKey());
- }
+ () -> {
+ if (CallbackAction.UNREGISTER ==
entry.getKey().serverRemoved(server)) {
+ serverCallbacks.remove(entry.getKey());
}
}
);
}
}
- //best effort wait for first segment listing fetch from all servers and then
call
- //segmentViewInitialized on all registered segment callbacks.
+ /**
+ * Waits until the sync wait timeout for all servers to be synced at least
once.
+ * Finally calls {@link SegmentCallback#segmentViewInitialized()} regardless
of
+ * whether all servers synced successfully or not.
+ */
private void serverInventoryInitialized()
{
- long start = System.currentTimeMillis();
- long serverSyncWaitTimeout = config.getServerTimeout() + 2 *
ChangeRequestHttpSyncer.HTTP_TIMEOUT_EXTRA_MS;
+ final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ final Duration syncWaitTimeout = Duration.millis(
+ config.getServerTimeout() + 2 *
ChangeRequestHttpSyncer.MIN_READ_TIMEOUT_MILLIS
+ );
- List<DruidServerHolder> uninitializedServers = new ArrayList<>();
- for (DruidServerHolder server : servers.values()) {
- if (!server.isSyncedSuccessfullyAtleastOnce()) {
- uninitializedServers.add(server);
+ final List<DruidServerHolder> uninitializedServers = new
ArrayList<>(servers.values());
+ while (DateTimes.hasNotElapsed(syncWaitTimeout, stopwatch)) {
+ uninitializedServers.removeIf(
+ serverHolder -> serverHolder.isSyncedSuccessfullyAtleastOnce()
+ || serverHolder.isStopped()
+ );
+ if (uninitializedServers.isEmpty()) {
Review Comment:
The loop condition is only re-evaluated after the 5s sleep, so without this
check we would wait needlessly even when the list is already empty.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]