adutra commented on code in PR #1844: URL: https://github.com/apache/polaris/pull/1844#discussion_r2180356669
########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>(); + private final ScheduledExecutorService thread = Executors.newSingleThreadScheduledExecutor(); + private final long timeToFlush; + private final int maxBufferSize; + private Future<?> backgroundTask; + + @Inject + public InMemoryBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore polarisConfigurationStore, + Clock clock, + EventListenerConfiguration eventListenerConfiguration) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.polarisConfigurationStore = polarisConfigurationStore; + this.clock = clock; + this.timeToFlush = + eventListenerConfiguration.bufferTime().orElse((long) 30 * 1000); // 30s default + this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); // 5 events default + } + + @PostConstruct + void start() { + backgroundTask = + thread.scheduleAtFixedRate(this::runCleanup, 0, timeToFlush, TimeUnit.MILLISECONDS); + } + + void runCleanup() { + for (String realmId : buffer.keySet()) { + try { + checkAndFlushBufferIfNecessary(realmId); + } catch (Exception e) { + LOGGER.debug("Buffer checking task failed for realm ({}): {}", realmId, e); + } + } + } + + @PreDestroy + void shutdown() { + backgroundTask.cancel(false); + thread.shutdownNow(); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) { + String realmId = callCtx.getRealmContext().getRealmIdentifier(); + buffer.computeIfAbsent(realmId, k -> new ArrayList<>()).add(polarisEvent); + checkAndFlushBufferIfNecessary(realmId); + } + + @VisibleForTesting + public void checkAndFlushBufferIfNecessary(String realmId) { Review Comment: Is this safe to be called concurrently? What happens if this method is called for the same realm ID by 2 threads? ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>(); + private final ScheduledExecutorService thread = Executors.newSingleThreadScheduledExecutor(); Review Comment: `thread` is a weird name for an executor, mind renaming to `executor`? ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; Review Comment: ```suggestion private final MetaStoreManagerFactory metaStoreManagerFactory; ``` ########## polaris-core/src/main/java/org/apache/polaris/core/PolarisCallContext.java: ########## @@ -85,6 +91,10 @@ public Clock getClock() { return clock; } + public String getRequestId() { Review Comment: I'm not thrilled about the idea of adding more stuff to this class, which is already a bag of unrelated beans. Also, we already have the notion of a request ID for logging purposes: https://github.com/apache/polaris/blob/ab228afa4d975faabb7aaf1e8abb0804f5b9d353/runtime/service/src/main/java/org/apache/polaris/service/quarkus/logging/QuarkusLoggingConfiguration.java#L28 I would prefer to use the same request ID coming from HTTP headers if available, and if not, use a random UUID. Wdyt? ########## polaris-core/src/main/java/org/apache/polaris/core/utils/CachedSupplier.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.utils; + +import java.util.function.Supplier; + +public class CachedSupplier<T> implements Supplier<T> { Review Comment: Use Guava `Suppliers.memoize` instead. ########## service/common/src/main/java/org/apache/polaris/service/events/EventListenerConfiguration.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import java.util.Optional; + +public interface EventListenerConfiguration { + Optional<Long> bufferTime(); Review Comment: There is no javadoc, and we don't know which time unit to use here: is it milliseconds? Seconds? Hours? Please use `Duration` whenever possible, it's much more user-friendly. ########## service/common/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.catalog.TableIdentifier; + +/** Emitted when Polaris intends to create a table. */ Review Comment: "Intends"? ########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.Serializable; +import java.util.Map; + +public class PolarisEvent implements Serializable { Review Comment: Isn't this a Pojo? Why not use `@PolarisImmutable`? ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import jakarta.ws.rs.core.SecurityContext; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.service.events.AfterCatalogCreatedEvent; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableCreatedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeCatalogCreatedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableCreatedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; + +/** + * Represents an event listener that can respond to notable moments during Polaris's execution. + * Event details are documented under the event objects themselves. + */ +public class PolarisEventListener { Review Comment: Since this class is not abstract anymore why not turn it into an interface with default methods? It's more flexible to define a listener as an interface than forcing implementors to extend a specific class. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; Review Comment: Unused bean. ########## runtime/defaults/src/main/resources/application.properties: ########## @@ -120,12 +120,16 @@ polaris.features."SUPPORTED_EXTERNAL_CATALOG_AUTHENTICATION_TYPES"=["OAUTH", "BE # polaris.persistence.type=eclipse-link # polaris.persistence.type=in-memory-atomic polaris.persistence.type=in-memory +# polaris.persistence.type=relational-jdbc polaris.secrets-manager.type=in-memory polaris.file-io.type=default polaris.event-listener.type=no-op +# polaris.event-listener.type=persistence-file-buffer Review Comment: I think the default values for these properties are no longer relevant. ########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.Serializable; +import java.util.Map; + +public class PolarisEvent implements Serializable { + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // catalog id + private String catalogId; + + // event id + private String id; + + // id of the request that generated this event + private String requestId; + + // event type that was fired + private String eventType; + + // timestamp in epoch milliseconds of when this event was emitted + private long timestampMs; + + // polaris principal who took this action + private String principalName; + + // Enum that states the type of resource was being operated on + private ResourceType resourceType; + + // Which resource was operated on + private String resourceIdentifier; + + // Additional parameters that were not earlier recorded + private String additionalParameters; + + public String getCatalogId() { + return catalogId; + } + + public String getId() { + return id; + } + + public String getRequestId() { + return requestId; + } + + public String getEventType() { + return eventType; + } + + public long getTimestampMs() { + return timestampMs; + } + + public String getPrincipalName() { + return principalName; + } + + public ResourceType getResourceType() { + return resourceType; + } + + public String getResourceIdentifier() { + return resourceIdentifier; + } + + public String getAdditionalParameters() { + return additionalParameters != null ? additionalParameters : EMPTY_MAP_STRING; + } + + public Map<String, String> getAdditionalParametersAsMap() { Review Comment: This method seems unused. ########## service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.threeten.extra.MutableClock; + +public class InMemoryBufferPolarisPersistenceEventListenerTest { + private InMemoryBufferPolarisPersistenceEventListener eventListener; + private PolarisConfigurationStore configurationStore; + private BasePersistence basePersistence; + private MutableClock clock; + private CallContext callContext; + + private static final int CONFIG_MAX_BUFFER_SIZE = 5; + private static final long CONFIG_TIME_TO_FLUSH_IN_MS = 500; + + @BeforeEach + public void setUp() { + callContext = Mockito.mock(CallContext.class); + basePersistence = mock(BasePersistence.class); + Supplier basePersistenceSupplier = () -> basePersistence; Review Comment: Nit: raw class. ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java: ########## @@ -200,6 +202,61 @@ public int executeUpdate(QueryGenerator.PreparedQuery preparedQuery) throws SQLE }); } + /** + * Executes the INSERT/UPDATE Queries in batches. Requires that all SQL queries have the same + * parameterized form. + * + * @param preparedQueries : queries to be executed + * @return : Number of rows modified / inserted. + * @throws SQLException : Exception during Query Execution. + */ + public int executeBatchUpdate(List<QueryGenerator.PreparedQuery> preparedQueries) + throws SQLException { + if (preparedQueries.isEmpty()) { + return 0; + } + int batchSize = 100; + AtomicInteger successCount = new AtomicInteger(); + return withRetries( + () -> { + String sql = preparedQueries.get(0).sql(); Review Comment: Am I correct that this is relying on the assumption that all `PreparedQuery` instances have the exact same SQL query? The issue is that nothing in the method signature prevents a caller from calling this method with different SQL queries. I think at the very least, you should add a check for this invariant. But this hints at a badly designed signature. A proper signature would be something like: ```java public int executeBatchUpdate(QueryGenerator.PreparedBatchQuery preparedBatchQuery) ``` ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>(); Review Comment: Again, a non-concurrent map is being used for concurrent access. At some point, I'm getting tired of constantly reminding basic Java concepts to Polaris contributors. This **must be a concurrent map** or you **must synchronize externally**. ########## service/common/src/main/java/org/apache/polaris/service/events/EventListenerConfiguration.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import java.util.Optional; + +public interface EventListenerConfiguration { + Optional<Long> bufferTime(); + + Optional<Integer> maxBufferSize(); Review Comment: In which unit? Bytes? Megabytes? Number of cached entries? ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-in-memory-buffer") +public class InMemoryBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; Review Comment: ```suggestion private final Clock clock; ``` ########## runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java: ########## @@ -42,8 +42,8 @@ import org.apache.polaris.service.context.DefaultRealmContextResolver; import org.apache.polaris.service.context.RealmContextResolver; import org.apache.polaris.service.context.TestRealmContextResolver; -import org.apache.polaris.service.events.PolarisEventListener; -import org.apache.polaris.service.events.TestPolarisEventListener; +import org.apache.polaris.service.events.listeners.PolarisEventListener; +import org.apache.polaris.service.events.listeners.TestPolarisEventListener; import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory; import org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration; import org.eclipse.microprofile.config.Config; Review Comment: I just noticed: the production readiness check is displaying an incorrect property name: it should be `polaris.event-listener.type` instead of `polaris.events.type`. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java: ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.google.common.collect.Streams; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.core.SecurityContext; +import java.util.ArrayList; +import java.util.List; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.service.events.AfterTableCommitedEvent; +import org.apache.polaris.service.events.AfterTableRefreshedEvent; +import org.apache.polaris.service.events.AfterTaskAttemptedEvent; +import org.apache.polaris.service.events.AfterViewCommitedEvent; +import org.apache.polaris.service.events.AfterViewRefreshedEvent; +import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent; +import org.apache.polaris.service.events.BeforeTableCommitedEvent; +import org.apache.polaris.service.events.BeforeTableRefreshedEvent; +import org.apache.polaris.service.events.BeforeTaskAttemptedEvent; +import org.apache.polaris.service.events.BeforeViewCommitedEvent; +import org.apache.polaris.service.events.BeforeViewRefreshedEvent; +import org.apache.polaris.service.events.PolarisEvent; + +/** Event listener that stores all emitted events forever. Not recommended for use in production. */ +@ApplicationScoped +@Identifier("test") +public class TestPolarisEventListener extends PolarisEventListener { Review Comment: I know this wasn't introduced in this PR, but I'm wondering what usage a Polaris user could possibly make of this listener? The default one is `no-op` which imho is fine. If this one is only used in our own test suite, and has no value for users, could we move it to the `src/test` or `src/testFixtures` folder? ########## service/common/src/main/java/org/apache/polaris/service/config/DefaultConfigurationStore.java: ########## @@ -47,7 +46,10 @@ public DefaultConfigurationStore( } @Override - public <T> @Nullable T getConfiguration(@Nonnull RealmContext realmContext, String configName) { + public <T> @Nullable T getConfiguration(RealmContext realmContext, String configName) { + if (realmContext == null) { Review Comment: Unrelated change? ########## service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.apache.polaris.service.events.EventListenerConfiguration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.threeten.extra.MutableClock; + +public class InMemoryBufferPolarisPersistenceEventListenerTest { + private InMemoryBufferPolarisPersistenceEventListener eventListener; + private PolarisConfigurationStore configurationStore; + private BasePersistence basePersistence; + private MutableClock clock; + private CallContext callContext; + + private static final int CONFIG_MAX_BUFFER_SIZE = 5; + private static final long CONFIG_TIME_TO_FLUSH_IN_MS = 500; + + @BeforeEach + public void setUp() { + callContext = Mockito.mock(CallContext.class); + basePersistence = mock(BasePersistence.class); + Supplier basePersistenceSupplier = () -> basePersistence; + MetaStoreManagerFactory metaStoreManagerFactory = Mockito.mock(MetaStoreManagerFactory.class); + when(metaStoreManagerFactory.getOrCreateSessionSupplier(Mockito.any())) + .thenReturn(basePersistenceSupplier); + + EventListenerConfiguration eventListenerConfiguration = + Mockito.mock(EventListenerConfiguration.class); + when(eventListenerConfiguration.maxBufferSize()) + .thenReturn(Optional.of(CONFIG_MAX_BUFFER_SIZE)); + when(eventListenerConfiguration.bufferTime()) + .thenReturn(Optional.of(CONFIG_TIME_TO_FLUSH_IN_MS)); + + clock = + MutableClock.of( + Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to make it easier to test + + eventListener = + new InMemoryBufferPolarisPersistenceEventListener( + metaStoreManagerFactory, configurationStore, clock, eventListenerConfiguration); + } + + @Test + public void testAddToBufferFlushesAfterConfiguredTime() { + String realmId = "realm1"; + List<PolarisEvent> eventsAddedToBuffer = addEventsWithoutTriggeringFlush(realmId); + + // Push clock forwards to flush the buffer + clock.add(CONFIG_TIME_TO_FLUSH_IN_MS * 2, ChronoUnit.MILLIS); + eventListener.checkAndFlushBufferIfNecessary(realmId); + verify(basePersistence, times(1)).writeEvents(eq(eventsAddedToBuffer)); + } + + @Test + public void testAddToBufferFlushesAfterMaxEvents() { Review Comment: Suggestion: add a test that exercises concurrent thread access. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org