flyrain commented on code in PR #4498:
URL: https://github.com/apache/polaris/pull/4498#discussion_r3277215570
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -56,8 +56,15 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
Caffeine.newBuilder()
.expireAfterAccess(Duration.ofHours(1))
.evictionListener(
- (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) ->
- processor.onComplete())
+ (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) -> {
+ // onComplete is not synchronized in smallrye-mutiny's
UnicastProcessor,
+ // unlike onNext. Acquire the processor's intrinsic monitor
(the same one
+ // onNext uses) so that eviction-driven onComplete and
concurrent
+ // processEvent-driven onNext mutually exclude.
+ synchronized (processor) {
Review Comment:
Comment is good. Worth one extra line: this assumes smallrye-mutiny's
`UnicastProcessor.onNext` locks on `this` — re-verify on mutiny version bumps,
otherwise this guard becomes a silent no-op.
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -68,7 +75,17 @@ protected void processEvent(String realmId, PolarisEvent
event) {
@PreDestroy
public void shutdown() {
- processors.asMap().values().forEach(UnicastProcessor::onComplete);
+ // Same rationale as the eviction listener: hold the processor's monitor so
+ // shutdown-driven onComplete cannot race a concurrent processEvent-driven
onNext.
+ processors
+ .asMap()
+ .values()
+ .forEach(
+ p -> {
+ synchronized (p) {
Review Comment:
Same `synchronized (p) { p.onComplete(); }` pattern as the eviction listener
— could pull into a tiny private helper (`completeSynchronized(processor)`) to
keep both call sites in lockstep.
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -56,8 +56,15 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
Caffeine.newBuilder()
.expireAfterAccess(Duration.ofHours(1))
.evictionListener(
- (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) ->
- processor.onComplete())
+ (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) -> {
+ // onComplete is not synchronized in smallrye-mutiny's
UnicastProcessor,
+ // unlike onNext. Acquire the processor's intrinsic monitor
(the same one
+ // onNext uses) so that eviction-driven onComplete and
concurrent
+ // processEvent-driven onNext mutually exclude.
+ synchronized (processor) {
Review Comment:
Thanks for fix, @visit2rahul ! Not a blocker, but is there a way to test it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]