visit2rahul commented on code in PR #4577:
URL: https://github.com/apache/polaris/pull/4577#discussion_r3324458171
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject InMemoryBufferEventListenerConfiguration configuration;
+ /**
+ * Thrown by {@link EventProcessor#onNext} when the wrapped processor has already completed (for
+ * example, it was evicted between the cache lookup and the {@code onNext} call). {@link
+ * #processEvent} catches it and retries with a freshly loaded processor.
+ */
+ private static final class EvictedException extends IllegalStateException {}
Review Comment:
done
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject InMemoryBufferEventListenerConfiguration configuration;
+ /**
+ * Thrown by {@link EventProcessor#onNext} when the wrapped processor has already completed (for
+ * example, it was evicted between the cache lookup and the {@code onNext} call). {@link
+ * #processEvent} catches it and retries with a freshly loaded processor.
+ */
+ private static final class EvictedException extends IllegalStateException {}
+
+ /**
+ * Wraps a {@link UnicastProcessor} together with its own lock, so that mutual exclusion between
+ * {@code onNext} (from {@link #processEvent}) and {@code onComplete} (from eviction or shutdown)
+ * does not depend on smallrye-mutiny's internal {@code synchronized} on {@code
+ * UnicastProcessor.onNext}.
+ */
+ final class EventProcessor {
+
+ private final UnicastProcessor<PolarisEvent> processor;
+ private final ReentrantLock lock = new ReentrantLock();
+ private boolean completed = false; // guarded by lock
+
+ EventProcessor(String realmId) {
+ processor = UnicastProcessor.create();
+ processor
+ .emitOn(Infrastructure.getDefaultWorkerPool())
+ .group()
+ .intoLists()
+ .of(configuration.maxBufferSize(), configuration.bufferTime())
+ .subscribe()
+ .with(events -> flush(realmId, events), error ->
onProcessorError(realmId, error));
+ }
+
+ void onNext(PolarisEvent event) {
+ lock.lock();
+ try {
+ if (completed) {
+ throw new EvictedException();
+ }
+ processor.onNext(event);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void onComplete() {
+ lock.lock();
+ try {
+ if (!completed) {
+ completed = true;
+ processor.onComplete();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ UnicastProcessor<PolarisEvent> processor() {
+ return processor;
+ }
+ }
+
@VisibleForTesting
- final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+ final LoadingCache<String, EventProcessor> processors =
Caffeine.newBuilder()
.expireAfterAccess(Duration.ofHours(1))
.evictionListener(
- (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) ->
- completeSynchronized(processor))
+ (String realmId, EventProcessor processor, RemovalCause cause)
-> {
+ if (processor != null) {
+ processor.onComplete();
+ }
+ })
.build(this::createProcessor);
+ // Construct via a method (not EventProcessor::new) so that when the cache lives on a CDI client
+ // proxy, the call is delegated to the contextual bean instance, whose injected configuration is
+ // non-null. A direct inner-class instantiation would capture the proxy as the enclosing instance
+ // and read its uninjected (null) configuration.
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]