kfaraz commented on code in PR #18255:
URL: https://github.com/apache/druid/pull/18255#discussion_r2209760430
##########
server/src/test/java/org/apache/druid/server/metrics/LatchableEmitter.java:
##########
@@ -158,48 +144,64 @@ public void waitForEventAggregate(
);
}
- private void triggerConditionEvaluations()
+ private void triggerConditionEvaluations(Event event)
{
if (conditionEvaluateExecutor == null) {
throw new ISE("Cannot evaluate conditions as the
'conditionEvaluateExecutor' is null.");
} else {
- conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
+ conditionEvaluateExecutor.submit(() -> evaluateWaitConditions(event));
}
}
/**
* Evaluates wait conditions. This method must be invoked on the
* {@link #conditionEvaluateExecutor} so that it does not block {@link
#emit(Event)}.
*/
- private void evaluateWaitConditions()
+ private void evaluateWaitConditions(Event event)
{
- eventReadWriteLock.readLock().lock();
+ eventProcessingLock.lock();
try {
// Create a copy of the conditions for thread-safety
final List<WaitCondition> conditionsToEvaluate =
List.copyOf(waitConditions);
if (conditionsToEvaluate.isEmpty()) {
return;
}
- List<Event> events = getEvents();
for (WaitCondition condition : conditionsToEvaluate) {
- final int currentNumberOfEvents = events.size();
-
- // Do not use an iterator over the list to avoid concurrent
modification exceptions
- // Evaluate new events against this condition
- for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i)
{
- if (condition.predicate.test(events.get(i))) {
- condition.countDownLatch.countDown();
- }
+ if (condition.predicate.test(event)) {
+ condition.countDownLatch.countDown();
}
- condition.processedUntil = currentNumberOfEvents;
}
}
catch (Exception e) {
log.error(e, "Error while evaluating wait conditions");
Review Comment:
I am still not sure if we can throw an Exception here though.
The contract of `Emitter.emit()` says that it shouldn't throw an exception
and only log warnings/errors.
An exception thrown here would still not fail the test directly as the
`emit()` call might be happening on some service thread that just swallows up
the exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]