adutra commented on code in PR #4577:
URL: https://github.com/apache/polaris/pull/4577#discussion_r3322988335
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject InMemoryBufferEventListenerConfiguration configuration;
+ /**
+ * Thrown by {@link EventProcessor#onNext} when the wrapped processor has
already completed (for
+ * example, it was evicted between the cache lookup and the {@code onNext}
call). {@link
+ * #processEvent} catches it and retries with a freshly loaded processor.
+ */
+ private static final class EvictedException extends IllegalStateException {}
Review Comment:
nit: after looking at this code again, I think `CompletedException` makes
more sense.
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject InMemoryBufferEventListenerConfiguration configuration;
+ /**
+ * Thrown by {@link EventProcessor#onNext} when the wrapped processor has
already completed (for
+ * example, it was evicted between the cache lookup and the {@code onNext}
call). {@link
+ * #processEvent} catches it and retries with a freshly loaded processor.
+ */
+ private static final class EvictedException extends IllegalStateException {}
+
+ /**
+ * Wraps a {@link UnicastProcessor} together with its own lock, so that
mutual exclusion between
+ * {@code onNext} (from {@link #processEvent}) and {@code onComplete} (from
eviction or shutdown)
+ * does not depend on smallrye-mutiny's internal {@code synchronized} on
{@code
+ * UnicastProcessor.onNext}.
+ */
+ final class EventProcessor {
+
+ private final UnicastProcessor<PolarisEvent> processor;
+ private final ReentrantLock lock = new ReentrantLock();
+ private boolean completed = false; // guarded by lock
+
+ EventProcessor(String realmId) {
+ processor = UnicastProcessor.create();
+ processor
+ .emitOn(Infrastructure.getDefaultWorkerPool())
+ .group()
+ .intoLists()
+ .of(configuration.maxBufferSize(), configuration.bufferTime())
+ .subscribe()
+ .with(events -> flush(realmId, events), error ->
onProcessorError(realmId, error));
+ }
+
+ void onNext(PolarisEvent event) {
+ lock.lock();
+ try {
+ if (completed) {
+ throw new EvictedException();
+ }
+ processor.onNext(event);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void onComplete() {
+ lock.lock();
+ try {
+ if (!completed) {
+ completed = true;
+ processor.onComplete();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ UnicastProcessor<PolarisEvent> processor() {
+ return processor;
+ }
+ }
+
@VisibleForTesting
- final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+ final LoadingCache<String, EventProcessor> processors =
Caffeine.newBuilder()
.expireAfterAccess(Duration.ofHours(1))
.evictionListener(
- (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) ->
- completeSynchronized(processor))
+ (String realmId, EventProcessor processor, RemovalCause cause)
-> {
+ if (processor != null) {
+ processor.onComplete();
+ }
+ })
.build(this::createProcessor);
+ // Construct via a method (not EventProcessor::new) so that when the cache
lives on a CDI client
+ // proxy, the call is delegated to the contextual bean instance, whose
injected configuration is
+ // non-null. A direct inner-class instantiation would capture the proxy as
the enclosing instance
+ // and read its uninjected (null) configuration.
+ protected EventProcessor createProcessor(String realmId) {
+ return new EventProcessor(realmId);
+ }
+
+ private final ReentrantReadWriteLock shutdownLock = new
ReentrantReadWriteLock();
+ private boolean shutdown = false; // guarded by shutdownLock
+
@Override
protected void processEvent(String realmId, PolarisEvent event) {
- var processor = Objects.requireNonNull(processors.get(realmId));
- // UnicastProcessor.onNext() is internally synchronized (smallrye-mutiny
- // UnicastProcessor declares onNext as `public synchronized void`), so
concurrent
- // processEvent() calls for the same realm serialize on the processor's
intrinsic
- // lock without an external guard.
- processor.onNext(event);
+ shutdownLock.readLock().lock();
+ try {
+ if (shutdown) {
+ return;
+ }
+ while (true) {
+ var processor = Objects.requireNonNull(processors.get(realmId));
+ try {
+ processor.onNext(event);
+ return;
+ } catch (EvictedException ignored) {
+ // processor was evicted between the cache lookup and onNext; retry
with a fresh one
+ }
+ }
+ } finally {
+ shutdownLock.readLock().unlock();
+ }
}
@PreDestroy
public void shutdown() {
-
processors.asMap().values().forEach(InMemoryBufferEventListener::completeSynchronized);
- processors.invalidateAll(); // doesn't call the eviction listener
+ shutdownLock.writeLock().lock();
+ try {
+ shutdown = true;
+ processors.asMap().values().forEach(EventProcessor::onComplete);
+ processors.invalidateAll(); // doesn't call the eviction listener
+ } finally {
+ shutdownLock.writeLock().unlock();
+ }
}
/**
- * Calls {@link UnicastProcessor#onComplete()} while holding the processor's
intrinsic monitor.
- *
- * <p>smallrye-mutiny's {@code UnicastProcessor.onNext} is method-{@code
synchronized} on the
- * processor instance; {@code onComplete} is not. Acquiring the same
intrinsic monitor here
- * restores symmetric mutual exclusion between concurrent {@code onNext}
(from {@code
- * processEvent}) and {@code onComplete} (from eviction or shutdown).
Wrapping the pattern as an
- * invariant keeps the synchronization requirement structurally visible to
future maintainers.
+ * Re-enables the listener after a {@link #shutdown()}. {@code shutdown} is
terminal in production
+ * (it runs on {@code @PreDestroy}), but tests reuse the same {@code
@ApplicationScoped} bean
+ * across methods and call {@link #shutdown()} between them, so they need a
way to clear the flag
+ * and start fresh.
*/
- private static void completeSynchronized(UnicastProcessor<?> processor) {
- synchronized (processor) {
- processor.onComplete();
+ @VisibleForTesting
+ void reset() {
Review Comment:
I prefer when `@VisibleForTesting` is used to broaden the visibility of a
method that would otherwise have been private. But here, we are introducing a
method that has no production usage.
Instead, there is a better approach: in
`InMemoryBufferEventListenerTestBase`, declare the producer as
`Instance<InMemoryBufferEventListener>`, then call `Instance.destroy()` in
`clearEvents()`:
```java
@Inject
@Identifier("persistence-in-memory-buffer")
Instance<InMemoryBufferEventListener> producer;
@AfterEach
void clearEvents() throws Exception {
producer.destroy(producer.get());
// ...
}
```
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,139 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject InMemoryBufferEventListenerConfiguration configuration;
+ /**
+ * Thrown by {@link EventProcessor#onNext} when the wrapped processor has
already completed (for
+ * example, it was evicted between the cache lookup and the {@code onNext}
call). {@link
+ * #processEvent} catches it and retries with a freshly loaded processor.
+ */
+ private static final class EvictedException extends IllegalStateException {}
+
+ /**
+ * Wraps a {@link UnicastProcessor} together with its own lock, so that
mutual exclusion between
+ * {@code onNext} (from {@link #processEvent}) and {@code onComplete} (from
eviction or shutdown)
+ * does not depend on smallrye-mutiny's internal {@code synchronized} on
{@code
+ * UnicastProcessor.onNext}.
+ */
+ final class EventProcessor {
+
+ private final UnicastProcessor<PolarisEvent> processor;
+ private final ReentrantLock lock = new ReentrantLock();
+ private boolean completed = false; // guarded by lock
+
+ EventProcessor(String realmId) {
+ processor = UnicastProcessor.create();
+ processor
+ .emitOn(Infrastructure.getDefaultWorkerPool())
+ .group()
+ .intoLists()
+ .of(configuration.maxBufferSize(), configuration.bufferTime())
+ .subscribe()
+ .with(events -> flush(realmId, events), error ->
onProcessorError(realmId, error));
+ }
+
+ void onNext(PolarisEvent event) {
+ lock.lock();
+ try {
+ if (completed) {
+ throw new EvictedException();
+ }
+ processor.onNext(event);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void onComplete() {
+ lock.lock();
+ try {
+ if (!completed) {
+ completed = true;
+ processor.onComplete();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ UnicastProcessor<PolarisEvent> processor() {
+ return processor;
+ }
+ }
+
@VisibleForTesting
- final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+ final LoadingCache<String, EventProcessor> processors =
Caffeine.newBuilder()
.expireAfterAccess(Duration.ofHours(1))
.evictionListener(
- (String realmId, UnicastProcessor<?> processor, RemovalCause
cause) ->
- completeSynchronized(processor))
+ (String realmId, EventProcessor processor, RemovalCause cause)
-> {
+ if (processor != null) {
+ processor.onComplete();
+ }
+ })
.build(this::createProcessor);
+ // Construct via a method (not EventProcessor::new) so that when the cache
lives on a CDI client
+ // proxy, the call is delegated to the contextual bean instance, whose
injected configuration is
+ // non-null. A direct inner-class instantiation would capture the proxy as
the enclosing instance
+ // and read its uninjected (null) configuration.
Review Comment:
Nice catch 👍 – but now you also need to make `EventProcessor` protected,
otherwise we'd be leaking a package-private type through a protected method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]