walterddr commented on code in PR #10289:
URL: https://github.com/apache/pinot/pull/10289#discussion_r1110447502
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -40,37 +44,23 @@
* of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
* callback.
*/
-@NotThreadSafe
+@ThreadSafe
public class RoundRobinScheduler implements OpChainScheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinScheduler.class);
- private static final long DEFAULT_RELEASE_TIMEOUT = TimeUnit.MINUTES.toMillis(1);
+ private static final String AVAILABLE_RELEASE_THREAD_NAME = "round-robin-scheduler-release-thread";
private final long _releaseTimeout;
private final Supplier<Long> _ticker;
- // the _available queue contains operator chains that are available
- // to this scheduler but do not have any data available to schedule
- // while the _ready queue contains the operator chains that are ready
- // to be scheduled (have data, or are first-time scheduled)
- private final Queue<AvailableEntry> _available = new LinkedList<>();
- private final Queue<OpChain> _ready = new LinkedList<>();
-
- private boolean _isShutDown = false;
-
- // using a Set here is acceptable because calling hasNext() and
- // onDataAvailable() cannot occur concurrently - that means that
- // anytime we schedule a new operator based on the presence of
- // mail we can be certain that it will consume all of the mail
- // form that mailbox, even if there are multiple items in it. If,
- // during execution of that operator, more mail appears, then the
- // operator will be rescheduled immediately potentially resulting
- // in a false-positive schedule
- @VisibleForTesting
- final Set<MailboxIdentifier> _seenMail = new HashSet<>();
-
- public RoundRobinScheduler() {
- this(DEFAULT_RELEASE_TIMEOUT);
- }
+ private final Map<OpChainId, OpChain> _aliveChains = new ConcurrentHashMap<>();
+ private final Set<OpChainId> _runningChains = Sets.newConcurrentHashSet();
+ final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+ private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
Review Comment:
My understanding of how the opChain IDs are treated here:
- activeChain === running + ready + available
- running doesn't need to be kept as a state b/c the opChain is being executed at the moment, and it is guaranteed to be put back in the opChainSchedulerService.
- so activeChain === ready + available
- seenMail is only there to move an opChain quickly from available to ready
  - opChains will be moved to ready by the periodic scheduled task anyway
  - if onDataAvailable arrives and the opChain
    - is in available, it gets moved to ready
    - is not in available, seenMail can add the opChain
  - when ready is polled, seenMail should clear that opChain
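
To make the model above concrete, here is a minimal sketch of how I read it. It is not the code in this PR: the class name `SchedulerStateSketch`, its method names, and the use of plain `String` IDs in place of `OpChainId` are all assumptions for illustration, and it uses coarse `synchronized` methods rather than the concurrent collections in the diff.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the state model described in the comment above.
// Names and structure are assumptions, not the PR's RoundRobinScheduler.
class SchedulerStateSketch {
  private final long releaseTimeoutMs;
  // available: chains waiting for data, mapped to the time they started waiting
  private final Map<String, Long> available = new LinkedHashMap<>();
  // ready: chains that can be handed out for execution
  private final Queue<String> ready = new ArrayDeque<>();
  // seenMail: chains that received mail while they were not in available
  private final Set<String> seenMail = new HashSet<>();

  SchedulerStateSketch(long releaseTimeoutMs) {
    this.releaseTimeoutMs = releaseTimeoutMs;
  }

  // First-time scheduling: a new chain is immediately ready.
  synchronized void register(String id) {
    ready.add(id);
  }

  // A running chain is not tracked; it comes back through this call.
  synchronized void onYield(String id, long nowMs) {
    if (seenMail.remove(id)) {
      ready.add(id); // mail arrived while it was running: skip the wait
    } else {
      available.put(id, nowMs);
    }
  }

  synchronized void onDataAvailable(String id) {
    if (available.remove(id) != null) {
      ready.add(id); // fast path from available to ready
    } else {
      seenMail.add(id); // remember the mail for a chain that is not waiting
    }
  }

  // Polling ready clears any seenMail entry for the polled chain.
  synchronized String poll() {
    String id = ready.poll();
    if (id != null) {
      seenMail.remove(id);
    }
    return id;
  }

  // Periodic task: chains that waited past the timeout become ready anyway.
  synchronized void releaseExpired(long nowMs) {
    Iterator<Map.Entry<String, Long>> it = available.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> e = it.next();
      if (nowMs - e.getValue() >= releaseTimeoutMs) {
        it.remove();
        ready.add(e.getKey());
      }
    }
  }
}
```

In this sketch seenMail is purely an optimization: if it were removed, every chain would still reach ready through `releaseExpired`, just later.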
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]