This is an automated email from the ASF dual-hosted git repository.
adutra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new e53a3522f chore(events): unify in-memory buffer listeners implementations (#2628)
e53a3522f is described below
commit e53a3522f4097cce90b6de89ebf123b0ab905be6
Author: Alexandre Dutra <[email protected]>
AuthorDate: Mon Sep 22 10:58:35 2025 +0200
chore(events): unify in-memory buffer listeners implementations (#2628)
---
...emoryBufferPolarisPersistenceEventListener.java | 202 -------------
...tener.java => InMemoryBufferEventListener.java} | 7 +-
.../InMemoryBufferEventListenerConfiguration.java | 2 +-
...yBufferPolarisPersistenceEventListenerTest.java | 324 ---------------------
...InMemoryBufferEventListenerBufferSizeTest.java} | 4 +-
...InMemoryBufferEventListenerBufferTimeTest.java} | 4 +-
...va => InMemoryBufferEventListenerTestBase.java} | 12 +-
7 files changed, 14 insertions(+), 541 deletions(-)
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
deleted file mode 100644
index 481c599bc..000000000
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.service.events.listeners;
-
-import com.google.common.annotations.VisibleForTesting;
-import io.smallrye.common.annotation.Identifier;
-import jakarta.annotation.Nullable;
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
-import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.inject.Inject;
-import jakarta.ws.rs.container.ContainerRequestContext;
-import jakarta.ws.rs.core.Context;
-import jakarta.ws.rs.core.SecurityContext;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import org.apache.polaris.core.PolarisCallContext;
-import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.core.context.RealmContext;
-import org.apache.polaris.core.entity.PolarisEvent;
-import org.apache.polaris.core.persistence.BasePersistence;
-import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Event listener that buffers in memory and then dumps to persistence. */
-@ApplicationScoped
-@Identifier("persistence-in-memory-buffer")
-public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener {
- private static final Logger LOGGER =
-
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
- private static final String REQUEST_ID_KEY = "requestId";
- private final MetaStoreManagerFactory metaStoreManagerFactory;
-
- private final ConcurrentHashMap<String,
ConcurrentLinkedQueueWithApproximateSize<PolarisEvent>>
- buffer = new ConcurrentHashMap<>();
- private final ScheduledExecutorService executor;
- private final ConcurrentHashMap<String, Future<?>> futures = new
ConcurrentHashMap<>();
- private ScheduledFuture<?> scheduledFuture;
- private final Duration timeToFlush;
- private final int maxBufferSize;
-
- @Inject CallContext callContext;
- @Inject Clock clock;
- @Context SecurityContext securityContext;
- @Context ContainerRequestContext containerRequestContext;
-
- @Inject
- public InMemoryBufferPolarisPersistenceEventListener(
- MetaStoreManagerFactory metaStoreManagerFactory,
- Clock clock,
- InMemoryBufferEventListenerConfiguration eventListenerConfiguration) {
- this.metaStoreManagerFactory = metaStoreManagerFactory;
- this.clock = clock;
- this.timeToFlush = eventListenerConfiguration.bufferTime();
- this.maxBufferSize = eventListenerConfiguration.maxBufferSize();
-
- executor = Executors.newSingleThreadScheduledExecutor();
- }
-
- @PostConstruct
- void start() {
- scheduledFuture =
- executor.scheduleAtFixedRate(
- this::runCleanup, 0, timeToFlush.toMillis(),
TimeUnit.MILLISECONDS);
- }
-
- void runCleanup() {
- for (String realmId : buffer.keySet()) {
- try {
- checkAndFlushBufferIfNecessary(realmId, false);
- } catch (Exception e) {
- LOGGER.debug("Buffer checking task failed for realm ({}): {}",
realmId, e);
- }
- }
- }
-
- @PreDestroy
- void shutdown() {
- scheduledFuture.cancel(false);
- futures.forEach((key, future) -> future.cancel(false));
- executor.shutdown();
-
- try {
- if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
- executor.shutdownNow();
- if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
- LOGGER.warn("Executor did not shut down cleanly");
- }
- }
- } catch (InterruptedException e) {
- executor.shutdownNow();
- Thread.currentThread().interrupt();
- } finally {
- for (String realmId : buffer.keySet()) {
- try {
- checkAndFlushBufferIfNecessary(realmId, true);
- } catch (Exception e) {
- LOGGER.debug("Buffer flushing task failed for realm ({}): ",
realmId, e);
- }
- }
- }
- }
-
- @Nullable
- @Override
- protected String getRequestId() {
- if (containerRequestContext != null &&
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
- return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
- }
- return null;
- }
-
- @Override
- protected void processEvent(PolarisEvent polarisEvent) {
- String realmId = callContext.getRealmContext().getRealmIdentifier();
-
- ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> realmQueue =
- buffer.computeIfAbsent(realmId, k -> new
ConcurrentLinkedQueueWithApproximateSize<>());
- realmQueue.add(polarisEvent);
- if (realmQueue.size() >= maxBufferSize) {
- futures.compute(
- realmId,
- (k, v) -> {
- if (v == null || v.isDone()) {
- return executor.submit(() ->
checkAndFlushBufferIfNecessary(realmId, true));
- }
- return v;
- });
- }
- }
-
- @VisibleForTesting
- void checkAndFlushBufferIfNecessary(String realmId, boolean forceFlush) {
- ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> queue =
buffer.get(realmId);
- if (queue == null || queue.isEmpty()) {
- return;
- }
-
- PolarisEvent head = queue.peek();
- if (head == null) {
- return;
- }
-
- Duration elapsed = Duration.ofMillis(clock.millis() -
head.getTimestampMs());
-
- if (elapsed.compareTo(timeToFlush) > 0 || queue.size() >= maxBufferSize ||
forceFlush) {
- // Atomically replace old queue with new queue
- boolean replaced =
- buffer.replace(realmId, queue, new
ConcurrentLinkedQueueWithApproximateSize<>());
- if (!replaced) {
- // Another thread concurrently modified the buffer, so do not continue
- return;
- }
-
- RealmContext realmContext = () -> realmId;
- PolarisMetaStoreManager metaStoreManager =
- metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
- BasePersistence basePersistence =
metaStoreManagerFactory.getOrCreateSession(realmContext);
- metaStoreManager.writeEvents(
- new PolarisCallContext(realmContext, basePersistence),
queue.stream().toList());
-
- if (buffer.get(realmId).size() >= maxBufferSize) {
- // Ensure that all events will be flushed, even if the race condition
is triggered where
- // new events are added between replacing the buffer above and the
finishing of this method.
- futures.put(realmId, executor.submit(() ->
checkAndFlushBufferIfNecessary(realmId, true)));
- }
- }
- }
-
- @Override
- protected ContextSpecificInformation getContextSpecificInformation() {
- return new ContextSpecificInformation(
- clock.millis(),
- securityContext.getUserPrincipal() == null
- ? null
- : securityContext.getUserPrincipal().getName());
- }
-}
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java
similarity index 95%
rename from runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java
rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java
index 8af66b194..6fd88d03f 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListener.java
@@ -44,7 +44,6 @@ import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.PolarisEvent;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
-import
org.apache.polaris.service.events.listeners.InMemoryBufferEventListenerConfiguration;
import
org.apache.polaris.service.events.listeners.PolarisPersistenceEventListener;
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Retry;
@@ -52,10 +51,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ApplicationScoped
-@Identifier("persistence-in-memory")
-public class InMemoryEventListener extends PolarisPersistenceEventListener {
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferEventListener extends PolarisPersistenceEventListener {
- private static final Logger LOGGER =
LoggerFactory.getLogger(InMemoryEventListener.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InMemoryBufferEventListener.class);
@Inject CallContext callContext;
@Inject Clock clock;
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerConfiguration.java
similarity index 95%
rename from runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java
rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerConfiguration.java
index 277676f10..bae29a9a2 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferEventListenerConfiguration.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerConfiguration.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.polaris.service.events.listeners;
+package org.apache.polaris.service.events.listeners.inmemory;
import io.quarkus.runtime.annotations.StaticInitSafe;
import io.smallrye.config.ConfigMapping;
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java
deleted file mode 100644
index 026e741d4..000000000
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.polaris.service.events.listeners;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import jakarta.ws.rs.container.ContainerRequestContext;
-import java.time.Duration;
-import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.polaris.core.PolarisCallContext;
-import org.apache.polaris.core.context.CallContext;
-import org.apache.polaris.core.context.RealmContext;
-import org.apache.polaris.core.entity.PolarisEvent;
-import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
-import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.parallel.Execution;
-import org.junit.jupiter.api.parallel.ExecutionMode;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-import org.threeten.extra.MutableClock;
-
-public class InMemoryBufferPolarisPersistenceEventListenerTest {
- private InMemoryBufferPolarisPersistenceEventListener eventListener;
- private PolarisMetaStoreManager polarisMetaStoreManager;
- private MutableClock clock;
- private CallContext callContext;
-
- private static final int CONFIG_MAX_BUFFER_SIZE = 5;
- private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS =
Duration.ofMillis(500);
-
- @BeforeEach
- public void setUp() {
- callContext = Mockito.mock(CallContext.class);
- PolarisCallContext polarisCallContext =
Mockito.mock(PolarisCallContext.class);
- when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
-
- MetaStoreManagerFactory metaStoreManagerFactory =
Mockito.mock(MetaStoreManagerFactory.class);
- polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
- when(metaStoreManagerFactory.getOrCreateMetaStoreManager(any()))
- .thenReturn(polarisMetaStoreManager);
-
- InMemoryBufferEventListenerConfiguration eventListenerConfiguration =
- Mockito.mock(InMemoryBufferEventListenerConfiguration.class);
-
when(eventListenerConfiguration.maxBufferSize()).thenReturn(CONFIG_MAX_BUFFER_SIZE);
-
when(eventListenerConfiguration.bufferTime()).thenReturn(CONFIG_TIME_TO_FLUSH_IN_MS);
-
- clock =
- MutableClock.of(
- Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to make it easier to test
-
- eventListener =
- new InMemoryBufferPolarisPersistenceEventListener(
- metaStoreManagerFactory, clock, eventListenerConfiguration);
-
- eventListener.callContext = callContext;
- }
-
- @Test
- public void testProcessEventFlushesAfterConfiguredTime() {
- String realmId = "realm1";
- List<PolarisEvent> eventsAddedToBuffer =
addEventsWithoutTriggeringFlush(realmId);
-
- // Push clock forwards to flush the buffer
- clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
- eventListener.checkAndFlushBufferIfNecessary(realmId, false);
- verify(polarisMetaStoreManager, times(1)).writeEvents(any(),
eq(eventsAddedToBuffer));
- }
-
- @Test
- public void testProcessEventFlushesAfterMaxEvents() {
- String realm1 = "realm1";
- List<PolarisEvent> eventsAddedToBuffer =
addEventsWithoutTriggeringFlush(realm1);
- List<PolarisEvent> eventsAddedToBufferRealm2 =
addEventsWithoutTriggeringFlush("realm2");
-
- // Add the last event for realm1 and verify that it did trigger the flush
- PolarisEvent triggeringEvent = createSampleEvent();
- RealmContext realmContext = () -> realm1;
- when(callContext.getRealmContext()).thenReturn(realmContext);
- eventListener.processEvent(triggeringEvent);
- eventsAddedToBuffer.add(triggeringEvent);
-
- // Calling checkAndFlushBufferIfNecessary manually to replicate the
behavior of the executor
- // service
- eventListener.checkAndFlushBufferIfNecessary(realm1, false);
- verify(polarisMetaStoreManager, times(1)).writeEvents(any(),
eq(eventsAddedToBuffer));
- verify(polarisMetaStoreManager, times(0)).writeEvents(any(),
eq(eventsAddedToBufferRealm2));
- }
-
- @Test
- public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws
Exception {
- String realmId = "realm1";
- int threadCount = 10;
- List<Thread> threads = new ArrayList<>();
- ConcurrentLinkedQueue<Exception> exceptions = new
ConcurrentLinkedQueue<>();
-
- // Pre-populate the buffer with events
- List<PolarisEvent> events = addEventsWithoutTriggeringFlush(realmId);
-
- // Push clock forwards to flush the buffer
- clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
-
- // Each thread will call checkAndFlushBufferIfNecessary concurrently
- for (int i = 0; i < threadCount; i++) {
- Thread t =
- new Thread(
- () -> {
- try {
- eventListener.checkAndFlushBufferIfNecessary(realmId, false);
- } catch (Exception e) {
- exceptions.add(e);
- }
- });
- threads.add(t);
- }
- // Start all threads
- threads.forEach(Thread::start);
- // Wait for all threads to finish
- for (Thread t : threads) {
- t.join();
- }
- // There should be no exceptions
- if (!exceptions.isEmpty()) {
- throw new AssertionError(
- "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary:
", exceptions.peek());
- }
- // Only one flush should occur
- verify(polarisMetaStoreManager, times(1)).writeEvents(any(), eq(events));
- }
-
- @Execution(ExecutionMode.SAME_THREAD)
- @Test
- public void testProcessEventIsThreadSafe() throws Exception {
- String realmId = "realm1";
- when(callContext.getRealmContext()).thenReturn(() -> realmId);
- int threadCount = 10;
- List<Thread> threads = new ArrayList<>();
- ConcurrentLinkedQueue<Exception> exceptions = new
ConcurrentLinkedQueue<>();
- ConcurrentLinkedQueue<PolarisEvent> allEvents = new
ConcurrentLinkedQueue<>();
- eventListener.start();
-
- for (int i = 0; i < threadCount; i++) {
- Thread t =
- new Thread(
- () -> {
- try {
- for (int j = 0; j < 10; j++) {
- PolarisEvent event = createSampleEvent();
- allEvents.add(event);
- eventListener.processEvent(event);
- }
- } catch (Exception e) {
- exceptions.add(e);
- }
- });
- threads.add(t);
- }
-
- // Start all threads
- threads.forEach(Thread::start);
- // Wait for all threads to finish
- for (Thread t : threads) {
- t.join();
- }
- // There should be no exceptions
- if (!exceptions.isEmpty()) {
- throw new AssertionError(
- "Exceptions occurred in concurrent processEvent: ",
exceptions.peek());
- }
-
- Awaitility.await("expected amount of records should be processed")
- .atMost(Duration.ofSeconds(30))
- .pollDelay(Duration.ofMillis(500))
- .pollInterval(Duration.ofMillis(500))
- .untilAsserted(
- () -> {
- clock.add(500, ChronoUnit.MILLIS);
- ArgumentCaptor<List<PolarisEvent>> eventsCaptor =
ArgumentCaptor.captor();
- verify(polarisMetaStoreManager, atLeastOnce())
- .writeEvents(any(), eventsCaptor.capture());
- List<PolarisEvent> eventsProcessed =
-
eventsCaptor.getAllValues().stream().flatMap(List::stream).toList();
- if (eventsProcessed.size() > 100) {
- eventsProcessed = new ArrayList<>();
- }
-
assertThat(eventsProcessed.size()).isGreaterThanOrEqualTo(allEvents.size());
- });
- ArgumentCaptor<List<PolarisEvent>> eventsCaptor = ArgumentCaptor.captor();
- verify(polarisMetaStoreManager, atLeastOnce()).writeEvents(any(),
eventsCaptor.capture());
- List<PolarisEvent> seenEvents =
- eventsCaptor.getAllValues().stream().flatMap(List::stream).toList();
- assertThat(seenEvents.size()).isEqualTo(allEvents.size());
- assertThat(seenEvents).hasSameElementsAs(allEvents);
- }
-
- @Test
- public void testRequestIdFunctionalityWithContainerRequestContext() {
- // Test when containerRequestContext has requestId property
- ContainerRequestContext mockContainerRequestContext =
- Mockito.mock(ContainerRequestContext.class);
- String expectedRequestId = "custom-request-id-123";
-
-
when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true);
-
when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId);
-
- eventListener.containerRequestContext = mockContainerRequestContext;
-
- String actualRequestId = eventListener.getRequestId();
- assertThat(actualRequestId)
- .as("Expected requestId '" + expectedRequestId + "' but got '" +
actualRequestId + "'")
- .isEqualTo(expectedRequestId);
- }
-
- @Test
- public void testRequestIdFunctionalityWithoutContainerRequestContext() {
- // Test when containerRequestContext is null
- try {
- java.lang.reflect.Field field =
- InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField(
- "containerRequestContext");
- field.setAccessible(true);
- field.set(eventListener, null);
- } catch (Exception e) {
- throw new RuntimeException("Failed to set containerRequestContext
field", e);
- }
-
- String requestId1 = eventListener.getRequestId();
- String requestId2 = eventListener.getRequestId();
-
- assertThat(requestId1 == null).isTrue();
- assertThat(requestId2 == null).isTrue();
- }
-
- @Test
- public void
testRequestIdFunctionalityWithContainerRequestContextButNoProperty() {
- // Test when containerRequestContext exists but doesn't have requestId
property
- ContainerRequestContext mockContainerRequestContext =
- Mockito.mock(ContainerRequestContext.class);
-
when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(false);
- eventListener.containerRequestContext = mockContainerRequestContext;
-
- String requestId = eventListener.getRequestId();
-
- assertThat(requestId == null).isTrue();
-
- // Verify that getProperty was never called since hasProperty returned
false
- verify(mockContainerRequestContext, times(0)).getProperty("requestId");
- }
-
- private List<PolarisEvent> addEventsWithoutTriggeringFlush(String realmId) {
- List<PolarisEvent> realmEvents = new ArrayList<>();
- for (int i = 0; i < CONFIG_MAX_BUFFER_SIZE - 1; i++) {
- realmEvents.add(createSampleEvent());
- }
- RealmContext realmContext = () -> realmId;
- when(callContext.getRealmContext()).thenReturn(realmContext);
- for (PolarisEvent realmEvent : realmEvents) {
- eventListener.processEvent(realmEvent);
- }
- verify(polarisMetaStoreManager, times(0)).writeEvents(any(), any());
- return realmEvents;
- }
-
- private PolarisEvent createSampleEvent() {
- String catalogId = "test-catalog";
- String id = UUID.randomUUID().toString();
- String requestId = "test-request-id";
- String eventType = "TEST_EVENT";
- long timestampMs = 0;
- String principalName = "test-user";
- PolarisEvent.ResourceType resourceType = PolarisEvent.ResourceType.TABLE;
- String resourceIdentifier = "test-table";
-
- PolarisEvent event =
- new PolarisEvent(
- catalogId,
- id,
- requestId,
- eventType,
- timestampMs,
- principalName,
- resourceType,
- resourceIdentifier);
-
- Map<String, String> additionalParams = new HashMap<>();
- additionalParams.put("testKey", "testValue");
- event.setAdditionalProperties(additionalParams);
-
- return event;
- }
-}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferSizeTest.java
similarity index 95%
rename from runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java
rename to runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferSizeTest.java
index 1cb76cdcf..7ee12ebc0 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferSizeTest.java
@@ -39,8 +39,8 @@ import org.mockito.Mockito;
@QuarkusTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@TestProfile(InMemoryEventListenerBufferSizeTest.Profile.class)
-class InMemoryEventListenerBufferSizeTest extends
InMemoryEventListenerTestBase {
+@TestProfile(InMemoryBufferEventListenerBufferSizeTest.Profile.class)
+class InMemoryBufferEventListenerBufferSizeTest extends
InMemoryBufferEventListenerTestBase {
public static class Profile implements QuarkusTestProfile {
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferTimeTest.java
similarity index 91%
rename from runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java
rename to runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferTimeTest.java
index 8431274ea..3a6c7210e 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerBufferTimeTest.java
@@ -29,8 +29,8 @@ import org.junit.jupiter.api.TestInstance;
@QuarkusTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@TestProfile(InMemoryEventListenerBufferTimeTest.Profile.class)
-class InMemoryEventListenerBufferTimeTest extends
InMemoryEventListenerTestBase {
+@TestProfile(InMemoryBufferEventListenerBufferTimeTest.Profile.class)
+class InMemoryBufferEventListenerBufferTimeTest extends
InMemoryBufferEventListenerTestBase {
public static class Profile implements QuarkusTestProfile {
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerTestBase.java
similarity index 92%
rename from runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java
rename to runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerTestBase.java
index 527491b56..d86fb44e8 100644
--- a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryBufferEventListenerTestBase.java
@@ -42,7 +42,7 @@ import org.apache.polaris.core.entity.PolarisEvent;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.junit.jupiter.api.AfterEach;
-abstract class InMemoryEventListenerTestBase {
+abstract class InMemoryBufferEventListenerTestBase {
static final Map<String, String> BASE_CONFIG =
ImmutableMap.<String, String>builder()
@@ -53,18 +53,18 @@ abstract class InMemoryEventListenerTestBase {
.put(
"quarkus.datasource.jdbc.url",
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE")
- .put("polaris.event-listener.type", "persistence-in-memory")
+ .put("polaris.event-listener.type", "persistence-in-memory-buffer")
.put(
-
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.max-retries",
+
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryBufferEventListener/flush\".retry.max-retries",
"1")
.put(
-
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.delay",
+
"quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryBufferEventListener/flush\".retry.delay",
"10")
.build();
@Inject
- @Identifier("persistence-in-memory")
- InMemoryEventListener producer;
+ @Identifier("persistence-in-memory-buffer")
+ InMemoryBufferEventListener producer;
@InjectSpy
@Identifier("relational-jdbc")