Copilot commented on code in PR #4677:
URL: https://github.com/apache/polaris/pull/4677#discussion_r3383953016
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -126,4 +113,14 @@ protected void onProcessorError(String realmId, Throwable
error) {
error);
processors.invalidate(realmId);
}
+
+ @VisibleForTesting
+ UnicastProcessor<PolarisEvent> processor(String realmId) {
+ return processors.get(realmId);
+ }
Review Comment:
`processor(String realmId)` currently uses `processors.get(realmId)`, which
will create and cache a new processor if none exists. Since this helper is only
for tests/inspection (and replaces prior direct-map access), it’s safer to
return the current cached instance without side effects.
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,13 +51,16 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject InMemoryBufferEventListenerConfiguration configuration;
- @VisibleForTesting
- final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+ private final LoadingCache<String, UnicastProcessor<PolarisEvent>>
processors =
Caffeine.newBuilder()
.expireAfterAccess(Duration.ofHours(1))
- .evictionListener(
- (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) ->
- completeSynchronized(processor))
+ .executor(Runnable::run)
+ .removalListener(
+ (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) -> {
+ synchronized (processor) {
+ processor.onComplete();
+ }
+ })
Review Comment:
The Caffeine `RemovalListener` contract allows `value` to be null (and other
caches in this repo guard against it). As written, `synchronized (processor)`
will throw an NPE if a null value is ever observed. Also, the synchronization
around `onComplete()` is non-obvious—adding a short comment helps preserve the
concurrency rationale that used to be documented here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]