This is an automated email from the ASF dual-hosted git repository. adutra pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new 4e2facbb4 Introduce alternate in-memory buffering event listener (#2574) 4e2facbb4 is described below commit 4e2facbb4b743b4d54c1d39aaf6fc290960eca7e Author: Alexandre Dutra <adu...@apache.org> AuthorDate: Fri Sep 19 11:05:01 2025 +0200 Introduce alternate in-memory buffering event listener (#2574) --- runtime/service/build.gradle.kts | 11 +- ...emoryBufferPolarisPersistenceEventListener.java | 6 +- .../listeners/PolarisPersistenceEventListener.java | 8 +- .../listeners/inmemory/InMemoryEventListener.java | 141 +++++++++++++++++++++ .../InMemoryEventListenerBufferSizeTest.java | 100 +++++++++++++++ .../InMemoryEventListenerBufferTimeTest.java | 54 ++++++++ .../inmemory/InMemoryEventListenerTestBase.java | 117 +++++++++++++++++ 7 files changed, 425 insertions(+), 12 deletions(-) diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts index 0fc787e0d..095a68468 100644 --- a/runtime/service/build.gradle.kts +++ b/runtime/service/build.gradle.kts @@ -129,22 +129,23 @@ dependencies { testImplementation("io.quarkus:quarkus-junit5-mockito") testImplementation("io.quarkus:quarkus-rest-client") testImplementation("io.quarkus:quarkus-rest-client-jackson") + testImplementation("io.quarkus:quarkus-jdbc-h2") + testImplementation("io.rest-assured:rest-assured") + testImplementation(libs.localstack) - testImplementation("org.testcontainers:testcontainers") + + testImplementation(project(":polaris-runtime-test-common")) testImplementation(project(":polaris-container-spec-helper")) testImplementation(libs.threeten.extra) testImplementation(libs.hawkular.agent.prometheus.scraper) - testImplementation(project(":polaris-runtime-test-common")) - - testImplementation("io.quarkus:quarkus-junit5") testImplementation(libs.awaitility) + testImplementation(platform(libs.testcontainers.bom)) testImplementation("org.testcontainers:testcontainers") testImplementation("org.testcontainers:postgresql") - testImplementation("org.postgresql:postgresql") testFixturesImplementation(project(":polaris-core")) testFixturesImplementation(project(":polaris-api-management-model")) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java index 8dc425ad0..481c599bc 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java @@ -127,7 +127,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis @Nullable @Override - String getRequestId() { + protected String getRequestId() { if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); } @@ -135,7 +135,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis } @Override - void processEvent(PolarisEvent polarisEvent) { + protected void processEvent(PolarisEvent polarisEvent) { String realmId = callContext.getRealmContext().getRealmIdentifier(); ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> realmQueue = @@ -192,7 +192,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis } @Override - ContextSpecificInformation getContextSpecificInformation() { + protected ContextSpecificInformation getContextSpecificInformation() { return new ContextSpecificInformation( clock.millis(), securityContext.getUserPrincipal() == null diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java index e9d43f003..11771797a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java @@ -90,12 +90,12 @@ public abstract class PolarisPersistenceEventListener extends PolarisEventListen processEvent(polarisEvent); } - protected record ContextSpecificInformation(long timestamp, @Nullable String principalName) {} + public record ContextSpecificInformation(long timestamp, @Nullable String principalName) {} - abstract ContextSpecificInformation getContextSpecificInformation(); + protected abstract ContextSpecificInformation getContextSpecificInformation(); @Nullable - abstract String getRequestId(); + protected abstract String getRequestId(); - abstract void processEvent(PolarisEvent event); + protected abstract void processEvent(PolarisEvent event); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java new file mode 100644 index 000000000..8af66b194 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners.inmemory; + +import static org.apache.polaris.service.logging.LoggingMDCFilter.REQUEST_ID_KEY; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor; +import jakarta.annotation.Nullable; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.SecurityContext; +import java.time.Clock; +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.listeners.InMemoryBufferEventListenerConfiguration; +import org.apache.polaris.service.events.listeners.PolarisPersistenceEventListener; +import org.eclipse.microprofile.faulttolerance.Fallback; +import org.eclipse.microprofile.faulttolerance.Retry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ApplicationScoped +@Identifier("persistence-in-memory") +public class InMemoryEventListener extends PolarisPersistenceEventListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEventListener.class); + + @Inject CallContext callContext; + @Inject Clock clock; + @Inject MetaStoreManagerFactory metaStoreManagerFactory; + @Inject InMemoryBufferEventListenerConfiguration configuration; + + @Context SecurityContext securityContext; + @Context ContainerRequestContext requestContext; + + @VisibleForTesting + final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors = + Caffeine.newBuilder() + .expireAfterAccess(Duration.ofHours(1)) + .evictionListener( + (String realmId, UnicastProcessor<?> processor, RemovalCause cause) -> + processor.onComplete()) + .build(this::createProcessor); + + @Override + protected void processEvent(PolarisEvent event) { + var realmId = callContext.getRealmContext().getRealmIdentifier(); + processEvent(realmId, event); + } + + protected void processEvent(String realmId, PolarisEvent event) { + var processor = Objects.requireNonNull(processors.get(realmId)); + processor.onNext(event); + } + + @Override + protected ContextSpecificInformation getContextSpecificInformation() { + var principal = securityContext.getUserPrincipal(); + var principalName = principal == null ? null : principal.getName(); + return new ContextSpecificInformation(clock.millis(), principalName); + } + + @Nullable + @Override + protected String getRequestId() { + return (String) requestContext.getProperty(REQUEST_ID_KEY); + } + + @PreDestroy + public void shutdown() { + processors.asMap().values().forEach(UnicastProcessor::onComplete); + processors.invalidateAll(); // doesn't call the eviction listener + } + + protected UnicastProcessor<PolarisEvent> createProcessor(String realmId) { + UnicastProcessor<PolarisEvent> processor = UnicastProcessor.create(); + processor + .emitOn(Infrastructure.getDefaultWorkerPool()) + .group() + .intoLists() + .of(configuration.maxBufferSize(), configuration.bufferTime()) + .subscribe() + .with(events -> flush(realmId, events), error -> onProcessorError(realmId, error)); + return processor; + } + + @Retry(maxRetries = 5, delay = 1000, jitter = 100) + @Fallback(fallbackMethod = "onFlushError") + protected void flush(String realmId, List<PolarisEvent> events) { + RealmContext realmContext = () -> realmId; + var metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext); + var basePersistence = metaStoreManagerFactory.getOrCreateSession(realmContext); + var callContext = new PolarisCallContext(realmContext, basePersistence); + metaStoreManager.writeEvents(callContext, events); + } + + @SuppressWarnings("unused") + protected void onFlushError(String realmId, List<PolarisEvent> events, Throwable error) { + LOGGER.error("Failed to persist {} events for realm '{}'", events.size(), realmId, error); + } + + protected void onProcessorError(String realmId, Throwable error) { + LOGGER.error( + "Unexpected error while processing events for realm '{}'; some events may have been dropped", + realmId, + error); + processors.invalidate(realmId); + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java new file mode 100644 index 000000000..1cb76cdcf --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners.inmemory; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; + +import com.google.common.collect.ImmutableMap; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor; +import io.smallrye.mutiny.subscription.BackPressureFailure; +import java.util.Map; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.mockito.Mockito; + +@QuarkusTest +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestProfile(InMemoryEventListenerBufferSizeTest.Profile.class) +class InMemoryEventListenerBufferSizeTest extends InMemoryEventListenerTestBase { + + public static class Profile implements QuarkusTestProfile { + + @Override + public Map<String, String> getConfigOverrides() { + return ImmutableMap.<String, String>builder() + .putAll(BASE_CONFIG) + .put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "60s") + .put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size", "10") + .build(); + } + } + + @Test + void testFlushOnSize() { + sendAsync("test1", 10); + sendAsync("test2", 10); + assertRows("test1", 10); + assertRows("test2", 10); + } + + @Test + void testFlushOnShutdown() { + producer.processEvent("test1", event()); + producer.processEvent("test2", event()); + producer.shutdown(); + assertRows("test1", 1); + assertRows("test2", 1); + } + + @Test + void testFlushFailureRecovery() { + var manager = Mockito.mock(PolarisMetaStoreManager.class); + doReturn(manager).when(metaStoreManagerFactory).getOrCreateMetaStoreManager(any()); + RuntimeException error = new RuntimeException("error"); + doThrow(error) + .doThrow(error) // first batch will give up after 2 attempts + .doThrow(error) + .doCallRealMethod() // second batch will succeed on the 2nd attempt + .when(manager) + .writeEvents(any(), any()); + sendAsync("test1", 20); + assertRows("test1", 10); + } + + @Test + void testProcessorFailureRecovery() { + producer.processEvent("test1", event()); + UnicastProcessor<PolarisEvent> test1 = producer.processors.get("test1"); + assertThat(test1).isNotNull(); + // emulate backpressure error; will drop the event and invalidate the processor + test1.onError(new BackPressureFailure("error")); + // will create a new processor and recover + sendAsync("test1", 10); + assertRows("test1", 10); + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java new file mode 100644 index 000000000..8431274ea --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners.inmemory; + +import com.google.common.collect.ImmutableMap; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@QuarkusTest +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestProfile(InMemoryEventListenerBufferTimeTest.Profile.class) +class InMemoryEventListenerBufferTimeTest extends InMemoryEventListenerTestBase { + + public static class Profile implements QuarkusTestProfile { + + @Override + public Map<String, String> getConfigOverrides() { + return ImmutableMap.<String, String>builder() + .putAll(BASE_CONFIG) + .put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "100ms") + .put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size", "1000") + .build(); + } + } + + @Test + void testFlushOnTimeout() { + sendAsync("test1", 5); + sendAsync("test2", 1); + assertRows("test1", 5); + assertRows("test2", 1); + } +} diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java new file mode 100644 index 000000000..527491b56 --- /dev/null +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners.inmemory; + +import static org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.reset; + +import com.google.common.collect.ImmutableMap; +import io.netty.channel.EventLoopGroup; +import io.quarkus.netty.MainEventLoopGroup; +import io.quarkus.test.junit.mockito.InjectSpy; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.inject.Instance; +import jakarta.inject.Inject; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.time.Duration; +import java.util.Map; +import java.util.UUID; +import javax.sql.DataSource; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.junit.jupiter.api.AfterEach; + +abstract class InMemoryEventListenerTestBase { + + static final Map<String, String> BASE_CONFIG = + ImmutableMap.<String, String>builder() + .put("polaris.realm-context.realms", "test1,test2") + .put("polaris.persistence.type", "relational-jdbc") + .put("polaris.persistence.auto-bootstrap-types", "relational-jdbc") + .put("quarkus.datasource.db-kind", "h2") + .put( + "quarkus.datasource.jdbc.url", + "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE") + .put("polaris.event-listener.type", "persistence-in-memory") + .put( + "quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.max-retries", + "1") + .put( + "quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.delay", + "10") + .build(); + + @Inject + @Identifier("persistence-in-memory") + InMemoryEventListener producer; + + @InjectSpy + @Identifier("relational-jdbc") + @SuppressWarnings("CdiInjectionPointsInspection") + MetaStoreManagerFactory metaStoreManagerFactory; + + @Inject + @MainEventLoopGroup + @SuppressWarnings("CdiInjectionPointsInspection") + EventLoopGroup eventLoopGroup; + + @Inject Instance<DataSource> dataSource; + + @AfterEach + void clearEvents() throws Exception { + reset(metaStoreManagerFactory); + producer.shutdown(); + try (Connection connection = dataSource.get().getConnection(); + Statement statement = connection.createStatement()) { + statement.execute("DELETE FROM polaris_schema.events"); + } + } + + void sendAsync(String realmId, int n) { + for (int i = 0; i < n; i++) { + eventLoopGroup.next().execute(() -> producer.processEvent(realmId, event())); + } + } + + @SuppressWarnings("SqlSourceToSinkFlow") + void assertRows(String realmId, int expected) { + String query = "SELECT COUNT(*) FROM polaris_schema.events WHERE realm_id = '" + realmId + "'"; + await() + .atMost(Duration.ofSeconds(10)) + .untilAsserted( + () -> { + try (Connection connection = dataSource.get().getConnection(); + Statement statement = connection.createStatement(); + ResultSet rs = statement.executeQuery(query)) { + rs.next(); + assertThat(rs.getInt(1)).isEqualTo(expected); + } + }); + } + + static PolarisEvent event() { + String id = UUID.randomUUID().toString(); + return new PolarisEvent("test", id, null, "test", 0, null, CATALOG, "test"); + } +}