ankitsultana commented on code in PR #10289:
URL: https://github.com/apache/pinot/pull/10289#discussion_r1111564713
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java:
##########
@@ -39,168 +47,208 @@
* but will only schedule them when there is work to be done. The availability
* of work is signaled using the {@link #onDataAvailable(MailboxIdentifier)}
* callback.
+ * <p>
+ * Design: There are 3 states for a OpChain:
+ *
+ * 1. Ready: This state means that an OpChain is ready to be run. If an
OpChain is in this state, then it would be in
+ * the _ready queue. The _ready queue is polled on every call to
{@link #next}.
+ * 2. Available/Yielded: This is when the OpChain was run and it returned a
no-op block, indicating that it has no
+ * new data to process. The {@link OpChainId} for
these OpChains are stored in the _available
+ * map, where the value of the map is the
release-timeout.
+ * 3. Running: When the OpChain is returned by the {@link #next} method, it
is considered to be in the Running state.
+ *
+ * The following state transitions are possible:
+ * 1. Ready ==> Running: This happens when the OpChain is returned by the
{@link #next} method.
+ * 2. Running ==> Available/Yielded: This happens when a running OpChain
returns a No-op block, following which a
+ * yield is called and there's no entry
for the OpChain in _seenMail.
+ * 3. Available/Yielded ==> Ready: This can happen in two cases: (1) When
yield is called but _seenMail has an
+ * entry for the corresponding OpChainId,
which means there was some data received
+ * by MailboxReceiveOperator after the last
poll. (2) When a sender has died or hasn't
+ * sent any data in the last
_releaseTimeoutMs milliseconds.
+ *
+ * The OpChain is considered "alive" from the time it is registered until it
is de-registered. Any reference to the
+ * OpChain or its related metadata is kept only while the OpChain is alive.
The {@link #onDataAvailable} callback
+ * can be called before an OpChain was ever registered. In that case, this
scheduler will simply ignore the callback,
+ * since once the OpChain gets registered it will anyways be put into the
_ready queue immediately. In case the
+ * OpChain never gets registered (e.g. if the broker couldn't dispatch it),
as long as the sender cleans up all
+ * resources that it has acquired, there will be no leak, since the
scheduler doesn't hold any references for
+ * non-alive OpChains.
+ * </p>
*/
-@NotThreadSafe
+@ThreadSafe
public class RoundRobinScheduler implements OpChainScheduler {
private static final Logger LOGGER =
LoggerFactory.getLogger(RoundRobinScheduler.class);
- private static final long DEFAULT_RELEASE_TIMEOUT =
TimeUnit.MINUTES.toMillis(1);
+ private static final String AVAILABLE_RELEASE_THREAD_NAME =
"round-robin-scheduler-release-thread";
- private final long _releaseTimeout;
+ private final long _releaseTimeoutMs;
private final Supplier<Long> _ticker;
- // the _available queue contains operator chains that are available
- // to this scheduler but do not have any data available to schedule
- // while the _ready queue contains the operator chains that are ready
- // to be scheduled (have data, or are first-time scheduled)
- private final Queue<AvailableEntry> _available = new LinkedList<>();
- private final Queue<OpChain> _ready = new LinkedList<>();
-
- private boolean _isShutDown = false;
-
- // using a Set here is acceptable because calling hasNext() and
- // onDataAvailable() cannot occur concurrently - that means that
- // anytime we schedule a new operator based on the presence of
- // mail we can be certain that it will consume all of the mail
- // form that mailbox, even if there are multiple items in it. If,
- // during execution of that operator, more mail appears, then the
- // operator will be rescheduled immediately potentially resulting
- // in a false-positive schedule
- @VisibleForTesting
- final Set<MailboxIdentifier> _seenMail = new HashSet<>();
+ private final Map<OpChainId, OpChain> _aliveChains = new
ConcurrentHashMap<>();
+ final Set<OpChainId> _seenMail = Sets.newConcurrentHashSet();
+ private final Map<OpChainId, Long> _available = new ConcurrentHashMap<>();
- public RoundRobinScheduler() {
- this(DEFAULT_RELEASE_TIMEOUT);
- }
+ private final BlockingQueue<OpChain> _ready = new LinkedBlockingQueue<>();
+
+ private final Lock _lock = new ReentrantLock();
+ private final ScheduledExecutorService _availableOpChainReleaseService;
- public RoundRobinScheduler(long releaseTimeout) {
- this(releaseTimeout, System::currentTimeMillis);
+ public RoundRobinScheduler(long releaseTimeoutMs) {
+ this(releaseTimeoutMs, System::currentTimeMillis);
}
- public RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
- _releaseTimeout = releaseTimeoutMs;
+ RoundRobinScheduler(long releaseTimeoutMs, Supplier<Long> ticker) {
+ Preconditions.checkArgument(releaseTimeoutMs > 0, "Release timeout for
round-robin scheduler should be > 0ms");
+ _releaseTimeoutMs = releaseTimeoutMs;
_ticker = ticker;
+ _availableOpChainReleaseService =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r);
+ t.setName(AVAILABLE_RELEASE_THREAD_NAME);
+ t.setDaemon(true);
+ return t;
+ });
+ _availableOpChainReleaseService.scheduleAtFixedRate(() -> {
+ List<OpChainId> timedOutWaiting = new ArrayList<>();
+ for (Map.Entry<OpChainId, Long> entry : _available.entrySet()) {
+ if (Thread.interrupted()) {
+ LOGGER.warn("Thread={} interrupted. Scheduler may be shutting
down.", AVAILABLE_RELEASE_THREAD_NAME);
+ break;
+ }
+ if (_ticker.get() > entry.getValue()) {
+ timedOutWaiting.add(entry.getKey());
+ }
+ }
+ for (OpChainId opChainId : timedOutWaiting) {
+ _lock.lock();
+ try {
+ if (_available.containsKey(opChainId)) {
+ _available.remove(opChainId);
+ _ready.offer(_aliveChains.get(opChainId));
+ }
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }, _releaseTimeoutMs, _releaseTimeoutMs, TimeUnit.MILLISECONDS);
}
@Override
- public void register(OpChain operatorChain, boolean isNew) {
- if (_isShutDown) {
- return;
- }
- // the first time an operator chain is scheduled, it should
- // immediately be considered ready in case it does not need
- // read from any mailbox (e.g. with a LiteralValueOperator)
- if (isNew) {
+ public void register(OpChain operatorChain) {
+ Preconditions.checkState(!_aliveChains.containsKey(operatorChain.getId()),
+ String.format("Tried to re-register op-chain: %s",
operatorChain.getId()));
+ _lock.lock();
+ try {
+ _aliveChains.put(operatorChain.getId(), operatorChain);
_ready.add(operatorChain);
- } else {
- long releaseTs = _releaseTimeout < 0 ? Long.MAX_VALUE : _ticker.get() +
_releaseTimeout;
- _available.add(new AvailableEntry(operatorChain, releaseTs));
+ } finally {
+ _lock.unlock();
}
trace("registered " + operatorChain);
}
@Override
- public void onDataAvailable(MailboxIdentifier mailbox) {
- // it may be possible to receive this callback when there's no
corresponding
- // operator chain registered to the mailbox - this can happen when either
- // (1) we get the callback before the first register is called or (2) we
get
- // the callback while the operator chain is executing. to account for this,
- // we just store it in a set of seen mail and only check for it when
hasNext
- // is called.
- //
- // note that scenario (2) may cause a false-positive schedule where an
operator
- // chain gets scheduled for mail that it had already processed, in which
case
- // the operator chain will simply do nothing and get put back onto the
queue.
- // scenario (2) may additionally cause a memory leak - if onDataAvailable
is
- // called with an EOS block _while_ the operator chain is executing, the
chain
- // will consume the EOS block and computeReady() will never remove the
mailbox
- // from the _seenMail set.
- //
- // TODO: fix the memory leak by adding a close(opChain) callback
- _seenMail.add(mailbox);
- trace("got mail for " + mailbox);
+ public void deregister(OpChain operatorChain) {
+ Preconditions.checkState(_aliveChains.containsKey(operatorChain.getId()),
+ "Tried to de-register an un-registered op-chain");
+ _lock.lock();
+ try {
+ OpChainId chainId = operatorChain.getId();
+ _aliveChains.remove(chainId);
+ // it could be that the onDataAvailable callback was called when the
OpChain was executing, in which case there
+ // could be a dangling entry in _seenMail.
+ _seenMail.remove(chainId);
+ } finally {
+ _lock.unlock();
+ }
}
@Override
- public boolean hasNext() {
- if (!_ready.isEmpty()) {
- return true;
+ public void yield(OpChain operatorChain) {
+ long releaseTs = _ticker.get() + _releaseTimeoutMs;
+ _lock.lock();
+ try {
+ // It could be that this OpChain received data before it could be
yielded completely. In that case, mark it ready
+ // to get it scheduled asap.
+ if (_seenMail.contains(operatorChain.getId())) {
+ _seenMail.remove(operatorChain.getId());
+ _ready.add(operatorChain);
+ return;
+ }
+ _available.put(operatorChain.getId(), releaseTs);
+ } finally {
+ _lock.unlock();
}
- computeReady();
- return !_ready.isEmpty();
}
@Override
- public OpChain next() {
- OpChain op = _ready.poll();
- trace("Polled " + op);
- return op;
+ public void onDataAvailable(MailboxIdentifier mailbox) {
+ // TODO: Should we add an API in MailboxIdentifier to get the requestId?
+ OpChainId opChainId = new
OpChainId(Long.parseLong(mailbox.getJobId().split("_")[0]),
Review Comment:
Yeah so I also wanted us to revisit `MailboxIdentifier::getJobId`. Right now
it is `requestId_stageid` but in case of multiple brokers there can be
duplicate requests with the same id in a server. Filed:
https://github.com/apache/pinot/issues/10305
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]