visit2rahul commented on code in PR #4577:
URL: https://github.com/apache/polaris/pull/4577#discussion_r3324458171


##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends PolarisPersistenceEventListener
   @Inject MetaStoreManagerFactory metaStoreManagerFactory;
   @Inject InMemoryBufferEventListenerConfiguration configuration;
 
+  /**
+   * Thrown by {@link EventProcessor#onNext} when the wrapped processor has already completed (for
+   * example, it was evicted between the cache lookup and the {@code onNext} call). {@link
+   * #processEvent} catches it and retries with a freshly loaded processor.
+   */
+  private static final class EvictedException extends IllegalStateException {}

Review Comment:
   done 



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends PolarisPersistenceEventListener
   @Inject MetaStoreManagerFactory metaStoreManagerFactory;
   @Inject InMemoryBufferEventListenerConfiguration configuration;
 
+  /**
+   * Thrown by {@link EventProcessor#onNext} when the wrapped processor has already completed (for
+   * example, it was evicted between the cache lookup and the {@code onNext} call). {@link
+   * #processEvent} catches it and retries with a freshly loaded processor.
+   */
+  private static final class EvictedException extends IllegalStateException {}
+
+  /**
+   * Wraps a {@link UnicastProcessor} together with its own lock, so that mutual exclusion between
+   * {@code onNext} (from {@link #processEvent}) and {@code onComplete} (from eviction or shutdown)
+   * does not depend on smallrye-mutiny's internal {@code synchronized} on {@code
+   * UnicastProcessor.onNext}.
+   */
+  final class EventProcessor {
+
+    private final UnicastProcessor<PolarisEvent> processor;
+    private final ReentrantLock lock = new ReentrantLock();
+    private boolean completed = false; // guarded by lock
+
+    EventProcessor(String realmId) {
+      processor = UnicastProcessor.create();
+      processor
+          .emitOn(Infrastructure.getDefaultWorkerPool())
+          .group()
+          .intoLists()
+          .of(configuration.maxBufferSize(), configuration.bufferTime())
+          .subscribe()
+          .with(events -> flush(realmId, events), error -> onProcessorError(realmId, error));
+    }
+
+    void onNext(PolarisEvent event) {
+      lock.lock();
+      try {
+        if (completed) {
+          throw new EvictedException();
+        }
+        processor.onNext(event);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    void onComplete() {
+      lock.lock();
+      try {
+        if (!completed) {
+          completed = true;
+          processor.onComplete();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @VisibleForTesting
+    UnicastProcessor<PolarisEvent> processor() {
+      return processor;
+    }
+  }
+
   @VisibleForTesting
-  final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+  final LoadingCache<String, EventProcessor> processors =
       Caffeine.newBuilder()
           .expireAfterAccess(Duration.ofHours(1))
           .evictionListener(
-              (String realmId, UnicastProcessor<?> processor, RemovalCause cause) ->
-                  completeSynchronized(processor))
+              (String realmId, EventProcessor processor, RemovalCause cause) -> {
+                if (processor != null) {
+                  processor.onComplete();
+                }
+              })
           .build(this::createProcessor);
 
+  // Construct via a method (not EventProcessor::new) so that when the cache lives on a CDI client
+  // proxy, the call is delegated to the contextual bean instance, whose injected configuration is
+  // non-null. A direct inner-class instantiation would capture the proxy as the enclosing instance
+  // and read its uninjected (null) configuration.

Review Comment:
   done 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to