This is an automated email from the ASF dual-hosted git repository.

adutra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new e53a3522f chore(events): unify in-memory buffer listeners implementations (#2628)
e53a3522f is described below

commit e53a3522f4097cce90b6de89ebf123b0ab905be6
Author: Alexandre Dutra <[email protected]>
AuthorDate: Mon Sep 22 10:58:35 2025 +0200

    chore(events): unify in-memory buffer listeners implementations (#2628)
---
 ...emoryBufferPolarisPersistenceEventListener.java | 202 -------------
 ...tener.java => InMemoryBufferEventListener.java} |   7 +-
 .../InMemoryBufferEventListenerConfiguration.java  |   2 +-
 ...yBufferPolarisPersistenceEventListenerTest.java | 324 ---------------------
 ...InMemoryBufferEventListenerBufferSizeTest.java} |   4 +-
 ...InMemoryBufferEventListenerBufferTimeTest.java} |   4 +-
 ...va => InMemoryBufferEventListenerTestBase.java} |  12 +-
 7 files changed, 14 insertions(+), 541 deletions(-)

diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
deleted file mode 100644
index 481c599bc..000000000
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events.listeners;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.smallrye.common.annotation.Identifier;
-import jakarta.annotation.Nullable;
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.inject.Inject;
-import jakarta.ws.rs.container.ContainerRequestContext;
-import jakarta.ws.rs.core.Context;
-import jakarta.ws.rs.core.SecurityContext;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.polaris.core.PolarisCallContext;
-import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.core.context.RealmContext;
-import org.apache.polaris.core.entity.PolarisEvent;
-import org.apache.polaris.core.persistence.BasePersistence;
-import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Event listener that buffers in memory and then dumps to persistence. */
-@ApplicationScoped
-@Identifier("persistence-in-memory-buffer")
-public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
-  private static final Logger LOGGER =
-      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
-  private static final String REQUEST_ID_KEY = "requestId";
-  private final MetaStoreManagerFactory metaStoreManagerFactory;
-
-  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueueWithApproximateSize<PolarisEvent>>
-      buffer = new ConcurrentHashMap<>();
-  private final ScheduledExecutorService executor;
-  private final ConcurrentHashMap<String, Future<?>> futures = new 
ConcurrentHashMap<>();
-  private ScheduledFuture<?> scheduledFuture;
-  private final Duration timeToFlush;
-  private final int maxBufferSize;
-
-  @Inject CallContext callContext;
-  @Inject Clock clock;
-  @Context SecurityContext securityContext;
-  @Context ContainerRequestContext containerRequestContext;
-
-  @Inject
-  public InMemoryBufferPolarisPersistenceEventListener(
-      MetaStoreManagerFactory metaStoreManagerFactory,
-      Clock clock,
-      InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
-    this.metaStoreManagerFactory = metaStoreManagerFactory;
-    this.clock = clock;
-    this.timeToFlush = eventListenerConfiguration.bufferTime();
-    this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
-
-    executor = Executors.newSingleThreadScheduledExecutor();
-  }
-
-  @PostConstruct
-  void start() {
-    scheduledFuture =
-        executor.scheduleAtFixedRate(
-            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS);
-  }
-
-  void runCleanup() {
-    for (String realmId : buffer.keySet()) {
-      try {
-        checkAndFlushBufferIfNecessary(realmId, false);
-      } catch (Exception e) {
-        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
-      }
-    }
-  }
-
-  @PreDestroy
-  void shutdown() {
-    scheduledFuture.cancel(false);
-    futures.forEach((key, future) -> future.cancel(false));
-    executor.shutdown();
-
-    try {
-      if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
-        executor.shutdownNow();
-        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
-          LOGGER.warn("Executor did not shut down cleanly");
-        }
-      }
-    } catch (InterruptedException e) {
-      executor.shutdownNow();
-      Thread.currentThread().interrupt();
-    } finally {
-      for (String realmId : buffer.keySet()) {
-        try {
-          checkAndFlushBufferIfNecessary(realmId, true);
-        } catch (Exception e) {
-          LOGGER.debug("Buffer flushing task failed for realm ({}): ", 
realmId, e);
-        }
-      }
-    }
-  }
-
-  @Nullable
-  @Override
-  protected String getRequestId() {
-    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
-      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
-    }
-    return null;
-  }
-
-  @Override
-  protected void processEvent(PolarisEvent polarisEvent) {
-    String realmId = callContext.getRealmContext().getRealmIdentifier();
-
-    ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> realmQueue =
-        buffer.computeIfAbsent(realmId, k -> new 
ConcurrentLinkedQueueWithApproximateSize<>());
-    realmQueue.add(polarisEvent);
-    if (realmQueue.size() >= maxBufferSize) {
-      futures.compute(
-          realmId,
-          (k, v) -> {
-            if (v == null || v.isDone()) {
-              return executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId, true));
-            }
-            return v;
-          });
-    }
-  }
-
-  @VisibleForTesting
-  void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
-    ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> queue = 
buffer.get(realmId);
-    if (queue == null || queue.isEmpty()) {
-      return;
-    }
-
-    PolarisEvent head = queue.peek();
-    if (head == null) {
-      return;
-    }
-
-    Duration elapsed = Duration.ofMillis(clock.millis() - 
head.getTimestampMs());
-
-    if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize || 
forceFlush) {
-      // Atomically replace old queue with new queue
-      boolean replaced =
-          buffer.replace(realmId, queue, new 
ConcurrentLinkedQueueWithApproximateSize<>());
-      if (!replaced) {
-        // Another thread concurrently modified the buffer, so do not continue
-        return;
-      }
-
-      RealmContext realmContext = () -> realmId;
-      PolarisMetaStoreManager metaStoreManager =
-          metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
-      BasePersistence basePersistence = 
metaStoreManagerFactory.getOrCreateSession(realmContext);
-      metaStoreManager.writeEvents(
-          new PolarisCallContext(realmContext, basePersistence), 
queue.stream().toList());
-
-      if (buffer.get(realmId).size() >= maxBufferSize) {
-        // Ensure that all events will be flushed, even if the race condition 
is triggered where
-        // new events are added between replacing the buffer above and the 
finishing of this method.
-        futures.put(realmId, executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId, true)));
-      }
-    }
-  }
-
-  @Override
-  protected ContextSpecificInformation getContextSpecificInformation() {
-    return new ContextSpecificInformation(
-        clock.millis(),
-        securityContext.getUserPrincipal() == null
-            ? null
-            : securityContext.getUserPrincipal().getName());
-  }
-}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java
similarity index 95%
rename from runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java
rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java
index 8af66b194..6fd88d03f 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java
@@ -44,7 +44,6 @@ import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.PolarisEvent;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
-import org.apache.polaris.service.events.listeners.InMemoryBufferEventListenerConfiguration;
 import org.apache.polaris.service.events.listeners.PolarisPersistenceEventListener;
 import org.eclipse.microprofile.faulttolerance.Fallback;
 import org.eclipse.microprofile.faulttolerance.Retry;
