adutra commented on code in PR #1844:
URL: https://github.com/apache/polaris/pull/1844#discussion_r2180356669


##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;
+
+  private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>();
+  private final ScheduledExecutorService thread = 
Executors.newSingleThreadScheduledExecutor();
+  private final long timeToFlush;
+  private final int maxBufferSize;
+  private Future<?> backgroundTask;
+
+  @Inject
+  public InMemoryBufferPolarisPersistenceEventListener(
+      MetaStoreManagerFactory metaStoreManagerFactory,
+      PolarisConfigurationStore polarisConfigurationStore,
+      Clock clock,
+      EventListenerConfiguration eventListenerConfiguration) {
+    this.metaStoreManagerFactory = metaStoreManagerFactory;
+    this.polarisConfigurationStore = polarisConfigurationStore;
+    this.clock = clock;
+    this.timeToFlush =
+        eventListenerConfiguration.bufferTime().orElse((long) 30 * 1000); // 
30s default
+    this.maxBufferSize = eventListenerConfiguration.maxBufferSize().orElse(5); 
// 5 events default
+  }
+
+  @PostConstruct
+  void start() {
+    backgroundTask =
+        thread.scheduleAtFixedRate(this::runCleanup, 0, timeToFlush, 
TimeUnit.MILLISECONDS);
+  }
+
+  void runCleanup() {
+    for (String realmId : buffer.keySet()) {
+      try {
+        checkAndFlushBufferIfNecessary(realmId);
+      } catch (Exception e) {
+        LOGGER.debug("Buffer checking task failed for realm ({}): {}", 
realmId, e);
+      }
+    }
+  }
+
+  @PreDestroy
+  void shutdown() {
+    backgroundTask.cancel(false);
+    thread.shutdownNow();
+  }
+
+  @Override
+  void addToBuffer(PolarisEvent polarisEvent, CallContext callCtx) {
+    String realmId = callCtx.getRealmContext().getRealmIdentifier();
+    buffer.computeIfAbsent(realmId, k -> new ArrayList<>()).add(polarisEvent);
+    checkAndFlushBufferIfNecessary(realmId);
+  }
+
+  @VisibleForTesting
+  public void checkAndFlushBufferIfNecessary(String realmId) {

Review Comment:
   Is this safe to be called concurrently? What happens if this method is 
called for the same realm ID by 2 threads?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;
+
+  private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>();
+  private final ScheduledExecutorService thread = 
Executors.newSingleThreadScheduledExecutor();

Review Comment:
   `thread` is a weird name for an executor; mind renaming it to `executor`?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;

Review Comment:
   ```suggestion
     private final MetaStoreManagerFactory metaStoreManagerFactory;
   ```



##########
polaris-core/src/main/java/org/apache/polaris/core/PolarisCallContext.java:
##########
@@ -85,6 +91,10 @@ public Clock getClock() {
     return clock;
   }
 
+  public String getRequestId() {

Review Comment:
   I'm not thrilled about the idea of adding more stuff to this class, which is 
already a bag of unrelated beans. 
   
   Also, we already have the notion of a request ID for logging purposes:
   
   
https://github.com/apache/polaris/blob/ab228afa4d975faabb7aaf1e8abb0804f5b9d353/runtime/service/src/main/java/org/apache/polaris/service/quarkus/logging/QuarkusLoggingConfiguration.java#L28
   
   I would prefer to use the same request ID coming from HTTP headers if 
available, and if not, use a random UUID. Wdyt?



##########
polaris-core/src/main/java/org/apache/polaris/core/utils/CachedSupplier.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.utils;
+
+import java.util.function.Supplier;
+
+public class CachedSupplier<T> implements Supplier<T> {

Review Comment:
   Use Guava `Suppliers.memoize` instead.



##########
service/common/src/main/java/org/apache/polaris/service/events/EventListenerConfiguration.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import java.util.Optional;
+
+public interface EventListenerConfiguration {
+  Optional<Long> bufferTime();

Review Comment:
   There is no Javadoc, and we don't know which time unit to use here: is it 
milliseconds? Seconds? Hours? Please use `Duration` whenever possible; it's 
much more user-friendly.



##########
service/common/src/main/java/org/apache/polaris/service/events/AfterTableCreatedEvent.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/** Emitted when Polaris intends to create a table. */

Review Comment:
   "Intends"?



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.Serializable;
+import java.util.Map;
+
+public class PolarisEvent implements Serializable {

Review Comment:
   Isn't this a POJO? Why not use `@PolarisImmutable`?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/PolarisEventListener.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import jakarta.ws.rs.core.SecurityContext;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableCreatedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeCatalogCreatedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableCreatedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+
+/**
+ * Represents an event listener that can respond to notable moments during 
Polaris's execution.
+ * Event details are documented under the event objects themselves.
+ */
+public class PolarisEventListener {

Review Comment:
   Since this class is not abstract anymore, why not turn it into an interface 
with default methods? It's more flexible to define a listener as an interface 
than to force implementors to extend a specific class.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;

Review Comment:
   Unused bean.



##########
runtime/defaults/src/main/resources/application.properties:
##########
@@ -120,12 +120,16 @@ 
polaris.features."SUPPORTED_EXTERNAL_CATALOG_AUTHENTICATION_TYPES"=["OAUTH", "BE
 # polaris.persistence.type=eclipse-link
 # polaris.persistence.type=in-memory-atomic
 polaris.persistence.type=in-memory
+# polaris.persistence.type=relational-jdbc
 
 polaris.secrets-manager.type=in-memory
 
 polaris.file-io.type=default
 
 polaris.event-listener.type=no-op
+# polaris.event-listener.type=persistence-file-buffer

Review Comment:
   I think the default values for these properties are no longer relevant.



##########
polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEvent.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.Serializable;
+import java.util.Map;
+
+public class PolarisEvent implements Serializable {
+  public static final String EMPTY_MAP_STRING = "{}";
+
+  // to serialize/deserialize properties
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  // catalog id
+  private String catalogId;
+
+  // event id
+  private String id;
+
+  // id of the request that generated this event
+  private String requestId;
+
+  // event type that was fired
+  private String eventType;
+
+  // timestamp in epoch milliseconds of when this event was emitted
+  private long timestampMs;
+
+  // polaris principal who took this action
+  private String principalName;
+
+  // Enum that states the type of resource was being operated on
+  private ResourceType resourceType;
+
+  // Which resource was operated on
+  private String resourceIdentifier;
+
+  // Additional parameters that were not earlier recorded
+  private String additionalParameters;
+
+  public String getCatalogId() {
+    return catalogId;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public String getRequestId() {
+    return requestId;
+  }
+
+  public String getEventType() {
+    return eventType;
+  }
+
+  public long getTimestampMs() {
+    return timestampMs;
+  }
+
+  public String getPrincipalName() {
+    return principalName;
+  }
+
+  public ResourceType getResourceType() {
+    return resourceType;
+  }
+
+  public String getResourceIdentifier() {
+    return resourceIdentifier;
+  }
+
+  public String getAdditionalParameters() {
+    return additionalParameters != null ? additionalParameters : 
EMPTY_MAP_STRING;
+  }
+
+  public Map<String, String> getAdditionalParametersAsMap() {

Review Comment:
   This method seems unused.



##########
service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.BasePersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisConfigurationStore configurationStore;
+  private BasePersistence basePersistence;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final long CONFIG_TIME_TO_FLUSH_IN_MS = 500;
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    basePersistence = mock(BasePersistence.class);
+    Supplier basePersistenceSupplier = () -> basePersistence;

Review Comment:
   Nit: raw type (`Supplier` is used without a type argument).



##########
persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/DatasourceOperations.java:
##########
@@ -200,6 +202,61 @@ public int executeUpdate(QueryGenerator.PreparedQuery 
preparedQuery) throws SQLE
         });
   }
 
+  /**
+   * Executes the INSERT/UPDATE Queries in batches. Requires that all SQL 
queries have the same
+   * parameterized form.
+   *
+   * @param preparedQueries : queries to be executed
+   * @return : Number of rows modified / inserted.
+   * @throws SQLException : Exception during Query Execution.
+   */
+  public int executeBatchUpdate(List<QueryGenerator.PreparedQuery> 
preparedQueries)
+      throws SQLException {
+    if (preparedQueries.isEmpty()) {
+      return 0;
+    }
+    int batchSize = 100;
+    AtomicInteger successCount = new AtomicInteger();
+    return withRetries(
+        () -> {
+          String sql = preparedQueries.get(0).sql();

Review Comment:
   Am I correct that this is relying on the assumption that all `PreparedQuery` 
instances have the exact same SQL query?
   
   The issue is that nothing in the method signature prevents a caller from 
calling this method with different SQL queries. I think at the very least, you 
should add a check for this invariant. But this hints at a badly designed 
signature. A proper signature would be something like:
   
   ```java
   public int executeBatchUpdate(QueryGenerator.PreparedBatchQuery 
preparedBatchQuery)
   ```



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;
+
+  private final HashMap<String, List<PolarisEvent>> buffer = new HashMap<>();

Review Comment:
   Again, a non-concurrent map is being used for concurrent access. At some 
point, I'm getting tired of constantly reminding Polaris contributors of basic 
Java concepts. This **must be a concurrent map** or you **must synchronize 
externally**.



##########
service/common/src/main/java/org/apache/polaris/service/events/EventListenerConfiguration.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events;
+
+import java.util.Optional;
+
+public interface EventListenerConfiguration {
+  Optional<Long> bufferTime();
+
+  Optional<Integer> maxBufferSize();

Review Comment:
   In which unit? Bytes? Megabytes? Number of cached entries?



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListener.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Event listener that buffers in memory and then dumps to persistence. */
+@ApplicationScoped
+@Identifier("persistence-in-memory-buffer")
+public class InMemoryBufferPolarisPersistenceEventListener extends 
PolarisPersistenceEventListener {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(InMemoryBufferPolarisPersistenceEventListener.class);
+  MetaStoreManagerFactory metaStoreManagerFactory;
+  PolarisConfigurationStore polarisConfigurationStore;
+  Clock clock;

Review Comment:
   ```suggestion
     private final Clock clock;
   ```



##########
runtime/service/src/main/java/org/apache/polaris/service/quarkus/config/ProductionReadinessChecks.java:
##########
@@ -42,8 +42,8 @@
 import org.apache.polaris.service.context.DefaultRealmContextResolver;
 import org.apache.polaris.service.context.RealmContextResolver;
 import org.apache.polaris.service.context.TestRealmContextResolver;
-import org.apache.polaris.service.events.PolarisEventListener;
-import org.apache.polaris.service.events.TestPolarisEventListener;
+import org.apache.polaris.service.events.listeners.PolarisEventListener;
+import org.apache.polaris.service.events.listeners.TestPolarisEventListener;
 import 
org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
 import 
org.apache.polaris.service.quarkus.auth.QuarkusAuthenticationConfiguration;
 import org.eclipse.microprofile.config.Config;

Review Comment:
   I just noticed: the production readiness check is displaying an incorrect 
property name: it should be `polaris.event-listener.type` instead of 
`polaris.events.type`.



##########
service/common/src/main/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.events.listeners;
+
+import com.google.common.collect.Streams;
+import io.smallrye.common.annotation.Identifier;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.ws.rs.core.SecurityContext;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.service.events.AfterTableCommitedEvent;
+import org.apache.polaris.service.events.AfterTableRefreshedEvent;
+import org.apache.polaris.service.events.AfterTaskAttemptedEvent;
+import org.apache.polaris.service.events.AfterViewCommitedEvent;
+import org.apache.polaris.service.events.AfterViewRefreshedEvent;
+import org.apache.polaris.service.events.BeforeRequestRateLimitedEvent;
+import org.apache.polaris.service.events.BeforeTableCommitedEvent;
+import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
+import org.apache.polaris.service.events.BeforeTaskAttemptedEvent;
+import org.apache.polaris.service.events.BeforeViewCommitedEvent;
+import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
+import org.apache.polaris.service.events.PolarisEvent;
+
+/** Event listener that stores all emitted events forever. Not recommended for 
use in production. */
+@ApplicationScoped
+@Identifier("test")
+public class TestPolarisEventListener extends PolarisEventListener {

Review Comment:
   I know this wasn't introduced in this PR, but I'm wondering what use a 
Polaris user could possibly make of this listener? The default one is `no-op`, 
which imho is fine. If this one is only used in our own test suite, and has no 
value for users, could we move it to the `src/test` or `src/testFixtures` 
folder?



##########
service/common/src/main/java/org/apache/polaris/service/config/DefaultConfigurationStore.java:
##########
@@ -47,7 +46,10 @@ public DefaultConfigurationStore(
   }
 
   @Override
-  public <T> @Nullable T getConfiguration(@Nonnull RealmContext realmContext, 
String configName) {
+  public <T> @Nullable T getConfiguration(RealmContext realmContext, String 
configName) {
+    if (realmContext == null) {

Review Comment:
   Unrelated change?



##########
service/common/src/test/java/org/apache/polaris/service/events/listeners/InMemoryBufferPolarisPersistenceEventListenerTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.events.listeners;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEvent;
+import org.apache.polaris.core.persistence.BasePersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.service.events.EventListenerConfiguration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.threeten.extra.MutableClock;
+
+public class InMemoryBufferPolarisPersistenceEventListenerTest {
+  private InMemoryBufferPolarisPersistenceEventListener eventListener;
+  private PolarisConfigurationStore configurationStore;
+  private BasePersistence basePersistence;
+  private MutableClock clock;
+  private CallContext callContext;
+
+  private static final int CONFIG_MAX_BUFFER_SIZE = 5;
+  private static final long CONFIG_TIME_TO_FLUSH_IN_MS = 500;
+
+  @BeforeEach
+  public void setUp() {
+    callContext = Mockito.mock(CallContext.class);
+    basePersistence = mock(BasePersistence.class);
+    Supplier basePersistenceSupplier = () -> basePersistence;
+    MetaStoreManagerFactory metaStoreManagerFactory = 
Mockito.mock(MetaStoreManagerFactory.class);
+    when(metaStoreManagerFactory.getOrCreateSessionSupplier(Mockito.any()))
+        .thenReturn(basePersistenceSupplier);
+
+    EventListenerConfiguration eventListenerConfiguration =
+        Mockito.mock(EventListenerConfiguration.class);
+    when(eventListenerConfiguration.maxBufferSize())
+        .thenReturn(Optional.of(CONFIG_MAX_BUFFER_SIZE));
+    when(eventListenerConfiguration.bufferTime())
+        .thenReturn(Optional.of(CONFIG_TIME_TO_FLUSH_IN_MS));
+
+    clock =
+        MutableClock.of(
+            Instant.ofEpochSecond(0), ZoneOffset.UTC); // Use 0 Epoch Time to 
make it easier to test
+
+    eventListener =
+        new InMemoryBufferPolarisPersistenceEventListener(
+            metaStoreManagerFactory, configurationStore, clock, 
eventListenerConfiguration);
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterConfiguredTime() {
+    String realmId = "realm1";
+    List<PolarisEvent> eventsAddedToBuffer = 
addEventsWithoutTriggeringFlush(realmId);
+
+    // Push clock forwards to flush the buffer
+    clock.add(CONFIG_TIME_TO_FLUSH_IN_MS * 2, ChronoUnit.MILLIS);
+    eventListener.checkAndFlushBufferIfNecessary(realmId);
+    verify(basePersistence, times(1)).writeEvents(eq(eventsAddedToBuffer));
+  }
+
+  @Test
+  public void testAddToBufferFlushesAfterMaxEvents() {

Review Comment:
   Suggestion: add a test that exercises concurrent thread access.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to