adnanhemani commented on code in PR #1844:
URL: https://github.com/apache/polaris/pull/1844#discussion_r2258598256


##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {

Review Comment:
   > so either the name is wrong or the type is not in the right package?
   
   Personally, I don't think either of these is the case. This is an 
Event Listener implementation that stores events in the persistence layer. 
"Persistence" here refers to the outcome the Event Listener achieves for 
events that go through this implementation.
   
   If you don't find this clear, I'm happy to rename it to 
`PolarisPersistenceSinkEventListener`, although I personally find that it just 
adds more words without being any clearer than the current name.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.Map;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+public abstract class PolarisPersistenceEventListener extends 
PolarisEventListener {
+  @Override
+  public final void onBeforeRequestRateLimited(
+      BeforeRequestRateLimitedEvent event, CallContext callCtx, 
SecurityContext securityContext) {}
+
+  @Override
+  public void onBeforeTableCommited(
+      BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCommited(
+      AfterTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewCommited(
+      BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewCommited(
+      AfterViewCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTableRefreshed(
+      BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableRefreshed(
+      AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeViewRefreshed(
+      BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterViewRefreshed(
+      AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, 
CallContext callCtx) {}
+
+  @Override
+  public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext 
callCtx) {}
+
+  @Override
+  public void onBeforeTableCreated(
+      BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  @Override
+  public void onAfterTableCreated(
+      AfterTableCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new org.apache.polaris.core.entity.PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.TABLE,
+            event.identifier().toString());
+    Map<String, String> additionalParameters =
+        Map.of(
+            "table-uuid",
+            event.metadata().uuid(),
+            "metadata",
+            TableMetadataParser.toJson(event.metadata()));
+    polarisEvent.setAdditionalParameters(additionalParameters);
+
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  @Override
+  public void onAfterCatalogCreated(
+      AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext 
securityContext) {
+    org.apache.polaris.core.entity.PolarisEvent polarisEvent =
+        new PolarisEvent(
+            event.catalogName(),
+            event.eventId(),
+            getRequestId(callCtx),
+            event.getClass().getSimpleName(),
+            getTimestamp(callCtx),
+            getUsername(securityContext),
+            PolarisEvent.ResourceType.CATALOG,
+            event.catalogName());
+    addToBuffer(polarisEvent, callCtx);
+  }
+
+  private long getTimestamp(CallContext callCtx) {
+    return callCtx.getPolarisCallContext().getClock().millis();
+  }
+
+  private String getRequestId(CallContext callCtx) {
+    return callCtx.getPolarisCallContext().getRequestId();
+  }
+
+  private String getUsername(SecurityContext securityContext) {

Review Comment:
   Okay, that's a fair point. I will rename the function to `getPrincipalName`.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+
+public class PolarisEvent {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalProperties;

Review Comment:
   So then what is the preferred way to note down properties of an event that 
are not standard for all event types? I believe we are still doing this in 
other parts of the repo as well...



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callCtx.getPolarisCallContext().copy()));
+    futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId)), 1);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    // Given that we are using a ConcurrentLinkedQueue, this should not lock 
any calls to `add` on
+    // the queue.
+    synchronized (queue) {

Review Comment:
   Removed.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();

Review Comment:
   If you're talking about `executor.shutdownNow()`, then I don't see any 
exceptions it could throw that would be relevant to our usage. Can you 
explain further?



##########
runtime/defaults/src/main/resources/application.properties:
##########
@@ -109,23 +109,29 @@ polaris.realm-context.header-name=Polaris-Realm
 polaris.realm-context.require-header=false
 
 
polaris.features."ENFORCE_PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_CHECKING"=false
-polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE"]
+polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE","FILE"]
+polaris.features."ALLOW_INSECURE_STORAGE_TYPES"=true
+polaris.readiness.ignore-severe-issues=true

Review Comment:
   My apologies for this - I was using it for testing and did not realize that 
it got committed. Reverted.



##########
runtime/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisInMemoryBufferEventListenerConfiguration.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.quarkus.events;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import io.smallrye.config.WithName;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Duration;
+import java.util.Optional;
+import 
org.apache.polaris.service.events.listeners.InMemoryBufferPersistenceListenerConfiguration;
+
+@StaticInitSafe

Review Comment:
   Without this, we are getting the error: 
   
   ```
   Caused by: io.smallrye.config.ConfigValidationException: Configuration 
validation failed:
        SRCFG00050: 
polaris.event-listener.persistence-in-memory-buffer.buffer-time in 
PropertiesConfigSource[source=file:/Users/ahemani/Development/polaris/runtime/defaults/build/resources/main/application.properties]:138
 does not map to any root
        SRCFG00050: 
polaris.event-listener.persistence-in-memory-buffer.max-buffer-size in 
PropertiesConfigSource[source=file:/Users/ahemani/Development/polaris/runtime/defaults/build/resources/main/application.properties]:139
 does not map to any root
   ```
   
   I am not a Quarkus expert - any idea what we can do to get around this?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();

Review Comment:
   There's no such thing as a `ConcurrentHashSet`, so I've added the `Integer` 
value type as a placeholder.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Map;
+
+public class PolarisEvent {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalProperties;
+
+  public String getCatalogId() {
+    return catalogId;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public String getEventType() {
+    return eventType;
+  }
+
+  public long getTimestampMs() {
+    return timestampMs;
+  }
+
+  public String getPrincipalName() {
+    return principalName;
+  }
+
+  public ResourceType getResourceType() {
+    return resourceType;
+  }
+
+  public String getResourceIdentifier() {
+    return resourceIdentifier;
+  }
+
+  public String getAdditionalProperties() {
+    return additionalProperties != null ? additionalProperties : 
EMPTY_MAP_STRING;
+  }
+
+  public PolarisEvent(
+      String catalogId,
+      String id,
+      String requestId,
+      String eventType,
+      long timestampMs,
+      String actor,
+      ResourceType resourceType,
+      String resourceIdentifier) {
+    this.catalogId = catalogId;
+    this.id = id;
+    this.requestId = requestId;
+    this.eventType = eventType;
+    this.timestampMs = timestampMs;
+    this.principalName = actor;
+    this.resourceType = resourceType;
+    this.resourceIdentifier = resourceIdentifier;
+  }
+
+  @JsonIgnore
+  public void setAdditionalProperties(Map<String, String> properties) {
+    try {
+      this.additionalProperties = properties == null ? null : 
MAPPER.writeValueAsString(properties);
+    } catch (JsonProcessingException ex) {
+      throw new IllegalStateException(
+          String.format("Failed to serialize json. properties %s", 
properties), ex);
+    }
+  }
+
+  public void setAdditionalProperties(String additionalProperties) {
+    this.additionalProperties = additionalProperties;
+  }
+
+  public enum ResourceType {
+    CATALOG,
+    NAMESPACE,
+    TABLE,
+    VIEW
+  }

Review Comment:
   From the perspective of events, I'm not sure that noting exactly which type 
of table it is will matter. But even if it does, we can add new values to this 
enum and the persistence should be able to handle new enum values. I'm not sure 
what implication you are hinting at here.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();

Review Comment:
   That's correct: it only starts the thread, without any work submitted. I 
can move this initializer into the constructor.
   
   But moving this into the `@PostConstruct` method would require removing the 
`final` qualifier. Let me know if the refactor does not satisfy you.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;

Review Comment:
   Can you then please suggest a better way to extract the Request ID from the 
`containerRequestContext`? IIUC, the only other option then becomes to ask for 
it whenever the PolarisEvent is being created - and that is quite unclean as 
well.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+/**
+ * Represents an event listener that can respond to notable moments during 
Polaris's execution.
+ * Event details are documented under the event objects themselves.
+ */
+public abstract class PolarisEventListener {
+
+  /** {@link BeforeRequestRateLimitedEvent} */
+  public void onBeforeRequestRateLimited(
+      BeforeRequestRateLimitedEvent event, SecurityContext securityContext) {}
+
+  /** {@link BeforeTableCommitedEvent} */
+  public void onBeforeTableCommited(
+      BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}
+
+  /** {@link AfterTableCommitedEvent} */
+  public void onAfterTableCommited(
+      AfterTableCommitedEvent event, CallContext callCtx, SecurityContext 
securityContext) {}

Review Comment:
   Rewrote this in the following revision. We are not persisting the 
SecurityContext itself - just using it to extract information we need.



##########
runtime/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisInMemoryBufferEventListenerConfiguration.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.quarkus.events;
+
+import io.quarkus.runtime.annotations.StaticInitSafe;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import io.smallrye.config.WithName;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.time.Duration;
+import java.util.Optional;
+import 
org.apache.polaris.service.events.listeners.InMemoryBufferPersistenceListenerConfiguration;
+
+@StaticInitSafe
+@ConfigMapping(prefix = "polaris.event-listener.persistence-in-memory-buffer")
+@ApplicationScoped
+public interface QuarkusPolarisInMemoryBufferEventListenerConfiguration
+    extends InMemoryBufferPersistenceListenerConfiguration {
+  /**
+   * @return the buffer time in milliseconds
+   */
+  @Override
+  @WithName("buffer-time")
+  @WithDefault("5000ms")
+  Optional<Duration> bufferTime();
+
+  /**
+   * @return the maximum number of cached entries
+   */
+  @Override
+  @WithName("max-buffer-size")
+  @WithDefault("5")
+  Optional<Integer> maxBufferSize();

Review Comment:
   Not sure I see support for `OptionalInt` with this type of Quarkus 
configuration mapping. Nevertheless, I don't see a good reason to use 
`OptionalInt` for this use case - the minor differences between the two 
implementations will not come into play here. Will leave as-is for stylistic 
purposes (all other `Optional` fields in Quarkus Configuration Mappings use the 
Java Optional class).



##########
service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import jakarta.ws.rs.container.ContainerRequestContext;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisMetaStoreManager polarisMetaStoreManager;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = 
Duration.ofMillis(500);
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    PolarisCallContext polarisCallContext = 
Mockito.mock(PolarisCallContext.class);
+    when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext);
+    when(polarisCallContext.copy()).thenReturn(polarisCallContext);
+
+    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
+    polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class);
+    when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any()))
+        .thenReturn(polarisMetaStoreManager);
+
+    InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration =
+        Mockito.mock(InMemoryBufferPersistenceListenerConfiguration.class);
+    when(eventListenerConfiguration.maxBufferSize())
+        .thenReturn(Optional.of(CONFIG_MAX_BUFFER_SIZE));
+    when(eventListenerConfiguration.bufferTime())
+        .thenReturn(Optional.of(CONFIG_TIME_TO_FLUSH_IN_MS));
+
+    clock =
+        MutableClock.of(
+            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
+
+    eventListener =
+        new InMemoryBufferPolarisPersistenceEventListener(
+            metaStoreManagerFactory, clock, eventListenerConfiguration);
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterConfiguredTime() {
+    String realmId = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+    eventListener.checkAndFlushBufferIfNecessary(realmId);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterMaxEvents() {
+    String realm1 = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realm1);
+    List<PolarisEvent> eventsAddedToBufferRealm2 = 
addEventsWithoutTriggeringFlush("realm2");
+
+    // Add the last event for realm1 and verify that it did trigger the flush
+    PolarisEvent triggeringEvent = createSampleEvent();
+    RealmContext realmContext = () -> realm1;
+    when(callContext.getRealmContext()).thenReturn(realmContext);
+    eventListener.addToBuffer(triggeringEvent, callContext);
+    eventsAddedToBuffer.add(triggeringEvent);
+
+    // Calling checkAndFlushBufferIfNecessary manually to replicate the 
behavior of the executor
+    // service
+    eventListener.checkAndFlushBufferIfNecessary(realm1);
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBuffer));
+    verify(polarisMetaStoreManager, times(0))
+        .writeEvents(eq(callContext.getPolarisCallContext()), 
eq(eventsAddedToBufferRealm2));
+  }
+
+  @Test
+  public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws 
Exception {
+    String realmId = "realm1";
+    int threadCount = 10;
+    List<Thread> threads = new ArrayList<>();
+    ConcurrentLinkedQueue<Exception> exceptions = new 
ConcurrentLinkedQueue<>();
+
+    // Pre-populate the buffer with events
+    List<PolarisEvent> events = addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2));
+
+    // Each thread will call checkAndFlushBufferIfNecessary concurrently
+    for (int i = 0; i < threadCount; i++) {
+      Thread t =
+          new Thread(
+              () -> {
+                try {
+                  eventListener.checkAndFlushBufferIfNecessary(realmId);
+                } catch (Exception e) {
+                  exceptions.add(e);
+                }
+              });
+      threads.add(t);
+    }
+    // Start all threads
+    threads.forEach(Thread::start);
+    // Wait for all threads to finish
+    for (Thread t : threads) {
+      t.join();
+    }
+    // There should be no exceptions
+    if (!exceptions.isEmpty()) {
+      throw new AssertionError(
+          "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary: 
", exceptions.peek());
+    }
+    // Only one flush should occur
+    verify(polarisMetaStoreManager, times(1))
+        .writeEvents(eq(callContext.getPolarisCallContext()), eq(events));
+  }
+
+  @Test
+  public void testRequestIdFunctionalityWithContainerRequestContext() {
+    // Test when containerRequestContext has requestId property
+    ContainerRequestContext mockContainerRequestContext =
+        Mockito.mock(ContainerRequestContext.class);
+    String expectedRequestId = "custom-request-id-123";
+
+    
when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true);
+    
when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId);
+
+    // Use reflection to set the containerRequestContext field
+    try {
+      java.lang.reflect.Field field =
+          InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField(
+              "containerRequestContext");
+      field.setAccessible(true);
+      field.set(eventListener, mockContainerRequestContext);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to set containerRequestContext 
field", e);
+    }

Review Comment:
   Unfortunately, there's not a good way to do this with injected/context beans 
unless I add a backdoor into the code that will allow us to manually override 
the beans - which is, in my opinion, less favorable as it modifies server code 
rather than just the test code. Let me know if you have another way to achieve 
this outcome instead.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));

Review Comment:
   @adutra had earlier commented on what our strategy should be for futures 
when the service is shut down - citing concerns that incomplete futures may 
interfere with the shutdown process. So I've made it explicit to cancel all 
futures that have not yet started. Futures already started will be given time 
to complete - I will add an explicit `awaitTermination` for this case in the 
next revision.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callCtx.getPolarisCallContext().copy()));
+    futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId)), 1);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {

Review Comment:
   Changed to `protected`.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callCtx.getPolarisCallContext().copy()));
