adutra commented on code in PR #4577:
URL: https://github.com/apache/polaris/pull/4577#discussion_r3322988335


##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends PolarisPersistenceEventListener
   @Inject MetaStoreManagerFactory metaStoreManagerFactory;
   @Inject InMemoryBufferEventListenerConfiguration configuration;
 
+  /**
+   * Thrown by {@link EventProcessor#onNext} when the wrapped processor has already completed (for
+   * example, it was evicted between the cache lookup and the {@code onNext} call). {@link
+   * #processEvent} catches it and retries with a freshly loaded processor.
+   */
+  private static final class EvictedException extends IllegalStateException {}

Review Comment:
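   nit: after looking at this code again, I think `CompletedException` would be a more accurate name: it is thrown whenever the processor has already completed, and eviction is only one way to get there.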
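
A minimal sketch of why the broader name fits, assuming a simplified stand-in for `EventProcessor` in which a plain list replaces the `UnicastProcessor` (the class `CompletedExceptionSketch` and the `deliver` helper are hypothetical and not part of the PR). The processor here is completed directly, as a shutdown would do, and the same exception still drives the retry:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

public class CompletedExceptionSketch {

  /** Hypothetical rename of EvictedException: the processor has completed, for any reason. */
  static final class CompletedException extends IllegalStateException {}

  /** Simplified stand-in for EventProcessor; a list replaces the UnicastProcessor. */
  static final class EventProcessor {
    private final List<String> sink = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();
    private boolean completed = false; // guarded by lock

    void onNext(String event) {
      lock.lock();
      try {
        if (completed) {
          throw new CompletedException();
        }
        sink.add(event);
      } finally {
        lock.unlock();
      }
    }

    void onComplete() {
      lock.lock();
      try {
        completed = true;
      } finally {
        lock.unlock();
      }
    }

    List<String> events() {
      lock.lock();
      try {
        return new ArrayList<>(sink); // defensive copy taken under the lock
      } finally {
        lock.unlock();
      }
    }
  }

  /** Mirrors the retry loop in processEvent; returns the processor that accepted the event. */
  static EventProcessor deliver(EventProcessor current, String event) {
    EventProcessor processor = current;
    while (true) {
      try {
        processor.onNext(event);
        return processor;
      } catch (CompletedException ignored) {
        // Stands in for a fresh processors.get(realmId) lookup.
        processor = new EventProcessor();
      }
    }
  }

  public static void main(String[] args) {
    EventProcessor completed = new EventProcessor();
    completed.onComplete(); // completed without any eviction taking place
    EventProcessor used = deliver(completed, "event-1");
    System.out.println("fresh processor used: " + (used != completed));
    System.out.println("events delivered: " + used.events());
  }
}
```

The exception is raised from the `completed` flag alone, so the name describes the state rather than one of its causes.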



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends PolarisPersistenceEventListener
   @Inject MetaStoreManagerFactory metaStoreManagerFactory;
   @Inject InMemoryBufferEventListenerConfiguration configuration;
 
+  /**
+   * Thrown by {@link EventProcessor#onNext} when the wrapped processor has already completed (for
+   * example, it was evicted between the cache lookup and the {@code onNext} call). {@link
+   * #processEvent} catches it and retries with a freshly loaded processor.
+   */
+  private static final class EvictedException extends IllegalStateException {}
+
+  /**
+   * Wraps a {@link UnicastProcessor} together with its own lock, so that mutual exclusion between
+   * {@code onNext} (from {@link #processEvent}) and {@code onComplete} (from eviction or shutdown)
+   * does not depend on smallrye-mutiny's internal {@code synchronized} on {@code
+   * UnicastProcessor.onNext}.
+   */
+  final class EventProcessor {
+
+    private final UnicastProcessor<PolarisEvent> processor;
+    private final ReentrantLock lock = new ReentrantLock();
+    private boolean completed = false; // guarded by lock
+
+    EventProcessor(String realmId) {
+      processor = UnicastProcessor.create();
+      processor
+          .emitOn(Infrastructure.getDefaultWorkerPool())
+          .group()
+          .intoLists()
+          .of(configuration.maxBufferSize(), configuration.bufferTime())
+          .subscribe()
+          .with(events -> flush(realmId, events), error -> onProcessorError(realmId, error));
+    }
+
+    void onNext(PolarisEvent event) {
+      lock.lock();
+      try {
+        if (completed) {
+          throw new EvictedException();
+        }
+        processor.onNext(event);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    void onComplete() {
+      lock.lock();
+      try {
+        if (!completed) {
+          completed = true;
+          processor.onComplete();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @VisibleForTesting
+    UnicastProcessor<PolarisEvent> processor() {
+      return processor;
+    }
+  }
+
   @VisibleForTesting
-  final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+  final LoadingCache<String, EventProcessor> processors =
       Caffeine.newBuilder()
           .expireAfterAccess(Duration.ofHours(1))
           .evictionListener(
-              (String realmId, UnicastProcessor<?> processor, RemovalCause cause) ->
-                  completeSynchronized(processor))
+              (String realmId, EventProcessor processor, RemovalCause cause) -> {
+                if (processor != null) {
+                  processor.onComplete();
+                }
+              })
           .build(this::createProcessor);
 
+  // Construct via a method (not EventProcessor::new) so that when the cache lives on a CDI client
+  // proxy, the call is delegated to the contextual bean instance, whose injected configuration is
+  // non-null. A direct inner-class instantiation would capture the proxy as the enclosing instance
+  // and read its uninjected (null) configuration.
+  protected EventProcessor createProcessor(String realmId) {
+    return new EventProcessor(realmId);
+  }
+
+  private final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
+  private boolean shutdown = false; // guarded by shutdownLock
+
   @Override
   protected void processEvent(String realmId, PolarisEvent event) {
-    var processor = Objects.requireNonNull(processors.get(realmId));
-    // UnicastProcessor.onNext() is internally synchronized (smallrye-mutiny
-    // UnicastProcessor declares onNext as `public synchronized void`), so concurrent
-    // processEvent() calls for the same realm serialize on the processor's intrinsic
-    // lock without an external guard.
-    processor.onNext(event);
+    shutdownLock.readLock().lock();
+    try {
+      if (shutdown) {
+        return;
+      }
+      while (true) {
+        var processor = Objects.requireNonNull(processors.get(realmId));
+        try {
+          processor.onNext(event);
+          return;
+        } catch (EvictedException ignored) {
+          // processor was evicted between the cache lookup and onNext; retry with a fresh one
+        }
+      }
+    } finally {
+      shutdownLock.readLock().unlock();
+    }
   }
 
   @PreDestroy
   public void shutdown() {
-    processors.asMap().values().forEach(InMemoryBufferEventListener::completeSynchronized);
-    processors.invalidateAll(); // doesn't call the eviction listener
+    shutdownLock.writeLock().lock();
+    try {
+      shutdown = true;
+      processors.asMap().values().forEach(EventProcessor::onComplete);
+      processors.invalidateAll(); // doesn't call the eviction listener
+    } finally {
+      shutdownLock.writeLock().unlock();
+    }
   }
 
   /**
-   * Calls {@link UnicastProcessor#onComplete()} while holding the processor's intrinsic monitor.
-   *
-   * <p>smallrye-mutiny's {@code UnicastProcessor.onNext} is method-{@code synchronized} on the
-   * processor instance; {@code onComplete} is not. Acquiring the same intrinsic monitor here
-   * restores symmetric mutual exclusion between concurrent {@code onNext} (from {@code
-   * processEvent}) and {@code onComplete} (from eviction or shutdown). Wrapping the pattern as an
-   * invariant keeps the synchronization requirement structurally visible to future maintainers.
+   * Re-enables the listener after a {@link #shutdown()}. {@code shutdown} is terminal in production
+   * (it runs on {@code @PreDestroy}), but tests reuse the same {@code @ApplicationScoped} bean
+   * across methods and call {@link #shutdown()} between them, so they need a way to clear the flag
+   * and start fresh.
    */
-  private static void completeSynchronized(UnicastProcessor<?> processor) {
-    synchronized (processor) {
-      processor.onComplete();
+  @VisibleForTesting
+  void reset() {

Review Comment:
   I prefer `@VisibleForTesting` to be used for broadening the visibility of a method that would otherwise have been private. Here, however, we are introducing a method that has no production usage at all.
   
   There is a better approach: in `InMemoryBufferEventListenerTestBase`, declare the producer as `Instance<InMemoryBufferEventListener>`, then call `Instance.destroy()` in `clearEvents()`:
   
   ```java
   @Inject
   @Identifier("persistence-in-memory-buffer")
   Instance<InMemoryBufferEventListener> producer;
   
   @AfterEach
   void clearEvents() throws Exception {
     producer.destroy(producer.get());
     // ...
   }
   ```



##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends PolarisPersistenceEventListener
   @Inject MetaStoreManagerFactory metaStoreManagerFactory;
   @Inject InMemoryBufferEventListenerConfiguration configuration;
 
+  /**
+   * Thrown by {@link EventProcessor#onNext} when the wrapped processor has already completed (for
+   * example, it was evicted between the cache lookup and the {@code onNext} call). {@link
+   * #processEvent} catches it and retries with a freshly loaded processor.
+   */
+  private static final class EvictedException extends IllegalStateException {}
+
+  /**
+   * Wraps a {@link UnicastProcessor} together with its own lock, so that mutual exclusion between
+   * {@code onNext} (from {@link #processEvent}) and {@code onComplete} (from eviction or shutdown)
+   * does not depend on smallrye-mutiny's internal {@code synchronized} on {@code
+   * UnicastProcessor.onNext}.
+   */
+  final class EventProcessor {
+
+    private final UnicastProcessor<PolarisEvent> processor;
+    private final ReentrantLock lock = new ReentrantLock();
+    private boolean completed = false; // guarded by lock
+
+    EventProcessor(String realmId) {
+      processor = UnicastProcessor.create();
+      processor
+          .emitOn(Infrastructure.getDefaultWorkerPool())
+          .group()
+          .intoLists()
+          .of(configuration.maxBufferSize(), configuration.bufferTime())
+          .subscribe()
+          .with(events -> flush(realmId, events), error -> onProcessorError(realmId, error));
+    }
+
+    void onNext(PolarisEvent event) {
+      lock.lock();
+      try {
+        if (completed) {
+          throw new EvictedException();
+        }
+        processor.onNext(event);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    void onComplete() {
+      lock.lock();
+      try {
+        if (!completed) {
+          completed = true;
+          processor.onComplete();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @VisibleForTesting
+    UnicastProcessor<PolarisEvent> processor() {
+      return processor;
+    }
+  }
+
   @VisibleForTesting
-  final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+  final LoadingCache<String, EventProcessor> processors =
       Caffeine.newBuilder()
           .expireAfterAccess(Duration.ofHours(1))
           .evictionListener(
-              (String realmId, UnicastProcessor<?> processor, RemovalCause cause) ->
-                  completeSynchronized(processor))
+              (String realmId, EventProcessor processor, RemovalCause cause) -> {
+                if (processor != null) {
+                  processor.onComplete();
+                }
+              })
           .build(this::createProcessor);
 
+  // Construct via a method (not EventProcessor::new) so that when the cache lives on a CDI client
+  // proxy, the call is delegated to the contextual bean instance, whose injected configuration is
+  // non-null. A direct inner-class instantiation would capture the proxy as the enclosing instance
+  // and read its uninjected (null) configuration.

Review Comment:
   Nice catch 👍. However, you now also need to make `EventProcessor` protected; otherwise we'd be leaking a package-private type through a protected method.
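
A minimal sketch of the visibility mismatch, assuming simplified stand-in classes (`Listener`, `FixedListener`, and the helper `leaksLessVisibleType` are hypothetical and not part of the PR). A subclass in another package could call or override a protected `createProcessor`, but could not name a package-private return type. A single file cannot show that cross-package compile error, so the sketch compares the modifiers through reflection instead:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class VisibilityLeakDemo {

  /** Shape from the diff: package-private inner class exposed by a protected method. */
  static class Listener {
    final class EventProcessor {}

    protected EventProcessor createProcessor(String realmId) {
      return new EventProcessor();
    }
  }

  /** Shape the review asks for: the inner class is as visible as the method. */
  static class FixedListener {
    protected final class EventProcessor {}

    protected EventProcessor createProcessor(String realmId) {
      return new EventProcessor();
    }
  }

  /**
   * Returns true when the named method is visible outside its package (public or
   * protected) while its return type is not.
   */
  static boolean leaksLessVisibleType(Class<?> owner, String methodName) {
    Method method;
    try {
      method = owner.getDeclaredMethod(methodName, String.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
    int methodMods = method.getModifiers();
    int typeMods = method.getReturnType().getModifiers();
    boolean methodEscapes = Modifier.isPublic(methodMods) || Modifier.isProtected(methodMods);
    boolean typeEscapes = Modifier.isPublic(typeMods) || Modifier.isProtected(typeMods);
    return methodEscapes && !typeEscapes;
  }

  public static void main(String[] args) {
    System.out.println("Listener leaks: " + leaksLessVisibleType(Listener.class, "createProcessor"));
    System.out.println(
        "FixedListener leaks: " + leaksLessVisibleType(FixedListener.class, "createProcessor"));
  }
}
```

Making the method package-private instead would also remove the mismatch, though whether that suits the CDI proxy delegation described in the code comment is not something this sketch checks.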



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
