adnanhemani commented on code in PR #1844: URL: https://github.com/apache/polaris/pull/1844#discussion_r2153267622
########## service/common/src/main/java/org/apache/polaris/service/events/AfterTableCommitedEvent.java: ########## @@ -21,14 +21,35 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.catalog.TableIdentifier; -/** - * Emitted after Polaris performs a commit to a table. This is not emitted if there's an exception - * while committing. - * - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. - */ -public record AfterTableCommitedEvent( - TableIdentifier identifier, TableMetadata base, TableMetadata metadata) - implements PolarisEvent {} +public final class AfterTableCommitedEvent extends PolarisEvent { Review Comment: That's right; if we keep records, then we will need to generate the `eventId` repeatedly in-line when we generate the event. ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java: ########## @@ -225,6 +228,39 @@ public void writeToGrantRecords( } } + @Override + public void writeEvents(@Nonnull List<PolarisEvent> events) { + int batchSize = 10; + + try { + datasourceOperations.runWithinTransaction( Review Comment: > If I understand this correctly, events are being persisted in their own transaction, and only after a buffer flush. This means that events, even "before" events, could only show up in queries way after the business operation they refer to has been persisted. Are we OK with that? Yes, your understanding is correct - and I think we should be okay with that based on the implementation that the customer configures for their Event Listener. If the customer sees this as a hard requirement that events MUST flush to persistence in real time (i.e. no buffering) - and are okay with all the ramifications (increased load to the persistence and increased call latencies), then they are free to create/use an Event Listener that commits to the persistence in-line. 
Bottom line: this is a customer-driven decision that they should make. We can guide them towards a default option but this default option is not binding for customers. > It also means that if a transaction writing events fail, we could end up with dozens of missing events in the database. Are we OK with that? While this is, in theory, correct - I think we have built enough robustness, especially in the file-buffer implementation, that there is not much risk of missing events. Again, risk tolerances and the trade-offs to balance are up to the customer to decide through their configurations. > I guess my base question is: do we have a clear understanding of which minimal consistency and ordering guarantees users can expect from the events subsystem? This is up to the customer based on the implementation they use. I can answer this for the two implementations I have provided here: 1. In-Memory Buffer: No consistency or ordering guarantees. This is purely a best-effort implementation. 2. File-based buffer: There is a durability guarantee that events cannot get lost, provided that the connected hard drive/local storage does not fill up under the customer's settings. Ordering can be determined from the events themselves - but there is no guarantee on ordering within the persistence itself. ########## polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.core.entity; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.Serializable; +import java.util.Map; + +public class PolarisEvent implements Serializable { + public static final String EMPTY_MAP_STRING = "{}"; + + // to serialize/deserialize properties + private static final ObjectMapper MAPPER = new ObjectMapper(); + + // catalog id + private String catalogId; + + // event id + private String id; + + // id of the request that generated this event + private String requestId; + + // event type that was fired + private String eventType; + + // timestamp in epoch milliseconds of when this event was emitted + private long timestampMs; Review Comment: I'm definitely not a Quarkus/CDI expert - but my understanding was that because the Event was always being called in a RequestScoped class, the event was treated as a Dependent and got the clock injected due to the annotation. I have unit tests in this PR that are working (and tested it locally as well) - so not sure if I've done something completely off-the-mark here. 
########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import 
org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; Review Comment: That's a good point, will change this. ########## polaris-core/src/main/java/org/apache/polaris/core/config/PolarisConfigurationStore.java: ########## @@ -59,7 +59,7 @@ public interface PolarisConfigurationStore { * @param <T> the type of the configuration value */ default <T> @Nonnull T getConfiguration( - @Nonnull RealmContext realmContext, String configName, @Nonnull T defaultValue) { + RealmContext realmContext, String configName, @Nonnull T defaultValue) { Review Comment: A recent change added this `@Nonnull` without taking into account that there may be application-wide configurations (such as some of the ones I added as part of this PR). There already exist code paths that handle `null` values for `realmContext` in this case - but the change to this function signature was not investigated deeply enough to catch this. ########## service/common/src/main/java/org/apache/polaris/service/events/AfterCatalogCreatedEvent.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events; + +import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal; + +/** Emitted when Polaris intends to create a catalog. */ +public final class AfterCatalogCreatedEvent extends PolarisEvent { Review Comment: Ditto as comment above. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); Review Comment: Not sure if there's something I'm missing with this field being `final` (versus not being `final`) - but the outer HashMap shouldn't really ever have concurrency issues, even in an application-scoped setting. The key (`realmId`) will never be re-written or replaced, all interactions are purely to interact with the inner HashMap. I can see a point for the inner HashMap to be a ConcurrentHashMap, though, as there may be changes to the BufferShards. I'll make a fix for this. ########## persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java: ########## @@ -135,6 +135,47 @@ public static PreparedQuery generateInsertQuery( return new PreparedQuery(sql, finalValues); } + /** + * Generates an INSERT query for multiple values into a given table. + * + * @param allColumns Columns to insert values into. + * @param tableName Target table name. + * @param values Values for each column (must match order of columns). + * @param realmId Realm value to append. + * @return INSERT query with value bindings. + */ + public static PreparedQuery generateMultipleInsertQuery( Review Comment: This is a good call to look into JDBC batch updates. Let me revise the PR with this soon. 
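For reference, the JDBC batch-update approach mentioned above could look roughly like the sketch below. This is not the code from the PR: the class `BatchInsertSketch`, its helpers `chunk` and `insertInBatches`, and the SQL string passed in by the caller are all hypothetical names used only for illustration. The only real API relied on is standard JDBC (`PreparedStatement.addBatch()` and `executeBatch()`). Transaction handling is assumed to be done by the caller.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of batched inserts; names here are not from the PR. */
public final class BatchInsertSketch {
  private BatchInsertSketch() {}

  /** Splits rows into consecutive chunks of at most batchSize elements. */
  public static <T> List<List<T>> chunk(List<T> rows, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> chunks = new ArrayList<>();
    for (int i = 0; i < rows.size(); i += batchSize) {
      int end = Math.min(i + batchSize, rows.size());
      chunks.add(new ArrayList<>(rows.subList(i, end)));
    }
    return chunks;
  }

  /**
   * Binds each row to the parameterized INSERT statement and sends the rows to the
   * database one batch at a time. Returns the number of rows submitted.
   */
  public static int insertInBatches(
      Connection conn, String insertSql, List<Object[]> rows, int batchSize)
      throws SQLException {
    int submitted = 0;
    try (PreparedStatement stmt = conn.prepareStatement(insertSql)) {
      for (List<Object[]> batch : chunk(rows, batchSize)) {
        for (Object[] row : batch) {
          for (int col = 0; col < row.length; col++) {
            stmt.setObject(col + 1, row[col]); // JDBC parameters are 1-indexed
          }
          stmt.addBatch();
        }
        stmt.executeBatch(); // one round trip per batch instead of one per row
        submitted += batch.size();
      }
    }
    return submitted;
  }
}
```

Compared with building one multi-row INSERT string, this keeps a single parameterized statement and lets the driver decide how to ship each batch, which is the main appeal of the suggestion.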
########## service/common/src/main/java/org/apache/polaris/service/events/AfterViewCommitedEvent.java: ########## @@ -21,13 +21,36 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.view.ViewMetadata; -/** - * Emitted after Polaris performs a commit to a view. This is not emitted if there's an exception - * while committing. - * - * @param identifier The identifier. - * @param base The old metadata. - * @param metadata The new metadata. - */ -public record AfterViewCommitedEvent( - TableIdentifier identifier, ViewMetadata base, ViewMetadata metadata) implements PolarisEvent {} +public final class AfterViewCommitedEvent extends PolarisEvent { Review Comment: Ditto on comment above. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); + ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>(); + + ScheduledExecutorService threadPool; + private static int shardCount; + private static int maxBufferSize; + private final Kryo kryo = new Kryo(); + private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-"; + + @Inject + public FileBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore polarisConfigurationStore, + Clock clock) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.polarisConfigurationStore = polarisConfigurationStore; + this.clock = clock; + shardCount = Review Comment: We do not have benchmarks, so I agree this may be slightly over-engineered. The sharding was meant to mitigate concerns about putting too much pressure on a single file. If the rest of the community agrees that this is not a concern, I'm happy to roll back the sharding code here. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. */ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); + ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>(); + + ScheduledExecutorService threadPool; + private static int shardCount; + private static int maxBufferSize; + private final Kryo kryo = new Kryo(); + private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-"; + + @Inject + public FileBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore polarisConfigurationStore, + Clock clock) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.polarisConfigurationStore = polarisConfigurationStore; + this.clock = clock; + shardCount = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS); + maxBufferSize = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE); + int timeToFlush = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS); + int numThreads = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS); + threadPool = Executors.newScheduledThreadPool(numThreads); + 
kryo.register(PolarisEvent.class); + kryo.register(PolarisEvent.ResourceType.class); + + // Start BufferListingTask + Function<FileBufferListingTask.TaskSubmissionInput, Future> taskSubmissionFunction = + input -> threadPool.schedule(input.task(), input.delayInMs(), TimeUnit.MILLISECONDS); Review Comment: Yes, this is by design. The FileBufferListingTask should never stop or else there is not a way to restart this task and resume buffer flushing. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); + ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>(); + + ScheduledExecutorService threadPool; + private static int shardCount; + private static int maxBufferSize; + private final Kryo kryo = new Kryo(); + private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-"; + + @Inject + public FileBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore polarisConfigurationStore, + Clock clock) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.polarisConfigurationStore = polarisConfigurationStore; + this.clock = clock; + shardCount = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS); + maxBufferSize = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE); + int timeToFlush = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS); + int numThreads = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS); + threadPool = Executors.newScheduledThreadPool(numThreads); + kryo.register(PolarisEvent.class); + kryo.register(PolarisEvent.ResourceType.class); + + // Start BufferListingTask + 
Function<FileBufferListingTask.TaskSubmissionInput, Future> taskSubmissionFunction = + input -> threadPool.schedule(input.task(), input.delayInMs(), TimeUnit.MILLISECONDS); + BiConsumer<String, List<PolarisEvent>> eventWriter = + (realmId, polarisEvents) -> getBasePersistenceInstance(realmId).writeEvents(polarisEvents); + var future = Review Comment: Same concern as comment above. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); + ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>(); + + ScheduledExecutorService threadPool; + private static int shardCount; + private static int maxBufferSize; + private final Kryo kryo = new Kryo(); + private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-"; + + @Inject + public FileBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore polarisConfigurationStore, + Clock clock) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.polarisConfigurationStore = polarisConfigurationStore; + this.clock = clock; + shardCount = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS); + maxBufferSize = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE); + int timeToFlush = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS); + int numThreads = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS); + threadPool = Executors.newScheduledThreadPool(numThreads); + kryo.register(PolarisEvent.class); + kryo.register(PolarisEvent.ResourceType.class); + + // Start BufferListingTask + 
Function<FileBufferListingTask.TaskSubmissionInput, Future> taskSubmissionFunction = + input -> threadPool.schedule(input.task(), input.delayInMs(), TimeUnit.MILLISECONDS); + BiConsumer<String, List<PolarisEvent>> eventWriter = + (realmId, polarisEvents) -> getBasePersistenceInstance(realmId).writeEvents(polarisEvents); + var future = + threadPool.schedule( + new FileBufferListingTask( + getBufferDirectory(), + taskSubmissionFunction, + activeFlushFutures, + timeToFlush, + clock, + this::rotateShard, + eventWriter), + timeToFlush, + TimeUnit.MILLISECONDS); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) { + int shardNum = polarisEvent.hashCode() % shardCount; + String realmId = callCtx.getRealmContext().getRealmIdentifier(); + if (!buffers.containsKey(realmId)) { + createBuffersForRealm(realmId); + } + BufferShard bufferShard = buffers.get(realmId).get(shardNum); + if (bufferShard == null) { + LOGGER.error( + "No buffer shard found for realm: #{}, shard #{}. 
Event dropped: {}", + realmId, + shardNum, + polarisEvent); + return; + } + + kryo.writeObject(bufferShard.output, polarisEvent); + bufferShard.output.flush(); + + // If too many events in this buffer shard, start a new shard + int bufferEventCount = bufferShard.eventCount.getAndIncrement() + 1; + if (bufferEventCount >= maxBufferSize) { + rotateShard(realmId, shardNum); + } + } + + private void createBuffersForRealm(String realmId) { + HashMap<Integer, BufferShard> bufferShardsForRealm = new HashMap<>(); + buffers.put(realmId, bufferShardsForRealm); + for (int i = 0; i < shardCount; i++) { + bufferShardsForRealm.put(i, createShard(realmId, i)); + } + } + + private BufferShard createShard(String realmId, int shardNum) { + String bufferDirName = + getBufferDirectory() + realmId + "/" + BUFFER_SHARD_PREFIX + shardNum + "/"; + File file = new File(bufferDirName + "buffer_timestamp-" + clock.millis()); + File parent = file.getParentFile(); + if (parent != null && !parent.exists()) { + parent.mkdirs(); // Creates all missing parent directories + } + try { + file.createNewFile(); + Output output = new Output(new FileOutputStream(file)); Review Comment: This is by design - we want to keep the file handle open so that writes to the buffer file are fast. But this is a good callout - I can close each output stream before rotating its buffer shard. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); + ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>(); + + ScheduledExecutorService threadPool; + private static int shardCount; Review Comment: They cannot be both `static` and `final` because the code that reads the configuration cannot be moved outside the initializer. I made the call to make this `static` because there is only ever one instance of this EventListener, as it is an `@ApplicationScoped` class. > This class looks badly designed imho. Please do expand on this - I would like to get your feedback. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); + ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>(); + + ScheduledExecutorService threadPool; + private static int shardCount; + private static int maxBufferSize; + private final Kryo kryo = new Kryo(); + private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-"; + + @Inject + public FileBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore polarisConfigurationStore, + Clock clock) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.polarisConfigurationStore = polarisConfigurationStore; + this.clock = clock; + shardCount = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS); + maxBufferSize = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE); + int timeToFlush = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS); + int numThreads = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS); + threadPool = Executors.newScheduledThreadPool(numThreads); + kryo.register(PolarisEvent.class); + kryo.register(PolarisEvent.ResourceType.class); + + // Start BufferListingTask + 
Function<FileBufferListingTask.TaskSubmissionInput, Future> taskSubmissionFunction = + input -> threadPool.schedule(input.task(), input.delayInMs(), TimeUnit.MILLISECONDS); + BiConsumer<String, List<PolarisEvent>> eventWriter = + (realmId, polarisEvents) -> getBasePersistenceInstance(realmId).writeEvents(polarisEvents); + var future = + threadPool.schedule( + new FileBufferListingTask( + getBufferDirectory(), + taskSubmissionFunction, + activeFlushFutures, + timeToFlush, + clock, + this::rotateShard, + eventWriter), + timeToFlush, + TimeUnit.MILLISECONDS); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) { + int shardNum = polarisEvent.hashCode() % shardCount; + String realmId = callCtx.getRealmContext().getRealmIdentifier(); + if (!buffers.containsKey(realmId)) { + createBuffersForRealm(realmId); + } + BufferShard bufferShard = buffers.get(realmId).get(shardNum); + if (bufferShard == null) { + LOGGER.error( + "No buffer shard found for realm: #{}, shard #{}. Event dropped: {}", Review Comment: This is something we should discuss. In theory, a BufferShard should always have been created by this point (per the rest of the code in this class). But if one is missing, what should we do? This is my attempt to send the event to the overall application logs to alert the admin that something has gone wrong. We could also retry creating the BufferShard. WDYT? ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); + ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>(); + + ScheduledExecutorService threadPool; + private static int shardCount; + private static int maxBufferSize; + private final Kryo kryo = new Kryo(); + private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-"; + + @Inject + public FileBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore polarisConfigurationStore, + Clock clock) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.polarisConfigurationStore = polarisConfigurationStore; + this.clock = clock; + shardCount = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS); + maxBufferSize = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE); + int timeToFlush = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS); + int numThreads = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS); + threadPool = Executors.newScheduledThreadPool(numThreads); Review Comment: > But even more importantly: it is not being closed at all. 
That is the design of the thread pools in both Event Listener classes right now - there is a dedicated thread pool that is solely in charge of clearing out the buffers and sending the buffered events to the persistence. More on your first comment in the overall review response. ########## service/common/src/main/java/org/apache/polaris/service/events/listeners/FileBufferPolarisPersistenceEventListener.java: ########## @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.service.events.listeners; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; +import io.smallrye.common.annotation.Identifier; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Function; +import org.apache.polaris.core.config.FeatureConfiguration; +import org.apache.polaris.core.config.PolarisConfigurationStore; +import org.apache.polaris.core.context.CallContext; +import org.apache.polaris.core.context.RealmContext; +import org.apache.polaris.core.entity.PolarisEvent; +import org.apache.polaris.core.persistence.BasePersistence; +import org.apache.polaris.core.persistence.MetaStoreManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Event listener that buffers in memory and then dumps to persistence. 
*/ +@ApplicationScoped +@Identifier("persistence-file-buffer") +public class FileBufferPolarisPersistenceEventListener extends PolarisPersistenceEventListener { + private static final Logger LOGGER = + LoggerFactory.getLogger(FileBufferPolarisPersistenceEventListener.class); + MetaStoreManagerFactory metaStoreManagerFactory; + PolarisConfigurationStore polarisConfigurationStore; + Clock clock; + + // Key: str - Realm + // Value: + // Key: int - shard number + // Value: BufferShard - an object representing the directory and file where events are + // persisted on the filesystem + private final HashMap<String, HashMap<Integer, BufferShard>> buffers = new HashMap<>(); + ConcurrentHashMap<String, Future> activeFlushFutures = new ConcurrentHashMap<>(); + + ScheduledExecutorService threadPool; + private static int shardCount; + private static int maxBufferSize; + private final Kryo kryo = new Kryo(); + private static final String BUFFER_SHARD_PREFIX = "polaris-event-buffer-shard-"; + + @Inject + public FileBufferPolarisPersistenceEventListener( + MetaStoreManagerFactory metaStoreManagerFactory, + PolarisConfigurationStore polarisConfigurationStore, + Clock clock) { + this.metaStoreManagerFactory = metaStoreManagerFactory; + this.polarisConfigurationStore = polarisConfigurationStore; + this.clock = clock; + shardCount = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_SHARDS); + maxBufferSize = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_MAX_SIZE); + int timeToFlush = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_TIME_TO_FLUSH_IN_MS); + int numThreads = + polarisConfigurationStore.getConfiguration( + null, FeatureConfiguration.EVENT_BUFFER_NUM_THREADS); + threadPool = Executors.newScheduledThreadPool(numThreads); + kryo.register(PolarisEvent.class); + kryo.register(PolarisEvent.ResourceType.class); + + // Start BufferListingTask + 
Function<FileBufferListingTask.TaskSubmissionInput, Future> taskSubmissionFunction = + input -> threadPool.schedule(input.task(), input.delayInMs(), TimeUnit.MILLISECONDS); + BiConsumer<String, List<PolarisEvent>> eventWriter = + (realmId, polarisEvents) -> getBasePersistenceInstance(realmId).writeEvents(polarisEvents); + var future = + threadPool.schedule( + new FileBufferListingTask( + getBufferDirectory(), + taskSubmissionFunction, + activeFlushFutures, + timeToFlush, + clock, + this::rotateShard, + eventWriter), + timeToFlush, + TimeUnit.MILLISECONDS); + } + + @Override + void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) { + int shardNum = polarisEvent.hashCode() % shardCount; + String realmId = callCtx.getRealmContext().getRealmIdentifier(); + if (!buffers.containsKey(realmId)) { + createBuffersForRealm(realmId); + } + BufferShard bufferShard = buffers.get(realmId).get(shardNum); + if (bufferShard == null) { + LOGGER.error( + "No buffer shard found for realm: #{}, shard #{}. Event dropped: {}", + realmId, + shardNum, + polarisEvent); + return; + } + + kryo.writeObject(bufferShard.output, polarisEvent); + bufferShard.output.flush(); + + // If too many events in this buffer shard, start a new shard + int bufferEventCount = bufferShard.eventCount.getAndIncrement() + 1; + if (bufferEventCount >= maxBufferSize) { + rotateShard(realmId, shardNum); + } + } + + private void createBuffersForRealm(String realmId) { + HashMap<Integer, BufferShard> bufferShardsForRealm = new HashMap<>(); + buffers.put(realmId, bufferShardsForRealm); + for (int i = 0; i < shardCount; i++) { + bufferShardsForRealm.put(i, createShard(realmId, i)); + } + } + + private BufferShard createShard(String realmId, int shardNum) { + String bufferDirName = Review Comment: That's a good call out - I will make these changes in the next revision. -- This is an automated message from the Apache Git Service. 
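A minimal sketch of the "close the output stream before rotating the buffer shard" idea from the comment on `createShard`, assuming a simplified stand-in for `BufferShard` that wraps a plain `FileOutputStream` rather than Kryo's `Output`. `ShardRotationSketch`, `Shard`, `current()`, and `rotate()` are hypothetical names used only for illustration; they are not the PR's actual code.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: keeps one open file handle per shard and closes it on rotation. */
final class ShardRotationSketch {

  /** Hypothetical stand-in for the PR's BufferShard. */
  static final class Shard implements AutoCloseable {
    final Path file;
    final OutputStream output;
    final AtomicInteger eventCount = new AtomicInteger();
    private boolean closed;

    Shard(Path file) throws IOException {
      this.file = file;
      // The handle stays open so that each buffered write is cheap.
      this.output = new FileOutputStream(file.toFile());
    }

    boolean isClosed() {
      return closed;
    }

    @Override
    public void close() throws IOException {
      output.flush();
      output.close();
      closed = true;
    }
  }

  private final Path dir;
  private Shard current;
  private int generation;

  ShardRotationSketch(Path dir) throws IOException {
    this.dir = dir;
    this.current = open();
  }

  private Shard open() throws IOException {
    return new Shard(dir.resolve("buffer-" + generation++));
  }

  synchronized Shard current() {
    return current;
  }

  /** Closes the active shard's stream first, then swaps in a fresh shard. */
  synchronized Shard rotate() throws IOException {
    Shard retired = current;
    retired.close(); // release the file handle before the file is handed to a flush task
    current = open();
    return retired;
  }
}
```

Closing inside `rotate()` means a retired file is complete on disk and no longer holds a descriptor by the time anything else reads it; whether the real `rotateShard` needs the same locking depends on how `addToBuffer` is called concurrently, which this sketch does not model.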