+    futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId)), 1);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {

Review Comment:
   In the next revision, I've added another check so that a future is only
   submitted once the buffer has reached `maxBufferSize`.
   
   > The implementation doesn't seem to be resilient against such scenarios, 
like millions of "rate limited" events to bring down the service.
   
   As already discussed above, "rate limited" events will not be persisted. If
   you have any data points showing that this implementation cannot keep up
   with a heavy load of other types of events, please share them explicitly.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.core.Context;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  private static final String REQUEST_ID_KEY = "requestId";
+  private final MetaStoreManagerFactory metaStoreManagerFactory;
+  private final Clock clock;
+
+  private final ConcurrentHashMap<String, 
ConcurrentLinkedQueue<EventAndContext>> buffer =
+      new ConcurrentHashMap<>();
+  private final ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor();
+  private final ConcurrentHashMap<Future<?>, Integer> futures = new 
ConcurrentHashMap<>();
+  private final Duration timeToFlush;
+  private final int maxBufferSize;
+
+  @Context ContainerRequestContext containerRequestContext;
+
+  private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext 
callContext) {}
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      Clock clock,
+      InMemoryBufferPersistenceListenerConfiguration 
eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse(Duration.of(30, 
ChronoUnit.SECONDS));
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    futures.put(
+        executor.scheduleAtFixedRate(
+            this::runCleanup, 0, timeToFlush.toMillis(), 
TimeUnit.MILLISECONDS),
+        1);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+    // Clean up futures
+    try {
+      futures.keySet().removeIf(future -> future.isCancelled() || 
future.isDone());
+    } catch (Exception e) {
+      LOGGER.debug("Futures reaper task failed.");
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    futures.keySet().forEach(future -> future.cancel(false));
+    executor.shutdownNow();
+  }
+
+  @Override
+  String getRequestId() {
+    if (containerRequestContext != null && 
containerRequestContext.hasProperty(REQUEST_ID_KEY)) {
+      return (String) containerRequestContext.getProperty(REQUEST_ID_KEY);
+    }
+    return UUID.randomUUID().toString();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+
+    buffer
+        .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>())
+        .add(new EventAndContext(polarisEvent, 
callCtx.getPolarisCallContext().copy()));
+    futures.put(executor.submit(() -> 
checkAndFlushBufferIfNecessary(realmId)), 1);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {
+    ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId);
+    if (queue == null || queue.isEmpty()) {
+      return;
+    }
+
+    // Given that we are using a ConcurrentLinkedQueue, this should not lock 
any calls to `add` on
+    // the queue.
+    synchronized (queue) {
+      // Double-check inside synchronized block
+      if (queue.isEmpty()) {
+        return;
+      }
+
+      EventAndContext head = queue.peek();
+      if (head == null) {
+        return;
+      }
+
+      Duration elapsed = Duration.ofMillis(clock.millis() - 
head.polarisEvent.getTimestampMs());

Review Comment:
   I'm not sure why this would be the case. `clock.millis()` relies on the
   server's clock, and `head.polarisEvent.getTimestampMs()` was also derived
   earlier from the server's clock. Are you implying that the server clock may
   change while the server is up and running?
   
   If so, I don't know of any way to work around that, and I would think it is
   a much larger problem than what's in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to