This is an automated email from the ASF dual-hosted git repository.

adutra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e2facbb4 Introduce alternate in-memory buffering event listener (#2574)
4e2facbb4 is described below

commit 4e2facbb4b743b4d54c1d39aaf6fc290960eca7e
Author: Alexandre Dutra <adu...@apache.org>
AuthorDate: Fri Sep 19 11:05:01 2025 +0200

    Introduce alternate in-memory buffering event listener (#2574)
---
 runtime/service/build.gradle.kts                   |  11 +-
 ...emoryBufferPolarisPersistenceEventListener.java |   6 +-
 .../listeners/PolarisPersistenceEventListener.java |   8 +-
 .../listeners/inmemory/InMemoryEventListener.java  | 141 +++++++++++++++++++++
 .../InMemoryEventListenerBufferSizeTest.java       | 100 +++++++++++++++
 .../InMemoryEventListenerBufferTimeTest.java       |  54 ++++++++
 .../inmemory/InMemoryEventListenerTestBase.java    | 117 +++++++++++++++++
 7 files changed, 425 insertions(+), 12 deletions(-)

diff --git a/runtime/service/build.gradle.kts b/runtime/service/build.gradle.kts
index 0fc787e0d..095a68468 100644
--- a/runtime/service/build.gradle.kts
+++ b/runtime/service/build.gradle.kts
@@ -129,22 +129,23 @@ dependencies {
   testImplementation("io.quarkus:quarkus-junit5-mockito")
   testImplementation("io.quarkus:quarkus-rest-client")
   testImplementation("io.quarkus:quarkus-rest-client-jackson")
+  testImplementation("io.quarkus:quarkus-jdbc-h2")
+
   testImplementation("io.rest-assured:rest-assured")
+
   testImplementation(libs.localstack)
-  testImplementation("org.testcontainers:testcontainers")
+
+  testImplementation(project(":polaris-runtime-test-common"))
   testImplementation(project(":polaris-container-spec-helper"))
 
   testImplementation(libs.threeten.extra)
   testImplementation(libs.hawkular.agent.prometheus.scraper)
 
-  testImplementation(project(":polaris-runtime-test-common"))
-
-  testImplementation("io.quarkus:quarkus-junit5")
   testImplementation(libs.awaitility)
+
   testImplementation(platform(libs.testcontainers.bom))
   testImplementation("org.testcontainers:testcontainers")
   testImplementation("org.testcontainers:postgresql")
-  testImplementation("org.postgresql:postgresql")
 
   testFixturesImplementation(project(":polaris-core"))
   testFixturesImplementation(project(":polaris-api-management-model"))
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
index 8dc425ad0..481c599bc 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java
@@ -127,7 +127,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis
 
   @Nullable
   @Override
-  String getRequestId() {
+  protected String getRequestId() {
     if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
       return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
     }
@@ -135,7 +135,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis
   }
 
   @Override
-  void processEvent(PolarisEvent polarisEvent) {
+  protected void processEvent(PolarisEvent polarisEvent) {
     String realmId = callContext.getRealmContext().getRealmIdentifier();
 
     ConcurrentLinkedQueueWithApproximateSize<PolarisEvent> realmQueue =
@@ -192,7 +192,7 @@ public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersis
   }
 
   @Override
-  ContextSpecificInformation getContextSpecificInformation() {
+  protected ContextSpecificInformation getContextSpecificInformation() {
     return new ContextSpecificInformation(
         clock.millis(),
         securityContext.getUserPrincipal() == null
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
index e9d43f003..11771797a 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java
@@ -90,12 +90,12 @@ public abstract class PolarisPersistenceEventListener extends PolarisEventListen
     processEvent(polarisEvent);
   }
 
-  protected record ContextSpecificInformation(long timestamp, @Nullable String principalName) {}
+  public record ContextSpecificInformation(long timestamp, @Nullable String principalName) {}
 
-  abstract ContextSpecificInformation getContextSpecificInformation();
+  protected abstract ContextSpecificInformation getContextSpecificInformation();
 
   @Nullable
-  abstract String getRequestId();
+  protected abstract String getRequestId();
 
-  abstract void processEvent(PolarisEvent event);
+  protected abstract void processEvent(PolarisEvent event);
 }
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java
new file mode 100644
index 000000000..8af66b194
--- /dev/null
+++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListener.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.inmemory;
+
+import static org.apache.polaris.service.logging.LoggingMDCFilter.REQUEST_ID_KEY;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
+import jakarta.annotation.Nullable;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import jakarta.ws.rs.core.SecurityContext;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.listeners.InMemoryBufferEventListenerConfiguration;
+import org.apache.polaris.service.events.listeners.PolarisPersistenceEventListener;
+import org.eclipse.microprofile.faulttolerance.Fallback;
+import org.eclipse.microprofile.faulttolerance.Retry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ApplicationScoped
+@Identifier("persistence-in-memory")
+public class InMemoryEventListener extends PolarisPersistenceEventListener {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEventListener.class);
+
+  @Inject CallContext callContext;
+  @Inject Clock clock;
+  @Inject MetaStoreManagerFactory metaStoreManagerFactory;
+  @Inject InMemoryBufferEventListenerConfiguration configuration;
+
+  @Context SecurityContext securityContext;
+  @Context ContainerRequestContext requestContext;
+
+  @VisibleForTesting
+  final LoadingCache<String, UnicastProcessor<PolarisEvent>> processors =
+      Caffeine.newBuilder()
+          .expireAfterAccess(Duration.ofHours(1))
+          .evictionListener(
+              (String realmId, UnicastProcessor<?> processor, RemovalCause cause) ->
+                  processor.onComplete())
+          .build(this::createProcessor);
+
+  @Override
+  protected void processEvent(PolarisEvent event) {
+    var realmId = callContext.getRealmContext().getRealmIdentifier();
+    processEvent(realmId, event);
+  }
+
+  protected void processEvent(String realmId, PolarisEvent event) {
+    var processor = Objects.requireNonNull(processors.get(realmId));
+    processor.onNext(event);
+  }
+
+  @Override
+  protected ContextSpecificInformation getContextSpecificInformation() {
+    var principal = securityContext.getUserPrincipal();
+    var principalName = principal == null ? null : principal.getName();
+    return new ContextSpecificInformation(clock.millis(), principalName);
+  }
+
+  @Nullable
+  @Override
+  protected String getRequestId() {
+    return (String) requestContext.getProperty(REQUEST_ID_KEY);
+  }
+
+  @PreDestroy
+  public void shutdown() {
+    processors.asMap().values().forEach(UnicastProcessor::onComplete);
+    processors.invalidateAll(); // doesn't call the eviction listener
+  }
+
+  protected UnicastProcessor<PolarisEvent> createProcessor(String realmId) {
+    UnicastProcessor<PolarisEvent> processor = UnicastProcessor.create();
+    processor
+        .emitOn(Infrastructure.getDefaultWorkerPool())
+        .group()
+        .intoLists()
+        .of(configuration.maxBufferSize(), configuration.bufferTime())
+        .subscribe()
+        .with(events -> flush(realmId, events), error -> onProcessorError(realmId, error));
+    return processor;
+  }
+
+  @Retry(maxRetries = 5, delay = 1000, jitter = 100)
+  @Fallback(fallbackMethod = "onFlushError")
+  protected void flush(String realmId, List<PolarisEvent> events) {
+    RealmContext realmContext = () -> realmId;
+    var metaStoreManager = metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
+    var basePersistence = metaStoreManagerFactory.getOrCreateSession(realmContext);
+    var callContext = new PolarisCallContext(realmContext, basePersistence);
+    metaStoreManager.writeEvents(callContext, events);
+  }
+
+  @SuppressWarnings("unused")
+  protected void onFlushError(String realmId, List<PolarisEvent> events, Throwable error) {
+    LOGGER.error("Failed to persist {} events for realm '{}'", events.size(), realmId, error);
+  }
+
+  protected void onProcessorError(String realmId, Throwable error) {
+    LOGGER.error(
+        "Unexpected error while processing events for realm '{}'; some events may have been dropped",
+        realmId,
+        error);
+    processors.invalidate(realmId);
+  }
+}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java
new file mode 100644
index 000000000..1cb76cdcf
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferSizeTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.inmemory;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
+import io.smallrye.mutiny.subscription.BackPressureFailure;
+import java.util.Map;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.mockito.Mockito;
+
+@QuarkusTest
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestProfile(InMemoryEventListenerBufferSizeTest.Profile.class)
+class InMemoryEventListenerBufferSizeTest extends InMemoryEventListenerTestBase {
+
+  public static class Profile implements QuarkusTestProfile {
+
+    @Override
+    public Map<String, String> getConfigOverrides() {
+      return ImmutableMap.<String, String>builder()
+          .putAll(BASE_CONFIG)
+          .put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "60s")
+          .put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size", "10")
+          .build();
+    }
+  }
+
+  @Test
+  void testFlushOnSize() {
+    sendAsync("test1", 10);
+    sendAsync("test2", 10);
+    assertRows("test1", 10);
+    assertRows("test2", 10);
+  }
+
+  @Test
+  void testFlushOnShutdown() {
+    producer.processEvent("test1", event());
+    producer.processEvent("test2", event());
+    producer.shutdown();
+    assertRows("test1", 1);
+    assertRows("test2", 1);
+  }
+
+  @Test
+  void testFlushFailureRecovery() {
+    var manager = Mockito.mock(PolarisMetaStoreManager.class);
+    doReturn(manager).when(metaStoreManagerFactory).getOrCreateMetaStoreManager(any());
+    RuntimeException error = new RuntimeException("error");
+    doThrow(error)
+        .doThrow(error) // first batch will give up after 2 attempts
+        .doThrow(error)
+        .doCallRealMethod() // second batch will succeed on the 2nd attempt
+        .when(manager)
+        .writeEvents(any(), any());
+    sendAsync("test1", 20);
+    assertRows("test1", 10);
+  }
+
+  @Test
+  void testProcessorFailureRecovery() {
+    producer.processEvent("test1", event());
+    UnicastProcessor<PolarisEvent> test1 = producer.processors.get("test1");
+    assertThat(test1).isNotNull();
+    // emulate backpressure error; will drop the event and invalidate the processor
+    test1.onError(new BackPressureFailure("error"));
+    // will create a new processor and recover
+    sendAsync("test1", 10);
+    assertRows("test1", 10);
+  }
+}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java
new file mode 100644
index 000000000..8431274ea
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerBufferTimeTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.inmemory;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@QuarkusTest
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestProfile(InMemoryEventListenerBufferTimeTest.Profile.class)
+class InMemoryEventListenerBufferTimeTest extends InMemoryEventListenerTestBase {
+
+  public static class Profile implements QuarkusTestProfile {
+
+    @Override
+    public Map<String, String> getConfigOverrides() {
+      return ImmutableMap.<String, String>builder()
+          .putAll(BASE_CONFIG)
+          .put("polaris.event-listener.persistence-in-memory-buffer.buffer-time", "100ms")
+          .put("polaris.event-listener.persistence-in-memory-buffer.max-buffer-size", "1000")
+          .build();
+    }
+  }
+
+  @Test
+  void testFlushOnTimeout() {
+    sendAsync("test1", 5);
+    sendAsync("test2", 1);
+    assertRows("test1", 5);
+    assertRows("test2", 1);
+  }
+}
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java
new file mode 100644
index 000000000..527491b56
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/inmemory/InMemoryEventListenerTestBase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners.inmemory;
+
+import static org.apache.polaris.core.entity.PolarisEvent.ResourceType.CATALOG;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.reset;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.channel.EventLoopGroup;
+import io.quarkus.netty.MainEventLoopGroup;
+import io.quarkus.test.junit.mockito.InjectSpy;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Map;
+import java.util.UUID;
+import javax.sql.DataSource;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.junit.jupiter.api.AfterEach;
+
+abstract class InMemoryEventListenerTestBase {
+
+  static final Map<String, String> BASE_CONFIG =
+      ImmutableMap.<String, String>builder()
+          .put("polaris.realm-context.realms", "test1,test2")
+          .put("polaris.persistence.type", "relational-jdbc")
+          .put("polaris.persistence.auto-bootstrap-types", "relational-jdbc")
+          .put("quarkus.datasource.db-kind", "h2")
+          .put(
+              "quarkus.datasource.jdbc.url",
+              "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;MODE=PostgreSQL;DATABASE_TO_LOWER=TRUE")
+          .put("polaris.event-listener.type", "persistence-in-memory")
+          .put(
+              "quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.max-retries",
+              "1")
+          .put(
+              "quarkus.fault-tolerance.\"org.apache.polaris.service.events.listeners.inmemory.InMemoryEventListener/flush\".retry.delay",
+              "10")
+          .build();
+
+  @Inject
+  @Identifier("persistence-in-memory")
+  InMemoryEventListener producer;
+
+  @InjectSpy
+  @Identifier("relational-jdbc")
+  @SuppressWarnings("CdiInjectionPointsInspection")
+  MetaStoreManagerFactory metaStoreManagerFactory;
+
+  @Inject
+  @MainEventLoopGroup
+  @SuppressWarnings("CdiInjectionPointsInspection")
+  EventLoopGroup eventLoopGroup;
+
+  @Inject Instance<DataSource> dataSource;
+
+  @AfterEach
+  void clearEvents() throws Exception {
+    reset(metaStoreManagerFactory);
+    producer.shutdown();
+    try (Connection connection = dataSource.get().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("DELETE FROM polaris_schema.events");
+    }
+  }
+
+  void sendAsync(String realmId, int n) {
+    for (int i = 0; i < n; i++) {
+      eventLoopGroup.next().execute(() -> producer.processEvent(realmId, event()));
+    }
+  }
+
+  @SuppressWarnings("SqlSourceToSinkFlow")
+  void assertRows(String realmId, int expected) {
+    String query = "SELECT COUNT(*) FROM polaris_schema.events WHERE realm_id = '" + realmId + "'";
+    await()
+        .atMost(Duration.ofSeconds(10))
+        .untilAsserted(
+            () -> {
+              try (Connection connection = dataSource.get().getConnection();
+                  Statement statement = connection.createStatement();
+                  ResultSet rs = statement.executeQuery(query)) {
+                rs.next();
+                assertThat(rs.getInt(1)).isEqualTo(expected);
+              }
+            });
+  }
+
+  static PolarisEvent event() {
+    String id = UUID.randomUUID().toString();
+    return new PolarisEvent("test", id, null, "test", 0, null, CATALOG, "test");
+  }
+}

Reply via email to