adutra commented on PR #4498:
URL: https://github.com/apache/polaris/pull/4498#issuecomment-4501582601

   OK I'm spending too much time on this 😅 – but I think I have a version that 
eliminates all possible race conditions:
   
   ```java
   @ApplicationScoped
   @Identifier("persistence-in-memory-buffer")
   public class InMemoryBufferEventListener extends 
PolarisPersistenceEventListener {
   
     private static final Logger LOGGER = 
LoggerFactory.getLogger(InMemoryBufferEventListener.class);
   
     @Inject MetaStoreManagerFactory metaStoreManagerFactory;
     @Inject InMemoryBufferEventListenerConfiguration configuration;
   
     private static final class EvictedException extends IllegalStateException 
{}
   
     public final class Processor {
   
       private final UnicastProcessor<PolarisEvent> processor;
       private final ReentrantLock lock = new ReentrantLock();
       private boolean completed = false; // guarded by lock
   
       public Processor(String realmId) {
         processor = UnicastProcessor.create();
         processor
             .emitOn(Infrastructure.getDefaultWorkerPool())
             .group()
             .intoLists()
             .of(configuration.maxBufferSize(), configuration.bufferTime())
             .subscribe()
             .with(events -> flush(realmId, events), error -> 
onProcessorError(realmId, error));
       }
   
       public void onNext(PolarisEvent event) {
         lock.lock();
         try {
           if (completed) {
             throw new EvictedException();
           }
           processor.onNext(event);
         } finally {
           lock.unlock();
         }
       }
   
       public void onComplete() {
         lock.lock();
         try {
           if (!completed) {
             completed = true;
             processor.onComplete();
           }
         } finally {
           lock.unlock();
         }
       }
     }
   
     @VisibleForTesting
     final LoadingCache<String, Processor> processors =
         Caffeine.newBuilder()
             .expireAfterAccess(Duration.ofHours(1))
             .evictionListener(
                 (String realmId, Processor processor, RemovalCause cause) -> {
                   processor.onComplete();
                 })
             .build(Processor::new);
   
     private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
     private boolean shutdown = false; // guarded by shutdownLock
   
     @Override
     protected void processEvent(String realmId, PolarisEvent event) {
       shutdownLock.readLock().lock();
       try {
         if (shutdown) {
           return;
         }
         while (true) {
           var processor = Objects.requireNonNull(processors.get(realmId));
           try {
             processor.onNext(event);
             return;
           } catch (EvictedException ignored) {
             // processor was evicted; loop to get a fresh one
           }
         }
       } finally {
         shutdownLock.readLock().unlock();
       }
     }
   
     @PreDestroy
     public void shutdown() {
       shutdownLock.writeLock().lock();
       try {
         shutdown = true;
         processors.asMap().values().forEach(Processor::onComplete);
         processors.invalidateAll(); // doesn't call the eviction listener
       } finally {
         shutdownLock.writeLock().unlock();
       }
     }
   
     @Retry(maxRetries = 5, delay = 1000, jitter = 100)
     @Fallback(fallbackMethod = "onFlushError")
     protected void flush(String realmId, List<PolarisEvent> events) {
       RealmContext realmContext = () -> realmId;
       var metaStoreManager = 
metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
       var basePersistence = 
metaStoreManagerFactory.getOrCreateSession(realmContext);
       var callContext = new PolarisCallContext(realmContext, basePersistence);
       metaStoreManager.writeEvents(callContext, events);
     }
   
     @SuppressWarnings("unused")
     protected void onFlushError(String realmId, List<PolarisEvent> events, 
Throwable error) {
       LOGGER.error("Failed to persist {} events for realm '{}'", 
events.size(), realmId, error);
     }
   
     protected void onProcessorError(String realmId, Throwable error) {
       LOGGER.error(
           "Unexpected error while processing events for realm '{}'; some 
events may have been dropped",
           realmId,
           error);
       processors.invalidate(realmId);
     }
   }
   ```
   
   The only minor nit is that when `onProcessorError()` is called, the processor is 
left in an inconsistent state (errored, but not marked as completed), so a 
subsequent call to `onNext()` is still possible – but since the processor has 
already errored out, I don't think we care.
   
   Again I'm not sure we need to be that paranoid. I'll defer to your judgement.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to