adutra commented on code in PR #4577:
URL: https://github.com/apache/polaris/pull/4577#discussion_r3325259752
##########
runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java:
##########
@@ -51,58 +53,122 @@ public class InMemoryBufferEventListener extends
PolarisPersistenceEventListener
@Inject MetaStoreManagerFactory metaStoreManagerFactory;
@Inject InMemoryBufferEventListenerConfiguration configuration;
+ /**
+ * Thrown by {@link EventProcessor#onNext} when the wrapped processor has
already completed (for
+ * example, it was evicted between the cache lookup and the {@code onNext}
call). {@link
+ * #processEvent} catches it and retries with a freshly loaded processor.
+ */
+ private static final class CompletedException extends IllegalStateException
{}
+
+ /**
+ * Wraps a {@link UnicastProcessor} together with its own lock, so that
mutual exclusion between
+ * {@code onNext} (from {@link #processEvent}) and {@code onComplete} (from
eviction or shutdown)
+ * does not depend on smallrye-mutiny's internal {@code synchronized} on
{@code
+ * UnicastProcessor.onNext}.
+ */
+ protected final class EventProcessor {
+
+ private final UnicastProcessor<PolarisEvent> processor;
+ private final ReentrantLock lock = new ReentrantLock();
+ private boolean completed = false; // guarded by lock
+
+ EventProcessor(String realmId) {
+ processor = UnicastProcessor.create();
+ processor
+ .emitOn(Infrastructure.getDefaultWorkerPool())
+ .group()
+ .intoLists()
+ .of(configuration.maxBufferSize(), configuration.bufferTime())
+ .subscribe()
+ .with(events -> flush(realmId, events), error ->
onProcessorError(realmId, error));
+ }
+
+ void onNext(PolarisEvent event) {
+ lock.lock();
+ try {
+ if (completed) {
+ throw new CompletedException();
+ }
+ processor.onNext(event);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void onComplete() {
+ lock.lock();
+ try {
+ if (!completed) {
+ completed = true;
+ processor.onComplete();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @VisibleForTesting
+ UnicastProcessor<PolarisEvent> processor() {
Review Comment:
Let's make the field package-private directly, I don't see a strong reason
to introduce this getter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]