nandorKollar commented on PR #4498:
URL: https://github.com/apache/polaris/pull/4498#issuecomment-4502672144

   > OK I'm spending too much time on this 😅 – but I think I have a version 
that eliminates all possible race conditions:
   > 
   > ```java
   > @ApplicationScoped
   > @Identifier("persistence-in-memory-buffer")
   > public class InMemoryBufferEventListener extends PolarisPersistenceEventListener {
   > 
   >   private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryBufferEventListener.class);
   > 
   >   @Inject MetaStoreManagerFactory metaStoreManagerFactory;
   >   @Inject InMemoryBufferEventListenerConfiguration configuration;
   > 
   >   private static final class EvictedException extends IllegalStateException {}
   > 
   >   private final class EventProcessor {
   > 
   >     private final UnicastProcessor<PolarisEvent> processor;
   >     private final ReentrantLock lock = new ReentrantLock();
   >     private boolean completed = false; // guarded by lock
   > 
   >     EventProcessor(String realmId) {
   >       processor = UnicastProcessor.create();
   >       processor
   >           .emitOn(Infrastructure.getDefaultWorkerPool())
   >           .group()
   >           .intoLists()
   >           .of(configuration.maxBufferSize(), configuration.bufferTime())
   >           .subscribe()
   >           .with(events -> flush(realmId, events), error -> onProcessorError(realmId, error));
   >     }
   > 
   >     void onNext(PolarisEvent event) {
   >       lock.lock();
   >       try {
   >         if (completed) {
   >           throw new EvictedException();
   >         }
   >         processor.onNext(event);
   >       } finally {
   >         lock.unlock();
   >       }
   >     }
   > 
   >     void onComplete() {
   >       lock.lock();
   >       try {
   >         if (!completed) {
   >           completed = true;
   >           processor.onComplete();
   >         }
   >       } finally {
   >         lock.unlock();
   >       }
   >     }
   >   }
   > 
   >   @VisibleForTesting
   >   final LoadingCache<String, EventProcessor> processors =
   >       Caffeine.newBuilder()
   >           .expireAfterAccess(Duration.ofHours(1))
   >           .evictionListener(
   >               (String realmId, EventProcessor processor, RemovalCause cause) -> {
   >                 processor.onComplete();
   >               })
   >           .build(EventProcessor::new);
   > 
   >   private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
   >   private boolean shutdown = false; // guarded by shutdownLock
   > 
   >   @Override
   >   protected void processEvent(String realmId, PolarisEvent event) {
   >     shutdownLock.readLock().lock();
   >     try {
   >       if (shutdown) {
   >         return;
   >       }
   >       while (true) {
   >         var processor = Objects.requireNonNull(processors.get(realmId));
   >         try {
   >           processor.onNext(event);
   >           return;
   >         } catch (EvictedException ignored) {
   >           // processor was evicted; loop to get a fresh one
   >         }
   >       }
   >     } finally {
   >       shutdownLock.readLock().unlock();
   >     }
   >   }
   > 
   >   @PreDestroy
   >   public void shutdown() {
   >     shutdownLock.writeLock().lock();
   >     try {
   >       shutdown = true;
   >       processors.asMap().values().forEach(EventProcessor::onComplete);
   >       processors.invalidateAll(); // doesn't call the eviction listener
   >     } finally {
   >       shutdownLock.writeLock().unlock();
   >     }
   >   }
   > 
   >   @Retry(maxRetries = 5, delay = 1000, jitter = 100)
   >   @Fallback(fallbackMethod = "onFlushError")
   >   protected void flush(String realmId, List<PolarisEvent> events) {
   >     RealmContext realmContext = () -> realmId;
   >     var metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
   >     var basePersistence = metaStoreManagerFactory.getOrCreateSession(realmContext);
   >     var callContext = new PolarisCallContext(realmContext, basePersistence);
   >     metaStoreManager.writeEvents(callContext, events);
   >   }
   > 
   >   @SuppressWarnings("unused")
   >   protected void onFlushError(String realmId, List<PolarisEvent> events, Throwable error) {
   >     LOGGER.error("Failed to persist {} events for realm '{}'", events.size(), realmId, error);
   >   }
   > 
   >   protected void onProcessorError(String realmId, Throwable error) {
   >     LOGGER.error(
   >         "Unexpected error while processing events for realm '{}'; some events may have been dropped",
   >         realmId,
   >         error);
   >     processors.invalidate(realmId);
   >   }
   > }
   > ```
   > 
   > The only minor nit is that when `onProcessorError` is called, the processor
   > is left in an inconsistent state (not completed), so a subsequent call to
   > `onNext()` is possible – but since the processor has already errored out, I
   > don't think we care.
   > 
   > Again I'm not sure we need to be that paranoid. I'll defer to your 
judgement.
   
   This now takes two locks on every `onNext` call, doesn't it? Our
`ReentrantLock`, and the processor's own monitor. I'd expect this to cause
some (probably negligible) performance degradation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to