@@ -52,10 +51,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @ApplicationScoped
-@Identifier("persistence-in-memory")
-public class InMemoryEventListener extends PolarisPersistenceEventListener {
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferEventListener extends PolarisPersistenceEventListener {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEventListener.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryBufferEventListener.class);
 
   @Inject CallContext callContext;
   @Inject Clock clock;
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerConfiguration.java
similarity index 95%
rename from runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java
rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerConfiguration.java
index 277676f10..bae29a9a2 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerConfiguration.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.polaris.service.events.listeners;
+package org.apache.polaris.service.events.listeners.inmemory;
 
 import io.quarkus.runtime.annotations.StaticInitSafe;
 import io.smallrye.config.ConfigMapping;
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java
deleted file mode 100644
index 026e741d4..000000000
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.polaris.service.events.listeners;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import jakarta.ws.rs.container.ContainerRequestContext;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.polaris.core.PolarisCallContext;
-import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.core.context.RealmContext;
-import org.apache.polaris.core.entity.PolarisEvent;
-import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.parallel.Execution;
-import org.junit.jupiter.api.parallel.ExecutionMode;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import org.threeten.extra.MutableClock;
-
-public class InMemoryBufferPolarisPersistenceEventListenerTest {
-  private InMemoryBufferPolarisPersistenceEventListener eventListener;
-  private PolarisMetaStoreManager polarisMetaStoreManager;
-  private MutableClock clock;
-  private CallContext callContext;
-
-  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
-  private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = 
Duration.ofMillis(500);
-
-  @BeforeEach
-  public void setUp() {
-    callContext = Mockito.mock(CallContext.class);
-    PolarisCallContext polarisCallContext = 
Mockito.mock(PolarisCallContext.class);
-    when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
-
-    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
-    polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
-    when(metaStoreManagerFactory.getOrCreateMetaStoreManager(any()))
-        .thenReturn(polarisMetaStoreManager);
-
-    InMemoryBufferEventListenerConfiguration eventListenerConfiguration =
-        Mockito.mock(InMemoryBufferEventListenerConfiguration.class);
-    
when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE);
-    
when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS);
-
-    clock =
-        MutableClock.of(
-            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
-
-    eventListener =
-        new InMemoryBufferPolarisPersistenceEventListener(
-            metaStoreManagerFactory, clock, eventListenerConfiguration);
-
-    eventListener.callContext = callContext;
-  }
-
-  @Test
-  public void testProcessEventFlushesAfterConfiguredTime() {
-    String realmId = "realm1";
-    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
-
-    // Push clock forwards to flush the buffer
-    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
-    eventListener.checkAndFlushBufferIfNecessary(realmId, false);
-    verify(polarisMetaStoreManager, times(1)).writeEvents(any(), 
eq(eventsAddedToBuffer));
-  }
-
-  @Test
-  public void testProcessEventFlushesAfterMaxEvents() {
-    String realm1 = "realm1";
-    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realm1);
-    List<PolarisEvent> eventsAddedToBufferRealm2 = 
addEventsWithoutTriggeringFlush("realm2");
-
-    // Add the last event for realm1 and verify that it did trigger the flush
-    PolarisEvent triggeringEvent = createSampleEvent();
-    RealmContext realmContext = () -> realm1;
-    when(callContext.getRealmContext()).thenReturn(realmContext);
-    eventListener.processEvent(triggeringEvent);
-    eventsAddedToBuffer.add(triggeringEvent);
-
-    // Calling checkAndFlushBufferIfNecessary manually to replicate the 
behavior of the executor
-    // service
-    eventListener.checkAndFlushBufferIfNecessary(realm1, false);
-    verify(polarisMetaStoreManager, times(1)).writeEvents(any(), 
eq(eventsAddedToBuffer));
-    verify(polarisMetaStoreManager, times(0)).writeEvents(any(), 
eq(eventsAddedToBufferRealm2));
-  }
-
-  @Test
-  public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws 
Exception {
-    String realmId = "realm1";
-    int threadCount = 10;
-    List<Thread> threads = new ArrayList<>();
-    ConcurrentLinkedQueue<Exception> exceptions = new 
ConcurrentLinkedQueue<>();
-
-    // Pre-populate the buffer with events
-    List<PolarisEvent> events = addEventsWithoutTriggeringFlush(realmId);
-
-    // Push clock forwards to flush the buffer
-    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
-
-    // Each thread will call checkAndFlushBufferIfNecessary concurrently
-    for (int i = 0; i < threadCount; i++) {
-      Thread t =
-          new Thread(
-              () -> {
-                try {
-                  eventListener.checkAndFlushBufferIfNecessary(realmId, false);
-                } catch (Exception e) {
-                  exceptions.add(e);
-                }
-              });
-      threads.add(t);
-    }
-    // Start all threads
-    threads.forEach(Thread::start);
-    // Wait for all threads to finish
-    for (Thread t : threads) {
-      t.join();
-    }
-    // There should be no exceptions
-    if (!exceptions.isEmpty()) {
-      throw new AssertionError(
-          "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary: 
", exceptions.peek());
-    }
-    // Only one flush should occur
-    verify(polarisMetaStoreManager, times(1)).writeEvents(any(), eq(events));
-  }
-
-  @Execution(ExecutionMode.SAME_THREAD)
-  @Test
-  public void testProcessEventIsThreadSafe() throws Exception {
-    String realmId = "realm1";
-    when(callContext.getRealmContext()).thenReturn(() -> realmId);
-    int threadCount = 10;
-    List<Thread> threads = new ArrayList<>();
-    ConcurrentLinkedQueue<Exception> exceptions = new 
ConcurrentLinkedQueue<>();
-    ConcurrentLinkedQueue<PolarisEvent> allEvents = new 
ConcurrentLinkedQueue<>();
-    eventListener.start();
-
-    for (int i = 0; i < threadCount; i++) {
-      Thread t =
-          new Thread(
-              () -> {
-                try {
-                  for (int j = 0; j < 10; j++) {
-                    PolarisEvent event = createSampleEvent();
-                    allEvents.add(event);
-                    eventListener.processEvent(event);
-                  }
-                } catch (Exception e) {
-                  exceptions.add(e);
-                }
-              });
-      threads.add(t);
-    }
-
-    // Start all threads
-    threads.forEach(Thread::start);
-    // Wait for all threads to finish
-    for (Thread t : threads) {
-      t.join();
-    }
-    // There should be no exceptions
-    if (!exceptions.isEmpty()) {
-      throw new AssertionError(
-          "Exceptions occurred in concurrent processEvent: ", 
exceptions.peek());
-    }
-
-    Awaitility.await("expected amount of records should be processed")
-        .atMost(Duration.ofSeconds(30))
-        .pollDelay(Duration.ofMillis(500))
-        .pollInterval(Duration.ofMillis(500))
-        .untilAsserted(
-            () -> {
-              clock.add(500, ChronoUnit.MILLIS);
-              ArgumentCaptor<List<PolarisEvent>> eventsCaptor = 
ArgumentCaptor.captor();
-              verify(polarisMetaStoreManager, atLeastOnce())
-                  .writeEvents(any(), eventsCaptor.capture());
-              List<PolarisEvent> eventsProcessed =
-                  
eventsCaptor.getAllValues().stream().flatMap(List::stream).toList();
-              if (eventsProcessed.size() > 100) {
-                eventsProcessed = new ArrayList<>();
-              }
-              
assertThat(eventsProcessed.size()).isGreaterThanOrEqualTo(allEvents.size());
-            });
-    ArgumentCaptor<List<PolarisEvent>> eventsCaptor = ArgumentCaptor.captor();
-    verify(polarisMetaStoreManager, atLeastOnce()).writeEvents(any(), 
eventsCaptor.capture());
-    List<PolarisEvent> seenEvents =
-        eventsCaptor.getAllValues().stream().flatMap(List::stream).toList();
-    assertThat(seenEvents.size()).isEqualTo(allEvents.size());
-    assertThat(seenEvents).hasSameElementsAs(allEvents);
-  }
-
-  @Test
-  public void testRequestIdFunctionalityWithContainerRequestContext() {
-    // Test when containerRequestContext has requestId property
-    ContainerRequestContext mockContainerRequestContext =
-        Mockito.mock(ContainerRequestContext.class);
-    String expectedRequestId = "custom-request-id-123";
-
-    
when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true);
-    
when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId);
-
-    eventListener.containerRequestContext = mockContainerRequestContext;
-
-    String actualRequestId = eventListener.getRequestId();
-    assertThat(actualRequestId)
-        .as("Expected requestId '" + expectedRequestId + "' but got '" + 
actualRequestId + "'")
-        .isEqualTo(expectedRequestId);
-  }
-
-  @Test
-  public void testRequestIdFunctionalityWithoutContainerRequestContext() {
-    // Test when containerRequestContext is null
-    try {
-      java.lang.reflect.Field field =
-          InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField(
-              "containerRequestContext");
-      field.setAccessible(true);
-      field.set(eventListener, null);
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to set containerRequestContext 
field", e);
-    }
-
-    String requestId1 = eventListener.getRequestId();
-    String requestId2 = eventListener.getRequestId();
-
-    assertThat(requestId1 == null).isTrue();
-    assertThat(requestId2 == null).isTrue();
-  }
-
-  @Test
-  public void 
testRequestIdFunctionalityWithContainerRequestContextButNoProperty() {
-    // Test when containerRequestContext exists but doesn't have requestId 
property
-    ContainerRequestContext mockContainerRequestContext =
-        Mockito.mock(ContainerRequestContext.class);
-    
when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(false);
-    eventListener.containerRequestContext = mockContainerRequestContext;
-
-    String requestId = eventListener.getRequestId();
-
-    assertThat(requestId == null).isTrue();
-
-    // Verify that getProperty was never called since hasProperty returned 
false
-    verify(mockContainerRequestContext, times(0)).getProperty("requestId");
-  }
-
-  private List<PolarisEvent> addEventsWithoutTriggeringFlush(String realmId) {
-    List<PolarisEvent> realmEvents = new ArrayList<>();
-    for (int i = 0; i < CONFIG_MAX_BUFFER_SIZE - 1; i++) {
-      realmEvents.add(createSampleEvent());
-    }
-    RealmContext realmContext = () -> realmId;
-    when(callContext.getRealmContext()).thenReturn(realmContext);
-    for (PolarisEvent realmEvent : realmEvents) {
-      eventListener.processEvent(realmEvent);
-    }
-    verify(polarisMetaStoreManager, times(0)).writeEvents(any(), any());
-    return realmEvents;
-  }
-
-  private PolarisEvent createSampleEvent() {
-    String catalogId = "test-catalog";
-    String id = UUID.randomUUID().toString();
-    String requestId = "test-request-id";
-    String eventType = "TEST_EVENT";
-    long timestampMs = 0;
-    String principalName = "test-user";
-    PolarisEvent.ResourceType resourceType = PolarisEvent.ResourceType.TABLE;
-    String resourceIdentifier = "test-table";
-
-    PolarisEvent event =
-        new PolarisEvent(
-            catalogId,
-            id,
-            requestId,
-            eventType,
-            timestampMs,
-            principalName,
-            resourceType,
-            resourceIdentifier);
-
-    Map<String, String> additionalParams = new HashMap<>();
-    additionalParams.put("testKey", "testValue");
-    event.setAdditionalProperties(additionalParams);
-
-    return event;
-  }
-}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferSizeTest.java
similarity index 95%
rename from runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java
rename to runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferSizeTest.java
index 1cb76cdcf..7ee12ebc0 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferSizeTest.java
@@ -39,8 +39,8 @@ import org.mockito.Mockito;
 
 @QuarkusTest
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@TestProfile(InMemoryEventListenerBufferSizeTest.Profile.class)
-class InMemoryEventListenerBufferSizeTest extends InMemoryEventListenerTestBase {
+@TestProfile(InMemoryBufferEventListenerBufferSizeTest.Profile.class)
+class InMemoryBufferEventListenerBufferSizeTest extends InMemoryBufferEventListenerTestBase {
 
   public static class Profile implements QuarkusTestProfile {
 
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferTimeTest.java
similarity index 91%
rename from runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java
rename to runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferTimeTest.java
index 8431274ea..3a6c7210e 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferTimeTest.java
@@ -29,8 +29,8 @@ import org.junit.jupiter.api.TestInstance;
 
 @QuarkusTest
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@TestProfile(InMemoryEventListenerBufferTimeTest.Profile.class)
-class InMemoryEventListenerBufferTimeTest extends InMemoryEventListenerTestBase {
+@TestProfile(InMemoryBufferEventListenerBufferTimeTest.Profile.class)
+class InMemoryBufferEventListenerBufferTimeTest extends InMemoryBufferEventListenerTestBase {
 
   public static class Profile implements QuarkusTestProfile {
 
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerTestBase.java
similarity index 92%
rename from runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java
rename to runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerTestBase.java
index 527491b56..d86fb44e8 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerTestBase.java
@@ -42,7 +42,7 @@ import org.apache.polaris.core.entity.PolarisEvent;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
 import org.junit.jupiter.api.AfterEach;
 
-abstract class InMemoryEventListenerTestBase {
+abstract class InMemoryBufferEventListenerTestBase {
 
   static final Map<String, String> BASE_CONFIG =
       ImmutableMap.<String, String>builder()
@@ -53,18 +53,18 @@ abstract class InMemoryEventListenerTestBase {
           .put(
               "quarkus.datasource.jdbc.url",
               
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE")
-          .put("polaris.event-listener.type", "persistence-in-memory")
+          .put("polaris.event-listener.type", "persistence-in-memory-buffer")
           .put(
-              
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.max-retries",
+              
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryBufferEventListener/flush\".retry.max-retries",
               "1")
           .put(
-              
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.delay",
+              
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryBufferEventListener/flush\".retry.delay",
               "10")
           .build();
 
   @Inject
-  @Identifier("persistence-in-memory")
-  InMemoryEventListener producer;
+  @Identifier("persistence-in-memory-buffer")
+  InMemoryBufferEventListener producer;
 
   @InjectSpy
   @Identifier("relational-jdbc")

Reply via email to