adnanhemani commented on code in PR #1844: URL: https://github.com/apache/polaris/pull/1844#discussion_r2258598256
########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableCreatedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; 
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { Review Comment: > so either the name is wrong or the type is not in the right package? Personally, I don't think either of these framings is correct. This is an Event Listener implementation that stores events in persistence. "Persistence" here refers to the outcome this Event Listener achieves for the events that pass through it. If this isn't clear, I'm happy to rename it to `PolarisPersistenceSinkEventListener`, although I personally find that name just adds more words without being any clearer than the current one. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisPersistenceEventListener.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import java.util.Map; +import org.apache.iceberg.TableMetadataParser; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableCreatedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +public abstract class PolarisPersistenceEventListener extends PolarisEventListener { + @Override + public final void onBeforeRequestRateLimited( + BeforeRequestRateLimitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTableCommited( + BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableCommited( + AfterTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeViewCommited( + BeforeViewCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterViewCommited( + AfterViewCommitedEvent event, CallContext 
callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTableRefreshed( + BeforeTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableRefreshed( + AfterTableRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeViewRefreshed( + BeforeViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterViewRefreshed( + AfterViewRefreshedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onBeforeTaskAttempted(BeforeTaskAttemptedEvent event, CallContext callCtx) {} + + @Override + public void onAfterTaskAttempted(AfterTaskAttemptedEvent event, CallContext callCtx) {} + + @Override + public void onBeforeTableCreated( + BeforeTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + @Override + public void onAfterTableCreated( + AfterTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) { + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new org.apache.polaris.core.entity.PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(callCtx), + event.getClass().getSimpleName(), + getTimestamp(callCtx), + getUsername(securityContext), + PolarisEvent.ResourceType.TABLE, + event.identifier().toString()); + Map<String, String> additionalParameters = + Map.of( + "table-uuid", + event.metadata().uuid(), + "metadata", + TableMetadataParser.toJson(event.metadata())); + polarisEvent.setAdditionalParameters(additionalParameters); + + addToBuffer(polarisEvent, callCtx); + } + + @Override + public void onAfterCatalogCreated( + AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext securityContext) { + org.apache.polaris.core.entity.PolarisEvent polarisEvent = + new PolarisEvent( + event.catalogName(), + event.eventId(), + getRequestId(callCtx), + 
event.getClass().getSimpleName(), + getTimestamp(callCtx), + getUsername(securityContext), + PolarisEvent.ResourceType.CATALOG, + event.catalogName()); + addToBuffer(polarisEvent, callCtx); + } + + private long getTimestamp(CallContext callCtx) { + return callCtx.getPolarisCallContext().getClock().millis(); + } + + private String getRequestId(CallContext callCtx) { + return callCtx.getPolarisCallContext().getRequestId(); + } + + private String getUsername(SecurityContext securityContext) { Review Comment: Okay, that's a fair point. I will rename the function to `getPrincipalName`. ########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; + +public class PolarisEvent { + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // catalog id + private String catalogId; + + // event id + private String id; + + // id of the request that generated this event + private String requestId; + + // event type that was fired + private String eventType; + + // timestamp in epoch milliseconds of when this event was emitted + private long timestampMs; + + // polaris principal who took this action + private String principalName; + + // Enum that states the type of resource was being operated on + private ResourceType resourceType; + + // Which resource was operated on + private String resourceIdentifier; + + // Additional parameters that were not earlier recorded + private String additionalProperties; Review Comment: So then what is the preferred way to note down properties of an event that are not standard for all event types? I believe we are still doing this in other parts of the repo as well... ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = + eventListenerConfiguration.bufferTime().orElse(Duration.of(30, ChronoUnit.SECONDS)); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); // 5 events default + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || future.isDone()); + } 
catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return UUID.randomUUID().toString(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) { + String realmId = callCtx.getRealmContext().getRealmIdentifier(); + + buffer + .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>()) + .add(new EventAndContext(polarisEvent, callCtx.getPolarisCallContext().copy())); + futures.put(executor.submit(() -> checkAndFlushBufferIfNecessary(realmId)), 1); + } + + @VisibleForTesting + public void checkAndFlushBufferIfNecessary(String realmId) { + ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId); + if (queue == null || queue.isEmpty()) { + return; + } + + // Given that we are using a ConcurrentLinkedQueue, this should not lock any calls to `add` on + // the queue. + synchronized (queue) { Review Comment: Removed. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = + eventListenerConfiguration.bufferTime().orElse(Duration.of(30, ChronoUnit.SECONDS)); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); // 5 events default + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || future.isDone()); + } 
+ catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); Review Comment: If you're referring to `executor.shutdownNow()`, I don't see any exceptions it could throw that would be relevant to our usage. Can you elaborate? ########## runtime/defaults/src/main/resources/application.properties: ########## @@ -109,23 +109,29 @@ polaris.realm-context.header-name=Polaris-Realm polaris.realm-context.require-header=false polaris.features."ENFORCE_PRINCIPAL_CREDENTIAL_ROTATION_REQUIRED_CHECKING"=false -polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE"] +polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES"=["S3","GCS","AZURE","FILE"] +polaris.features."ALLOW_INSECURE_STORAGE_TYPES"=true +polaris.readiness.ignore-severe-issues=true Review Comment: My apologies for this - I was using it for testing and did not realize it had been committed. Reverted. ########## runtime/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisInMemoryBufferEventListenerConfiguration.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.quarkus.events; + +import io.quarkus.runtime.annotations.StaticInitSafe; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; +import jakarta.enterprise.context.ApplicationScoped; +import java.time.Duration; +import java.util.Optional; +import org.apache.polaris.service.events.listeners.InMemoryBufferPersistenceListenerConfiguration; + +@StaticInitSafe Review Comment: Without this, we are getting the error: ``` Caused by: io.smallrye.config.ConfigValidationException: Configuration validation failed: SRCFG00050: polaris.event-listener.persistence-in-memory-buffer.buffer-time in PropertiesConfigSource[source=file:/Users/ahemani/Development/polaris/runtime/defaults/build/resources/main/application.properties]:138 does not map to any root SRCFG00050: polaris.event-listener.persistence-in-memory-buffer.max-buffer-size in PropertiesConfigSource[source=file:/Users/ahemani/Development/polaris/runtime/defaults/build/resources/main/application.properties]:139 does not map to any root ``` I am not a Quarkus expert - any idea what we can do to get around this? ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); Review Comment: There's no such thing as a `ConcurrentHashSet` so I've added the `Integer` as a placeholder. ########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; + +public class PolarisEvent { + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // catalog id + private String catalogId; + + // event id + private String id; + + // id of the request that generated this event + private String requestId; + + // event type that was fired + private String eventType; + + // timestamp in epoch milliseconds of when this event was emitted + private long timestampMs; + + // polaris principal who took this action + private String principalName; + + // Enum that states the type of resource was being operated on + private ResourceType resourceType; + + // Which resource was operated on + private String resourceIdentifier; + + // Additional parameters that were not earlier recorded + private String additionalProperties; + + public String getCatalogId() { + return catalogId; + } + + public String getId() { + return id; + } + + public String getRequestId() { + return requestId; + } + + public String getEventType() { + return eventType; + } + + public long getTimestampMs() { + return timestampMs; + } + + public String getPrincipalName() { + return principalName; + } + + public ResourceType getResourceType() { + return resourceType; + } + + public String getResourceIdentifier() { + return resourceIdentifier; + } + + public String getAdditionalProperties() { + return additionalProperties != null ? 
additionalProperties : EMPTY_MAP_STRING; + } + + public PolarisEvent( + String catalogId, + String id, + String requestId, + String eventType, + long timestampMs, + String actor, + ResourceType resourceType, + String resourceIdentifier) { + this.catalogId = catalogId; + this.id = id; + this.requestId = requestId; + this.eventType = eventType; + this.timestampMs = timestampMs; + this.principalName = actor; + this.resourceType = resourceType; + this.resourceIdentifier = resourceIdentifier; + } + + @JsonIgnore + public void setAdditionalProperties(Map<String, String> properties) { + try { + this.additionalProperties = properties == null ? null : MAPPER.writeValueAsString(properties); + } catch (JsonProcessingException ex) { + throw new IllegalStateException( + String.format("Failed to serialize json. properties %s", properties), ex); + } + } + + public void setAdditionalProperties(String additionalProperties) { + this.additionalProperties = additionalProperties; + } + + public enum ResourceType { + CATALOG, + NAMESPACE, + TABLE, + VIEW + } Review Comment: From the perspective of events, I'm not sure that recording exactly which type of table was involved will matter. But even if it does, we can add new values to this enum, and the persistence layer should be able to handle new enum values. I'm not sure what implication you are hinting at here. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); Review Comment: That's correct, it only starts the thread but without any work submitted. I can move this initializer into the constructor. But moving this into the `@PostConstruct` will need to remove the `final` qualifier. Let me know if the refactor does not satisfy you. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Context ContainerRequestContext containerRequestContext; Review Comment: Can you then please suggest a better way to extract the Request ID from the `containerRequestContext`? IIUC, the only other option would be to ask for it whenever a PolarisEvent is created, and that is quite unclean as well. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java: ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +/** + * Represents an event listener that can respond to notable moments during Polaris's execution. + * Event details are documented under the event objects themselves. + */ +public abstract class PolarisEventListener { + + /** {@link BeforeRequestRateLimitedEvent} */ + public void onBeforeRequestRateLimited( + BeforeRequestRateLimitedEvent event, SecurityContext securityContext) {} + + /** {@link BeforeTableCommitedEvent} */ + public void onBeforeTableCommited( + BeforeTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} + + /** {@link AfterTableCommitedEvent} */ + public void onAfterTableCommited( + AfterTableCommitedEvent event, CallContext callCtx, SecurityContext securityContext) {} Review Comment: Rewrote this in the following revision. 
We are not persisting the SecurityContext itself - just using it to extract information we need. ########## runtime/service/src/main/java/org/apache/polaris/service/quarkus/events/QuarkusPolarisInMemoryBufferEventListenerConfiguration.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.polaris.service.quarkus.events; + +import io.quarkus.runtime.annotations.StaticInitSafe; +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; +import jakarta.enterprise.context.ApplicationScoped; +import java.time.Duration; +import java.util.Optional; +import org.apache.polaris.service.events.listeners.InMemoryBufferPersistenceListenerConfiguration; + +@StaticInitSafe +@ConfigMapping(prefix = "polaris.event-listener.persistence-in-memory-buffer") +@ApplicationScoped +public interface QuarkusPolarisInMemoryBufferEventListenerConfiguration + extends InMemoryBufferPersistenceListenerConfiguration { + /** + * @return the buffer time in milliseconds + */ + @Override + @WithName("buffer-time") + @WithDefault("5000ms") + Optional<Duration> bufferTime(); + + /** + * @return the maximum number of cached entries + */ + @Override + @WithName("max-buffer-size") + @WithDefault("5") + Optional<Integer> maxBufferSize(); Review Comment: Not sure I see support for `OptionalInt` with this type of Quarkus configuration mapping. Nevertheless, I don't see a good reason to use `OptionalInt` for this use case - the minor differences between the two implementations will not come into play here. Will leave as-is for stylistic purposes (all other `Optional` fields in Quarkus Configuration Mappings use the Java Optional class). ########## service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java: ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import jakarta.ws.rs.container.ContainerRequestContext; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.core.persistence.PolarisMetaStoreManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.threeten.extra.MutableClock; + +public class InMemoryBufferPolarisPersistenceEventListenerTest { + private InMemoryBufferPolarisPersistenceEventListener eventListener; + private PolarisMetaStoreManager polarisMetaStoreManager; + private MutableClock clock; + private CallContext callContext; + + private static final int CONFIG_MAX_BUFFER_SIZE = 5; + private static final Duration CONFIG_TIME_TO_FLUSH_IN_MS = Duration.ofMillis(500); + + @BeforeEach + public 
void setUp() { + callContext = Mockito.mock(CallContext.class); + PolarisCallContext polarisCallContext = Mockito.mock(PolarisCallContext.class); + when(callContext.getPolarisCallContext()).thenReturn(polarisCallContext); + when(polarisCallContext.copy()).thenReturn(polarisCallContext); + + MetaStoreManagerFactory metaStoreManagerFactory = Mockito.mock(MetaStoreManagerFactory.class); + polarisMetaStoreManager = Mockito.mock(PolarisMetaStoreManager.class); + when(metaStoreManagerFactory.getOrCreateMetaStoreManager(Mockito.any())) + .thenReturn(polarisMetaStoreManager); + + InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration = + Mockito.mock(InMemoryBufferPersistenceListenerConfiguration.class); + when(eventListenerConfiguration.maxBufferSize()) + .thenReturn(Optional.of(CONFIG_MAX_BUFFER_SIZE)); + when(eventListenerConfiguration.bufferTime()) + .thenReturn(Optional.of(CONFIG_TIME_TO_FLUSH_IN_MS)); + + clock = + MutableClock.of( + Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to make it easier to test + + eventListener = + new InMemoryBufferPolarisPersistenceEventListener( + metaStoreManagerFactory, clock, eventListenerConfiguration); + } + + @Test + public void testAddToBufferFlushesAfterConfiguredTime() { + String realmId = "realm1"; + List<PolarisEvent> eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realmId); + + // Push clock forwards to flush the buffer + clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2)); + eventListener.checkAndFlushBufferIfNecessary(realmId); + verify(polarisMetaStoreManager, times(1)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBuffer)); + } + + @Test + public void testAddToBufferFlushesAfterMaxEvents() { + String realm1 = "realm1"; + List<PolarisEvent> eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realm1); + List<PolarisEvent> eventsAddedToBufferRealm2 = addEventsWithoutTriggeringFlush("realm2"); + + // Add the last event for realm1 and verify that it did 
trigger the flush + PolarisEvent triggeringEvent = createSampleEvent(); + RealmContext realmContext = () -> realm1; + when(callContext.getRealmContext()).thenReturn(realmContext); + eventListener.addToBuffer(triggeringEvent, callContext); + eventsAddedToBuffer.add(triggeringEvent); + + // Calling checkAndFlushBufferIfNecessary manually to replicate the behavior of the executor + // service + eventListener.checkAndFlushBufferIfNecessary(realm1); + verify(polarisMetaStoreManager, times(1)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBuffer)); + verify(polarisMetaStoreManager, times(0)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(eventsAddedToBufferRealm2)); + } + + @Test + public void testCheckAndFlushBufferIfNecessaryIsThreadSafe() throws Exception { + String realmId = "realm1"; + int threadCount = 10; + List<Thread> threads = new ArrayList<>(); + ConcurrentLinkedQueue<Exception> exceptions = new ConcurrentLinkedQueue<>(); + + // Pre-populate the buffer with events + List<PolarisEvent> events = addEventsWithoutTriggeringFlush(realmId); + + // Push clock forwards to flush the buffer + clock.add(CONFIG_TIME_TO_FLUSH_IN_MS.multipliedBy(2)); + + // Each thread will call checkAndFlushBufferIfNecessary concurrently + for (int i = 0; i < threadCount; i++) { + Thread t = + new Thread( + () -> { + try { + eventListener.checkAndFlushBufferIfNecessary(realmId); + } catch (Exception e) { + exceptions.add(e); + } + }); + threads.add(t); + } + // Start all threads + threads.forEach(Thread::start); + // Wait for all threads to finish + for (Thread t : threads) { + t.join(); + } + // There should be no exceptions + if (!exceptions.isEmpty()) { + throw new AssertionError( + "Exceptions occurred in concurrent checkAndFlushBufferIfNecessary: ", exceptions.peek()); + } + // Only one flush should occur + verify(polarisMetaStoreManager, times(1)) + .writeEvents(eq(callContext.getPolarisCallContext()), eq(events)); + } + + @Test + public void 
testRequestIdFunctionalityWithContainerRequestContext() { + // Test when containerRequestContext has requestId property + ContainerRequestContext mockContainerRequestContext = + Mockito.mock(ContainerRequestContext.class); + String expectedRequestId = "custom-request-id-123"; + + when(mockContainerRequestContext.hasProperty("requestId")).thenReturn(true); + when(mockContainerRequestContext.getProperty("requestId")).thenReturn(expectedRequestId); + + // Use reflection to set the containerRequestContext field + try { + java.lang.reflect.Field field = + InMemoryBufferPolarisPersistenceEventListener.class.getDeclaredField( + "containerRequestContext"); + field.setAccessible(true); + field.set(eventListener, mockContainerRequestContext); + } catch (Exception e) { + throw new RuntimeException("Failed to set containerRequestContext field", e); + } Review Comment: Unfortunately, there's not a good way to do this with injected/context beans unless I add a backdoor into the code that will allow us to manually override the beans - which is, in my opinion, less favorable as it modifies server code rather than just the test code. Let me know if you have another way to achieve this outcome instead. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = + eventListenerConfiguration.bufferTime().orElse(Duration.of(30, ChronoUnit.SECONDS)); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); // 5 events default + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || future.isDone()); + } 
catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); Review Comment: @adutra had earlier commented on what our strategy should be for futures when the service is shut down, citing concerns that incomplete futures may interfere with the shutdown process. So I've made the code explicitly cancel all futures that have not yet started. Futures that have already started will be given time to complete; I will add an explicit `awaitTermination` for this case in the next revision. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = + eventListenerConfiguration.bufferTime().orElse(Duration.of(30, ChronoUnit.SECONDS)); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); // 5 events default + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || future.isDone()); + } 
catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return UUID.randomUUID().toString(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) { + String realmId = callCtx.getRealmContext().getRealmIdentifier(); + + buffer + .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>()) + .add(new EventAndContext(polarisEvent, callCtx.getPolarisCallContext().copy())); + futures.put(executor.submit(() -> checkAndFlushBufferIfNecessary(realmId)), 1); + } + + @VisibleForTesting + public void checkAndFlushBufferIfNecessary(String realmId) { Review Comment: Changed to `protected`. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = + eventListenerConfiguration.bufferTime().orElse(Duration.of(30, ChronoUnit.SECONDS)); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); // 5 events default + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || future.isDone()); + } 
catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return UUID.randomUUID().toString(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) { + String realmId = callCtx.getRealmContext().getRealmIdentifier(); + + buffer + .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>()) + .add(new EventAndContext(polarisEvent, callCtx.getPolarisCallContext().copy())); + futures.put(executor.submit(() -> checkAndFlushBufferIfNecessary(realmId)), 1); + } + + @VisibleForTesting + public void checkAndFlushBufferIfNecessary(String realmId) { Review Comment: In the next revision, I've added another check to ensure that we only submit a future once the buffer has reached maxBufferSize. > The implementation doesn't seem to be resilient against such scenarios, like millions of "rate limited" events to bring down the service. As already discussed above, we will not add "rate limited" events to the persistence. If you have any data points showing that this implementation will not be able to keep up with a heavy load of other types of events, please share them explicitly. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import java.time.Clock; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + private static final String REQUEST_ID_KEY = "requestId"; + private final MetaStoreManagerFactory metaStoreManagerFactory; + private final Clock clock; + + private final ConcurrentHashMap<String, ConcurrentLinkedQueue<EventAndContext>> buffer = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentHashMap<Future<?>, Integer> futures = new ConcurrentHashMap<>(); + private final Duration timeToFlush; + private final int maxBufferSize; + + @Context ContainerRequestContext containerRequestContext; + + private record EventAndContext(PolarisEvent polarisEvent, PolarisCallContext callContext) {} + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + Clock clock, + InMemoryBufferPersistenceListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.clock = clock; + this.timeToFlush = + eventListenerConfiguration.bufferTime().orElse(Duration.of(30, ChronoUnit.SECONDS)); + this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); // 5 events default + } + + @PostConstruct + void start() { + futures.put( + executor.scheduleAtFixedRate( + this::runCleanup, 0, timeToFlush.toMillis(), TimeUnit.MILLISECONDS), + 1); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + // Clean up futures + try { + futures.keySet().removeIf(future -> future.isCancelled() || future.isDone()); + } 
catch (Exception e) { + LOGGER.debug("Futures reaper task failed."); + } + } + + @PreDestroy + void shutdown() { + futures.keySet().forEach(future -> future.cancel(false)); + executor.shutdownNow(); + } + + @Override + String getRequestId() { + if (containerRequestContext != null && containerRequestContext.hasProperty(REQUEST_ID_KEY)) { + return (String) containerRequestContext.getProperty(REQUEST_ID_KEY); + } + return UUID.randomUUID().toString(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) { + String realmId = callCtx.getRealmContext().getRealmIdentifier(); + + buffer + .computeIfAbsent(realmId, k -> new ConcurrentLinkedQueue<>()) + .add(new EventAndContext(polarisEvent, callCtx.getPolarisCallContext().copy())); + futures.put(executor.submit(() -> checkAndFlushBufferIfNecessary(realmId)), 1); + } + + @VisibleForTesting + public void checkAndFlushBufferIfNecessary(String realmId) { + ConcurrentLinkedQueue<EventAndContext> queue = buffer.get(realmId); + if (queue == null || queue.isEmpty()) { + return; + } + + // Given that we are using a ConcurrentLinkedQueue, this should not lock any calls to `add` on + // the queue. + synchronized (queue) { + // Double-check inside synchronized block + if (queue.isEmpty()) { + return; + } + + EventAndContext head = queue.peek(); + if (head == null) { + return; + } + + Duration elapsed = Duration.ofMillis(clock.millis() - head.polarisEvent.getTimestampMs()); Review Comment: I'm not sure why this would be the case. `clock.millis()` relies on the server's clock, and `head.polarisEvent.getTimestampMs()` was also derived earlier from the server's clock. Are you implying that the server clock may change while the server is up and running? If so, I don't know of any way to work around that, and I would consider it a much larger problem than anything in this PR. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org